Coverage for apis_core/apis_relations/models.py: 65%
206 statements
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-22 07:51 +0000
« prev ^ index » next coverage.py v7.6.4, created at 2024-11-22 07:51 +0000
1import copy
2import unicodedata
4from crum import get_current_request
5from django.conf import settings
6from django.contrib.contenttypes.models import ContentType
7from django.db import models
8from django.db.models import Q
9from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor
10from django.db.models.signals import m2m_changed
11from model_utils.managers import InheritanceManager
13from apis_core.apis_metainfo import signals
14from apis_core.apis_metainfo.models import RootObject
15from apis_core.generic.abc import GenericModel
16from apis_core.history.models import VersionMixin
17from apis_core.utils import DateParser
def find_if_user_accepted():
    """Return filter kwargs restricting anonymous requests to published objects.

    :return: ``{"published": True}`` when there is a current request whose
        user is not authenticated; an empty dict when there is no current
        request or the user is authenticated.
    """
    current_request = get_current_request()
    # No request at all and an authenticated user are treated the same way.
    if current_request is None or current_request.user.is_authenticated:
        return {}
    return {"published": True}
class BaseRelationManager(models.Manager):
    """Manager whose querysets are ``RelationPublishedQueryset`` instances."""

    def get_queryset(self):
        """Return a ``RelationPublishedQueryset`` for this manager's model and database."""
        return RelationPublishedQueryset(self.model, using=self._db)

    def filter_ann_proj(self, request=None, ann_proj=1, include_all=True):
        """Forward to ``filter_ann_proj`` of the queryset with the same arguments."""
        forwarded = {
            "request": request,
            "ann_proj": ann_proj,
            "include_all": include_all,
        }
        return self.get_queryset().filter_ann_proj(**forwarded)

    def filter_for_user(self):
        """Return the queryset, filtered for the current user if the setting exists.

        The queryset's ``filter_for_user`` is only consulted when the settings
        object has an ``APIS_SHOW_ONLY_PUBLISHED`` attribute (whatever its value).
        """
        if not hasattr(settings, "APIS_SHOW_ONLY_PUBLISHED"):
            return self.get_queryset()
        return self.get_queryset().filter_for_user()
class Property(RootObject):
    """A named relation type with allowed subject and object content types."""

    class Meta:
        verbose_name_plural = "Properties"

    objects = BaseRelationManager()
    property_class_uri = models.CharField(
        max_length=255, verbose_name="Property Class URI", blank=True
    )
    # TODO RDF: Redundancy between name_forward and name, solve this.
    name_forward = models.CharField(
        max_length=255,
        verbose_name="Name forward",
        help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".',
        blank=True,
    )
    # TODO RDF: Maybe rename name to name_subj_to_obj and name_reverse to name_obj_to_subj
    name_reverse = models.CharField(
        max_length=255,
        verbose_name="Name reverse",
        help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".',
        blank=True,
    )
    # Content types allowed as subject of a triple using this property.
    subj_class = models.ManyToManyField(
        ContentType,
        related_name="property_set_subj",
    )
    # Content types allowed as object of a triple using this property.
    obj_class = models.ManyToManyField(
        ContentType,
        related_name="property_set_obj",
    )

    def __str__(self):
        return self.name_forward

    def save(self, *args, **kwargs):
        """Normalize both names, derive a missing reverse name, then save.

        An empty ``name_reverse`` is replaced by ``"<name_forward> [INVERSE]"``.
        Both names are converted to ``str`` and NFC-normalized. When the caller
        passes ``update_fields`` containing ``name_forward`` but not
        ``name_reverse``, ``name_reverse`` is added to the fields being saved.

        :return: this instance
        """
        if self.name_reverse == "":
            self.name_reverse = f"{self.name_forward} [INVERSE]"

        for attr_name in ("name_forward", "name_reverse"):
            current_value = str(getattr(self, attr_name))
            setattr(self, attr_name, unicodedata.normalize("NFC", current_value))

        requested_fields = kwargs.get("update_fields")
        if (
            requested_fields is not None
            and "name_forward" in requested_fields
            and "name_reverse" not in requested_fields
        ):
            # Work on a copy so the caller's collection is left untouched.
            kwargs["update_fields"] = {*requested_fields, "name_reverse"}

        super().save(*args, **kwargs)
        return self
