import logging
from django.contrib.contenttypes.models import ContentType
from django.db.models import BooleanField, CharField, TextField
from django.db.models.fields.related import ForeignKey, ManyToManyField
from django.db.models.query import QuerySet
from django.forms import model_to_dict
from django.urls import reverse
from apis_core.generic.helpers import permission_fullname
from apis_core.generic.signals import (
post_duplicate,
post_merge_with,
pre_duplicate,
pre_merge_with,
)
logger = logging.getLogger(__name__)
[docs]
class GenericModel:
def __repr__(self):
if id := getattr(self, "id", None):
return super().__repr__() + f" (ID: {id})"
return super().__repr__()
[docs]
@classmethod
def get_listview_url(cls):
ct = ContentType.objects.get_for_model(cls)
return reverse("apis_core:generic:list", args=[ct])
[docs]
@classmethod
def get_createview_url(cls):
ct = ContentType.objects.get_for_model(cls)
return reverse("apis_core:generic:create", args=[ct])
[docs]
@classmethod
def get_importview_url(cls):
ct = ContentType.objects.get_for_model(cls)
return reverse("apis_core:generic:import", args=[ct])
[docs]
def get_edit_url(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:update", args=[ct, self.id])
[docs]
def get_enrich_url(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:enrich", args=[ct, self.id])
[docs]
def get_absolute_url(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:detail", args=[ct, self.id])
[docs]
def get_delete_url(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:delete", args=[ct, self.id])
[docs]
def get_merge_url(self, other_id):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])
[docs]
def get_select_merge_or_enrich_url(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])
[docs]
def get_create_success_url(self):
return self.get_absolute_url()
[docs]
def get_update_success_url(self):
return self.get_edit_url()
[docs]
def get_api_detail_endpoint(self):
ct = ContentType.objects.get_for_model(self)
return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])
[docs]
@classmethod
def get_change_permission(self):
return permission_fullname("change", self)
[docs]
@classmethod
def get_add_permission(self):
return permission_fullname("add", self)
[docs]
@classmethod
def get_delete_permission(self):
return permission_fullname("delete", self)
[docs]
def get_merge_charfield_value(self, other: CharField, field: CharField):
res = getattr(self, field.name)
if not field.choices:
otherres = getattr(other, field.name, res)
if otherres != res:
res += f" ({otherres})"
return res
[docs]
def get_merge_textfield_value(self, other: TextField, field: TextField):
res = getattr(self, field.name)
if getattr(other, field.name):
res += "\n" + f"Merged from {other}:\n" + getattr(other, field.name)
return res
[docs]
def get_merge_booleanfield(self, other: BooleanField, field: BooleanField):
return getattr(other, field.name)
[docs]
def get_field_value_after_merge(self, other, field):
"""
This method finds the value of a field after merging `other` into `self`.
It first tries to find a merge method that is specific to that field
(merge_{fieldname}) and then tries to find a method that is specific to
the type of the field (merge_{fieldtype})
If neither of those exist, it uses the others field value if the field
in self is not set, otherwise it keeps the value in self.
"""
fieldtype = field.get_internal_type().lower()
# if there is a `get_merge_{fieldname}` method in this model, use that one
if callable(getattr(self, f"get_merge_{field.name}_value", None)):
return getattr(self, f"get_merge_{field.name}_value")(other)
# otherwise we check if there is a method for the field type and use that one
elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)):
return getattr(self, f"get_merge_{fieldtype}_value")(other, field)
else:
if not getattr(self, field.name):
return getattr(other, field.name)
return getattr(self, field.name)
[docs]
def merge_fields(self, other):
"""
This method iterates through the model fields and uses the
`get_field_value_after_merge` method to copy values from `other` to `self`.
It is called by the `merge_with` method.
"""
for field in self._meta.fields:
newval = self.get_field_value_after_merge(other, field)
if newval != getattr(self, field.name):
setattr(self, field.name, newval)
self.save()
[docs]
def merge_with(self, entities):
if self in entities:
entities.remove(self)
origin = self.__class__
pre_merge_with.send(sender=origin, instance=self, entities=entities)
# TODO: check if these imports can be put to top of module without
# causing circular import issues.
from apis_core.apis_metainfo.models import Uri
e_a = type(self).__name__
self_model_class = ContentType.objects.get(model__iexact=e_a).model_class()
if isinstance(entities, int):
entities = self_model_class.objects.get(pk=entities)
if not isinstance(entities, list) and not isinstance(entities, QuerySet):
entities = [entities]
entities = [
self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
for ent in entities
]
for ent in entities:
e_b = type(ent).__name__
if e_a != e_b:
continue
for f in ent._meta.local_many_to_many:
if not f.name.endswith("_set"):
sl = list(getattr(self, f.name).all())
for s in getattr(ent, f.name).all():
if s not in sl:
getattr(self, f.name).add(s)
Uri.objects.filter(root_object=ent).update(root_object=self)
for ent in entities:
self.merge_fields(ent)
post_merge_with.send(sender=origin, instance=self, entities=entities)
for ent in entities:
ent.delete()
[docs]
def duplicate(self):
origin = self.__class__
pre_duplicate.send(sender=origin, instance=self)
# usually, copying instances would work like
# https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
# but we are working with abstract classes,
# so we have to do it by hand using model_to_dict:(
objdict = model_to_dict(self)
# remove unique fields from dict representation
unique_fields = [field for field in self._meta.fields if field.unique]
for field in unique_fields:
logger.info(f"Duplicating {self}: ignoring unique field {field.name}")
objdict.pop(field.name, None)
# remove related fields from dict representation
related_fields = [
field for field in self._meta.get_fields() if field.is_relation
]
for field in related_fields:
objdict.pop(field.name, None)
newobj = type(self).objects.create(**objdict)
for field in related_fields:
# we are not using `isinstance` because we want to
# differentiate between different levels of inheritance
if type(field) is ForeignKey:
setattr(newobj, field.name, getattr(self, field.name))
if type(field) is ManyToManyField:
objfield = getattr(newobj, field.name)
values = getattr(self, field.name).all()
objfield.set(values)
newobj.save()
post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
return newobj
duplicate.alters_data = True