import copy
import unicodedata
from crum import get_current_request
from django.conf import settings
from django.contrib.contenttypes.models import ContentType
from django.db import models
from django.db.models import Q
from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor
from django.db.models.signals import m2m_changed
from model_utils.managers import InheritanceManager
from apis_core.apis_metainfo import signals
from apis_core.apis_metainfo.models import RootObject
from apis_core.generic.abc import GenericModel
from apis_core.history.models import VersionMixin
from apis_core.utils import DateParser
[docs]
def find_if_user_accepted():
request = get_current_request()
if request is not None:
if request.user.is_authenticated:
return {}
else:
return {"published": True}
else:
return {}
[docs]
class BaseRelationManager(models.Manager):
[docs]
def get_queryset(self):
return RelationPublishedQueryset(self.model, using=self._db)
[docs]
def filter_ann_proj(self, request=None, ann_proj=1, include_all=True):
return self.get_queryset().filter_ann_proj(
request=request, ann_proj=ann_proj, include_all=include_all
)
[docs]
def filter_for_user(self):
if hasattr(settings, "APIS_SHOW_ONLY_PUBLISHED"):
return self.get_queryset().filter_for_user()
else:
return self.get_queryset()
[docs]
class Property(RootObject):
class Meta:
verbose_name_plural = "Properties"
objects = BaseRelationManager()
property_class_uri = models.CharField(
max_length=255, verbose_name="Property Class URI", blank=True
)
# TODO RDF: Redundancy between name_forward and name, solve this.
name_forward = models.CharField(
max_length=255,
verbose_name="Name forward",
help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".',
blank=True,
)
# TODO RDF: Maybe rename name to name_subj_to_obj and name_reverse to name_obj_to_subj
name_reverse = models.CharField(
max_length=255,
verbose_name="Name reverse",
help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".',
blank=True,
)
subj_class = models.ManyToManyField(
ContentType,
related_name="property_set_subj",
)
obj_class = models.ManyToManyField(
ContentType,
related_name="property_set_obj",
)
def __str__(self):
return self.name_forward
[docs]
def save(self, *args, **kwargs):
if self.name_reverse == "":
self.name_reverse = f"{self.name_forward} [INVERSE]"
self.name_forward = unicodedata.normalize("NFC", str(self.name_forward))
self.name_reverse = unicodedata.normalize("NFC", str(self.name_reverse))
if (update_fields := kwargs.get("update_fields")) is not None:
if "name_forward" in update_fields and "name_reverse" not in update_fields:
modified_update_fields = set(update_fields)
modified_update_fields.add("name_reverse")
kwargs["update_fields"] = modified_update_fields
super(Property, self).save(*args, **kwargs)
return self
# TODO: comment and explain
[docs]
def subj_or_obj_class_changed(sender, is_subj, **kwargs):
def cascade_subj_obj_class_to_children(
contenttype_to_add_or_remove,
contenttype_already_saved_list,
subj_or_obj_field_function,
):
def get_all_parents(contenttype_current):
parent_list = []
class_current = contenttype_current.model_class()
for class_parent in class_current.__bases__:
# TODO: Avoid ContentType DB fetch
contenttype_parent = ContentType.objects.filter(
model=class_parent.__name__
)
if len(contenttype_parent) == 1:
contenttype_parent = contenttype_parent[0]
parent_list.append(contenttype_parent)
parent_list.extend(get_all_parents(contenttype_parent))
return parent_list
def get_all_children(contenttype_current):
child_list = []
class_current = contenttype_current.model_class()
for class_child in class_current.__subclasses__():
# TODO: Avoid ContentType DB fetch
contenttype_child = ContentType.objects.get_for_model(class_child)
child_list.append(contenttype_child)
child_list.extend(get_all_children(contenttype_child))
return child_list
parent_contenttype_list = get_all_parents(contenttype_to_add_or_remove)
for parent_contenttype in parent_contenttype_list:
if parent_contenttype in contenttype_already_saved_list:
raise Exception(
f"Pre-existing parent class found when trying to save or remove a property subject or object class."
f" The current class to be saved is '{contenttype_to_add_or_remove.model_class().__name__}',"
f" but already saved is '{parent_contenttype.model_class().__name__}'."
f" Such a save could potentially be in conflict with an ontology."
f" Better save or remove the respective top parent subject or object class from this property."
)
children_contenttype_list = get_all_children(contenttype_to_add_or_remove)
for child_contenttype in children_contenttype_list:
subj_or_obj_field_function(child_contenttype)
if kwargs["pk_set"] is not None and len(kwargs["pk_set"]) == 1:
sending_property = kwargs["instance"]
if sender == Property.subj_class.through:
subj_or_obj_field = sending_property.subj_class
elif sender == Property.obj_class.through:
subj_or_obj_field = sending_property.obj_class
else:
raise Exception
subj_or_obj_field_function = None
if kwargs["action"] == "pre_add":
subj_or_obj_field_function = subj_or_obj_field.add
elif kwargs["action"] == "post_remove":
subj_or_obj_field_function = subj_or_obj_field.remove
if subj_or_obj_field_function is not None:
cascade_subj_obj_class_to_children(
contenttype_to_add_or_remove=ContentType.objects.get(
pk=min(kwargs["pk_set"])
),
contenttype_already_saved_list=subj_or_obj_field.all(),
subj_or_obj_field_function=subj_or_obj_field_function,
)
[docs]
def subj_class_changed(sender, **kwargs):
subj_or_obj_class_changed(sender, is_subj=True, **kwargs)
[docs]
def obj_class_changed(sender, **kwargs):
subj_or_obj_class_changed(sender, is_subj=False, **kwargs)
m2m_changed.connect(subj_class_changed, sender=Property.subj_class.through)
m2m_changed.connect(obj_class_changed, sender=Property.obj_class.through)
[docs]
class RelationPublishedQueryset(models.QuerySet):
[docs]
def filter_for_user(self, *args, **kwargs):
if getattr(settings, "APIS_SHOW_ONLY_PUBLISHED", False):
request = get_current_request()
if request is not None:
if request.user.is_authenticated:
return self
else:
return self.filter(published=True)
else:
return self.filter(published=True)
else:
return self
[docs]
def filter_ann_proj(self, request=None, ann_proj=1, include_all=True):
"""The filter function provided by the manager class.
:param request: `django.request` object
:return: queryset that contains only objects that are shown in the highlighted text or those not connected
to an annotation at all.
"""
qs = self
users_show = None
if request:
ann_proj = request.session.get("annotation_project", False)
if not ann_proj:
return qs
users_show = request.session.get("users_show_highlighter", None)
query = Q(annotation__annotation_project_id=ann_proj)
if users_show is not None:
query.add(Q(annotation__user_added_id__in=users_show), Q.AND)
if include_all:
query.add(Q(annotation__annotation_project__isnull=True), Q.OR)
return qs.filter(query)
[docs]
class InheritanceForwardManyToOneDescriptor(ForwardManyToOneDescriptor):
[docs]
def get_queryset(self, **hints):
return self.field.remote_field.model.objects_inheritance.db_manager(
hints=hints
).select_subclasses()
[docs]
class InheritanceForeignKey(models.ForeignKey):
forward_related_accessor_class = InheritanceForwardManyToOneDescriptor
[docs]
class Triple(models.Model, GenericModel):
subj = InheritanceForeignKey(
RootObject,
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="triple_set_from_subj",
verbose_name="Subject",
)
obj = InheritanceForeignKey(
RootObject,
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="triple_set_from_obj",
verbose_name="Object",
)
prop = models.ForeignKey(
Property,
blank=True,
null=True,
on_delete=models.CASCADE,
related_name="triple_set_from_prop",
verbose_name="Property",
)
objects = BaseRelationManager()
objects_inheritance = InheritanceManager()
def __repr__(self):
try:
return f"<{self.__class__.__name__}: subj: {self.subj}, prop: {self.prop}, obj: {self.obj}>"
except RootObject.DoesNotExist:
return f"<{self.__class__.__name__}: None>"
def __str__(self):
return self.__repr__()
[docs]
def get_web_object(self):
return {
"relation_pk": self.pk,
"subj": str(self.subj),
"obj": str(self.obj),
"prop": self.prop.name_forward,
}
[docs]
def save(self, *args, **kwargs):
# TODO RDF: Integrate more proper check if subj and obj instances are of valid class as defined in prop.subj_class and prop.obj_class
# def get_all_parents(cls_current):
# parent_list = []
# for p in cls_current.__bases__:
# parent_list.append(p)
# parent_list.extend(get_all_parents(p))
# return parent_list
def get_all_childs(cls_current):
child_list = []
for p in cls_current.__subclasses__():
child_list.append(p)
child_list.extend(get_all_childs(p))
return child_list
if self.subj is None or self.obj is None or self.prop is None:
raise Exception("subj, obj, or prop is None")
if self.subj is not None:
subj_class_name = self.subj.__class__.__name__
if (
ContentType.objects.get_for_model(self.subj.__class__)
not in self.prop.subj_class.all()
):
raise Exception(
f"Subject class '{subj_class_name}' is not in valid subject class list of property '{self.prop}'"
)
if self.obj is not None:
obj_class_name = self.obj.__class__.__name__
if (
ContentType.objects.get_for_model(model=self.obj.__class__)
not in self.prop.obj_class.all()
):
raise Exception(
f"Object class '{obj_class_name}' is not in valid object class list of property '{self.prop}'"
)
super().save(*args, **kwargs)
[docs]
def duplicate(self):
origin = self.__class__
signals.pre_duplicate.send(sender=origin, instance=self)
instance = copy.copy(self)
self.pk = None
self.id = None
self._state.adding = True
duplicate = self.save()
signals.post_duplicate.send(
sender=origin, instance=instance, duplicate=duplicate
)
return duplicate
[docs]
class TempTriple(Triple, VersionMixin):
review = models.BooleanField(
default=False,
help_text="Should be set to True, if the data record holds up quality standards.",
)
start_date = models.DateField(blank=True, null=True)
start_start_date = models.DateField(blank=True, null=True)
start_end_date = models.DateField(blank=True, null=True)
end_date = models.DateField(blank=True, null=True)
end_start_date = models.DateField(blank=True, null=True)
end_end_date = models.DateField(blank=True, null=True)
start_date_written = models.CharField(
max_length=255,
blank=True,
null=True,
verbose_name="Start",
)
end_date_written = models.CharField(
max_length=255,
blank=True,
null=True,
verbose_name="End",
)
# text = models.ManyToManyField("Text", blank=True)
# collection = models.ManyToManyField("Collection")
status = models.CharField(max_length=100)
# source = models.ForeignKey(
# "Source", blank=True, null=True, on_delete=models.SET_NULL
# )
references = models.TextField(blank=True, null=True)
notes = models.TextField(blank=True, null=True)
[docs]
def save(self, parse_dates=True, *args, **kwargs):
"""Adaption of the save() method of the class to automatically parse string-dates into date objects"""
if parse_dates:
# overwrite every field with None as default
start_date = None
start_start_date = None
start_end_date = None
end_date = None
end_start_date = None
end_end_date = None
if self.start_date_written:
# If some textual user input of a start date is there, then parse it
start_date, start_start_date, start_end_date = DateParser.parse_date(
self.start_date_written
)
if self.end_date_written:
# If some textual user input of an end date is there, then parse it
end_date, end_start_date, end_end_date = DateParser.parse_date(
self.end_date_written
)
self.start_date = start_date
self.start_start_date = start_start_date
self.start_end_date = start_end_date
self.end_date = end_date
self.end_start_date = end_start_date
self.end_end_date = end_end_date
super().save(*args, **kwargs)
return self