98# TODO: comment and explain
def subj_or_obj_class_changed(sender, is_subj, **kwargs):
    """Propagate a subject/object class change of a property to all subclasses.

    Shared implementation behind the ``m2m_changed`` handlers of
    ``Property.subj_class`` and ``Property.obj_class``. It only acts when
    ``pk_set`` holds exactly one primary key and the action is ``pre_add`` or
    ``post_remove``; every other signal invocation is ignored.

    For such a change the content type being added/removed is looked up, a
    check is run against the parents of its model class, and then the same
    ``add``/``remove`` is applied to the content type of every (transitive)
    subclass of that model class.

    :param sender: the through model that emitted the signal; must be the
        through model of ``Property.subj_class`` or ``Property.obj_class``
    :param is_subj: accepted for the two wrapper handlers but not used here;
        the relation is determined from ``sender``
    :param kwargs: signal arguments; ``pk_set``, ``instance`` and ``action``
        are read
    :raises Exception: if ``sender`` is neither of the two through models, or
        if a parent class of the changed class is already saved on the relation
    """

    def cascade_subj_obj_class_to_children(
        contenttype_to_add_or_remove,
        contenttype_already_saved_list,
        subj_or_obj_field_function,
    ):
        """Check parents for conflicts, then apply the change to all children."""

        def get_all_parents(contenttype_current):
            """Collect content types of all (transitive) base classes."""
            parent_list = []
            class_current = contenttype_current.model_class()
            for class_parent in class_current.__bases__:
                # TODO: Avoid ContentType DB fetch
                # NOTE(review): ContentType.model normally holds the lower-cased
                # model name while __name__ is CamelCase; confirm this lookup
                # actually matches on the databases in use.
                parent_matches = ContentType.objects.filter(
                    model=class_parent.__name__
                )
                # Bases without exactly one matching content type are skipped.
                if len(parent_matches) == 1:
                    contenttype_parent = parent_matches[0]
                    parent_list.append(contenttype_parent)
                    parent_list.extend(get_all_parents(contenttype_parent))

            return parent_list

        def get_all_children(contenttype_current):
            """Collect content types of all (transitive) subclasses."""
            child_list = []
            class_current = contenttype_current.model_class()
            for class_child in class_current.__subclasses__():
                # TODO: Avoid ContentType DB fetch
                contenttype_child = ContentType.objects.get_for_model(class_child)
                child_list.append(contenttype_child)
                child_list.extend(get_all_children(contenttype_child))

            return child_list

        for parent_contenttype in get_all_parents(contenttype_to_add_or_remove):
            if parent_contenttype in contenttype_already_saved_list:
                raise Exception(
                    "Pre-existing parent class found when trying to save or remove a property subject or object class."
                    f" The current class to be saved is '{contenttype_to_add_or_remove.model_class().__name__}',"
                    f" but already saved is '{parent_contenttype.model_class().__name__}'."
                    " Such a save could potentially be in conflict with an ontology."
                    " Better save or remove the respective top parent subject or object class from this property."
                )
        for child_contenttype in get_all_children(contenttype_to_add_or_remove):
            subj_or_obj_field_function(child_contenttype)

    pk_set = kwargs["pk_set"]
    if pk_set is None or len(pk_set) != 1:
        return

    sending_property = kwargs["instance"]
    if sender == Property.subj_class.through:
        subj_or_obj_field = sending_property.subj_class
    elif sender == Property.obj_class.through:
        subj_or_obj_field = sending_property.obj_class
    else:
        raise Exception(
            f"Unexpected sender {sender!r}: expected the through model of"
            " Property.subj_class or Property.obj_class."
        )

    action = kwargs["action"]
    if action == "pre_add":
        subj_or_obj_field_function = subj_or_obj_field.add
    elif action == "post_remove":
        subj_or_obj_field_function = subj_or_obj_field.remove
    else:
        # All other m2m_changed actions need no cascading.
        return

    cascade_subj_obj_class_to_children(
        # pk_set has exactly one element here, so min() just extracts it.
        contenttype_to_add_or_remove=ContentType.objects.get(pk=min(pk_set)),
        contenttype_already_saved_list=subj_or_obj_field.all(),
        subj_or_obj_field_function=subj_or_obj_field_function,
    )
def subj_class_changed(sender, **kwargs):
    """Signal handler: run ``subj_or_obj_class_changed`` with ``is_subj=True``."""
    subj_or_obj_class_changed(sender=sender, is_subj=True, **kwargs)
