Coverage for apis_core/generic/abc.py: 56%
176 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-09-03 06:15 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-09-03 06:15 +0000
1import logging
3from django.contrib.contenttypes.models import ContentType
4from django.db import models
5from django.db.models import BooleanField, CharField, TextField
6from django.db.models.fields.related import ForeignKey, ManyToManyField
7from django.db.models.query import QuerySet
8from django.forms import model_to_dict
9from django.urls import reverse
11from apis_core.generic.helpers import mro_paths, permission_fullname
12from apis_core.generic.signals import (
13 post_duplicate,
14 post_merge_with,
15 pre_duplicate,
16 pre_merge_with,
17)
18from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix
20logger = logging.getLogger(__name__)
class GenericModel(models.Model):
    """
    Abstract base model bundling the helpers shared by the generic views.

    It provides URL helpers for the `apis_core:generic:*` routes, permission
    name helpers, and the logic for merging and duplicating instances.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default representation, suffixed with the ID if set."""
        base = super().__repr__()
        # named `obj_id` so the builtin `id` is not shadowed
        if obj_id := getattr(self, "id", None):
            return f"{base} (ID: {obj_id})"
        return base

    @property
    def content_type(self):
        """Return the `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    # NOTE: the URL helpers below look the content type up directly instead
    # of using the `content_type` property, because a subclass may define a
    # model field with that name.

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every path yielded by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return `<rdf namespace prefix>-<content type model name>`."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the base URI concatenated with the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; an empty list by default."""
        return []

    def get_edit_url(self):
        """Return the URL of the generic update view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the generic duplicate view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL of the view merging `other_id` with this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the "select merge or enrich" view."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the redirect target after creation: the detail URL."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the redirect target after an update: the edit URL."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the API detail endpoint for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)`."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)`."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)`."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)`."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the model's `verbose_name_plural`."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the model's `verbose_name`."""
        return cls._meta.verbose_name

    def get_merge_charfield_value(self, other: models.Model, field: CharField):
        """
        Return the merged value of a `CharField`.

        Fields with `choices` keep their own value. Otherwise, a differing
        non-empty value of `other` is appended in parentheses. If the own
        value is empty or `None`, the value of `other` is used as is.
        """
        own = getattr(self, field.name)
        if field.choices:
            return own
        theirs = getattr(other, field.name, own)
        if not theirs or theirs == own:
            return own
        if not own:
            # nothing to append to: avoids `None += str` and a result
            # consisting only of " (value)"
            return theirs
        return f"{own} ({theirs})"

    def get_merge_textfield_value(self, other: models.Model, field: TextField):
        """
        Return the merged value of a `TextField`.

        If `other` has a value it is appended to the own value, preceded by
        a "Merged from ..." line.
        """
        own = getattr(self, field.name)
        theirs = getattr(other, field.name)
        if theirs:
            # if own value is None, fallback to empty string
            own = own or ""
            own += "\n" + f"Merged from {other}:\n" + theirs
        return own

    def get_merge_booleanfield(self, other: models.Model, field: BooleanField):
        """Return the value `other` has for the boolean `field`."""
        return getattr(other, field.name)

    def get_merge_booleanfield_value(self, other: models.Model, field: BooleanField):
        """
        Field type hook for `BooleanField`, found by `get_field_value_after_merge`.

        `get_merge_booleanfield` lacks the `_value` suffix the dispatcher
        looks for and was therefore never called. This method delegates to
        it, so overrides of the old name are honoured as well.
        """
        return self.get_merge_booleanfield(other, field)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`, where
        fieldtype is the lowercased internal type of the field).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        # a hook for this specific field takes precedence
        field_hook = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(field_hook):
            return field_hook(other)
        # otherwise we check if there is a hook for the field type
        fieldtype = field.get_internal_type().lower()
        type_hook = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(type_hook):
            return type_hook(other, field)
        own = getattr(self, field.name)
        return own if own else getattr(other, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into this instance and delete them afterwards.

        `entities` may be a single instance, a single primary key, or a
        list, tuple or QuerySet of instances and/or primary keys. Primary
        keys are resolved against the model of `self`. Anything equal to
        `self` is ignored, so an instance is never merged into (and deleted
        with) itself. A list passed in by the caller is not modified.

        The `pre_merge_with` signal is sent before and the `post_merge_with`
        signal after the merge, both with the normalised list of entities.
        """
        model_class = type(self)

        # normalise the argument first, so every later step sees a list
        if not isinstance(entities, (list, tuple, QuerySet)):
            entities = [entities]
        resolved = [
            model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        # filter after resolving primary keys: a pk pointing at `self` must
        # be dropped too, otherwise `self` would be deleted below
        entities = [ent for ent in resolved if ent != self]

        pre_merge_with.send(sender=model_class, instance=self, entities=entities)

        own_name = model_class.__name__
        for ent in entities:
            # many-to-many values are only copied between same-named models
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                own_related = getattr(self, m2m.name)
                known = list(own_related.all())
                missing = [
                    obj for obj in getattr(ent, m2m.name).all() if obj not in known
                ]
                if missing:
                    own_related.add(*missing)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=model_class, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are left out of the copy. Foreign keys and many-to-many
        values are copied after the new object has been created. The
        `pre_duplicate` and `post_duplicate` signals are sent around the
        operation.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(
                "Duplicating %s: ignoring unique field %s", self, field.name
            )
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            if type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a QuerySet of the `uris.uri` model filtered by this
        instance's content type and ID.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(app_label="uris", model="uri").model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)