def obj_class_changed(sender, **kwargs):
    """Signal handler: run ``subj_or_obj_class_changed`` with ``is_subj=False``."""
    subj_or_obj_class_changed(sender=sender, is_subj=False, **kwargs)
# Register the handlers at import time: each one only listens to changes of
# its own many-to-many relation on Property (via that relation's through model).
m2m_changed.connect(subj_class_changed, sender=Property.subj_class.through)
m2m_changed.connect(obj_class_changed, sender=Property.obj_class.through)
class RelationPublishedQueryset(models.QuerySet):
    """Queryset with filters for publication state and annotation project."""

    def filter_for_user(self, *args, **kwargs):
        """Hide unpublished rows from non-authenticated access, if configured.

        Without a truthy ``APIS_SHOW_ONLY_PUBLISHED`` setting the queryset is
        returned unchanged. Otherwise it is returned unchanged only for a
        current request with an authenticated user; with no current request or
        an unauthenticated user it is restricted to ``published=True``.
        Positional and keyword arguments are accepted but ignored.
        """
        if not getattr(settings, "APIS_SHOW_ONLY_PUBLISHED", False):
            return self

        current_request = get_current_request()
        if current_request is not None and current_request.user.is_authenticated:
            return self
        return self.filter(published=True)

    def filter_ann_proj(self, request=None, ann_proj=1, include_all=True):
        """Restrict the queryset to an annotation project.

        :param request: optional request; when given, its session entry
            ``annotation_project`` replaces ``ann_proj`` and its session entry
            ``users_show_highlighter`` (if not ``None``) limits the result to
            annotations added by those users. A falsy ``annotation_project``
            returns the queryset unfiltered.
        :param ann_proj: annotation project id used when no request is given
        :param include_all: when true, rows whose annotation project is null
            are included as well
        :return: the filtered queryset
        """
        visible_users = None
        if request:
            session = request.session
            ann_proj = session.get("annotation_project", False)
            if not ann_proj:
                return self
            visible_users = session.get("users_show_highlighter", None)

        condition = Q(annotation__annotation_project_id=ann_proj)
        if visible_users is not None:
            condition.add(Q(annotation__user_added_id__in=visible_users), Q.AND)
        if include_all:
            condition.add(Q(annotation__annotation_project__isnull=True), Q.OR)
        return self.filter(condition)
class InheritanceForwardManyToOneDescriptor(ForwardManyToOneDescriptor):
    """Forward descriptor that loads the related object via ``objects_inheritance``."""

    def get_queryset(self, **hints):
        """Return the related model's ``objects_inheritance`` queryset with ``select_subclasses()`` applied."""
        related_model = self.field.remote_field.model
        inheritance_manager = related_model.objects_inheritance.db_manager(
            hints=hints
        )
        return inheritance_manager.select_subclasses()
class InheritanceForeignKey(models.ForeignKey):
    """Foreign key whose forward accessor is ``InheritanceForwardManyToOneDescriptor``."""

    forward_related_accessor_class = InheritanceForwardManyToOneDescriptor
class Triple(models.Model, GenericModel):
    """A subject - property - object statement between two ``RootObject`` rows."""

    subj = InheritanceForeignKey(
        RootObject,
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="triple_set_from_subj",
        verbose_name="Subject",
    )
    obj = InheritanceForeignKey(
        RootObject,
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="triple_set_from_obj",
        verbose_name="Object",
    )
    prop = models.ForeignKey(
        Property,
        blank=True,
        null=True,
        on_delete=models.CASCADE,
        related_name="triple_set_from_prop",
        verbose_name="Property",
    )

    objects = BaseRelationManager()
    objects_inheritance = InheritanceManager()

    def __repr__(self):
        try:
            return f"<{self.__class__.__name__}: subj: {self.subj}, prop: {self.prop}, obj: {self.obj}>"
        except RootObject.DoesNotExist:
            # A referenced subject/object row could not be loaded.
            return f"<{self.__class__.__name__}: None>"

    def __str__(self):
        return self.__repr__()

    def get_web_object(self):
        """Return a dict with the primary key and string forms of subj, obj and prop."""
        return {
            "relation_pk": self.pk,
            "subj": str(self.subj),
            "obj": str(self.obj),
            "prop": self.prop.name_forward,
        }

    def save(self, *args, **kwargs):
        """Validate subject and object against the property's classes, then save.

        :raises Exception: if ``subj``, ``obj`` or ``prop`` is ``None``, or if
            the content type of the subject (object) class is not among
            ``prop.subj_class`` (``prop.obj_class``)
        """
        # TODO RDF: Integrate more proper check if subj and obj instances are of valid class as defined in prop.subj_class and prop.obj_class
        if self.subj is None or self.obj is None or self.prop is None:
            raise Exception("subj, obj, or prop is None")

        subj_model = self.subj.__class__
        subj_class_name = subj_model.__name__
        if (
            ContentType.objects.get_for_model(subj_model)
            not in self.prop.subj_class.all()
        ):
            raise Exception(
                f"Subject class '{subj_class_name}' is not in valid subject class list of property '{self.prop}'"
            )

        obj_model = self.obj.__class__
        obj_class_name = obj_model.__name__
        if (
            ContentType.objects.get_for_model(model=obj_model)
            not in self.prop.obj_class.all()
        ):
            raise Exception(
                f"Object class '{obj_class_name}' is not in valid object class list of property '{self.prop}'"
            )

        super().save(*args, **kwargs)

    def duplicate(self):
        """Save this triple as a new row and return the newly saved instance.

        ``pre_duplicate`` is sent before anything changes. A shallow copy keeps
        the original primary key; ``self`` then has its ``pk``/``id`` cleared
        and is saved as a new row. ``post_duplicate`` is sent with the copy as
        ``instance`` and the re-saved ``self`` as ``duplicate``.

        :return: ``self``, now representing the duplicated row
        """
        origin = self.__class__
        signals.pre_duplicate.send(sender=origin, instance=self)

        # Shallow copy taken before the keys are cleared, so it still refers
        # to the original row.
        instance = copy.copy(self)

        self.pk = None
        self.id = None
        self._state.adding = True
        # Triple.save() has no return value, so its result must not be used
        # as the duplicate; the re-saved ``self`` is the duplicate.
        self.save()
        duplicate = self

        signals.post_duplicate.send(
            sender=origin, instance=instance, duplicate=duplicate
        )
        return duplicate
class TempTriple(Triple, VersionMixin):
    """Triple carrying review state, written and parsed dates, status and notes."""

    review = models.BooleanField(
        default=False,
        help_text="Should be set to True, if the data record holds up quality standards.",
    )
    # The six date fields below are overwritten from the *_written fields on
    # save() unless parse_dates is false.
    start_date = models.DateField(blank=True, null=True)
    start_start_date = models.DateField(blank=True, null=True)
    start_end_date = models.DateField(blank=True, null=True)
    end_date = models.DateField(blank=True, null=True)
    end_start_date = models.DateField(blank=True, null=True)
    end_end_date = models.DateField(blank=True, null=True)
    start_date_written = models.CharField(
        max_length=255,
        blank=True,
        null=True,
        verbose_name="Start",
    )
    end_date_written = models.CharField(
        max_length=255,
        blank=True,
        null=True,
        verbose_name="End",
    )
    # text = models.ManyToManyField("Text", blank=True)
    # collection = models.ManyToManyField("Collection")
    status = models.CharField(max_length=100)
    # source = models.ForeignKey(
    #     "Source", blank=True, null=True, on_delete=models.SET_NULL
    # )
    references = models.TextField(blank=True, null=True)
    notes = models.TextField(blank=True, null=True)

    def save(self, parse_dates=True, *args, **kwargs):
        """Save the triple, first re-deriving the date fields from the written dates.

        :param parse_dates: when true, all six date fields are reset to
            ``None`` and then filled from ``DateParser.parse_date`` for each
            non-empty ``*_date_written`` value; when false, the date fields
            are saved as they are
        :return: this instance
        """
        if parse_dates:
            # Every derived field defaults to None unless a written date exists.
            start_date = start_start_date = start_end_date = None
            end_date = end_start_date = end_end_date = None

            written_start = self.start_date_written
            if written_start:
                start_date, start_start_date, start_end_date = (
                    DateParser.parse_date(written_start)
                )

            written_end = self.end_date_written
            if written_end:
                end_date, end_start_date, end_end_date = DateParser.parse_date(
                    written_end
                )

            self.start_date = start_date
            self.start_start_date = start_start_date
            self.start_end_date = start_end_date
            self.end_date = end_date
            self.end_start_date = end_start_date
            self.end_end_date = end_end_date

        super().save(*args, **kwargs)

        return self