Coverage for apis_core/generic/abc.py: 58%
222 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-10 13:36 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-10-10 13:36 +0000
1import logging
2import re
4from AcdhArcheAssets.uri_norm_rules import get_normalized_uri
5from django.contrib.contenttypes.models import ContentType
6from django.core.exceptions import ImproperlyConfigured
7from django.db import models
8from django.db.models import BooleanField, CharField, TextField
9from django.db.models.fields.related import ForeignKey, ManyToManyField
10from django.db.models.query import QuerySet
11from django.forms import model_to_dict
12from django.urls import reverse
14from apis_core.generic.helpers import mro_paths, permission_fullname
15from apis_core.generic.signals import (
16 post_duplicate,
17 post_merge_with,
18 pre_duplicate,
19 pre_import_from,
20 pre_merge_with,
21)
22from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix
24logger = logging.getLogger(__name__)
class GenericModel(models.Model):
    """
    Abstract base model that bundles helpers shared by the models
    deriving from it: URL lookups for the generic views, permission
    names, importing data from URIs, merging and duplicating instances.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """
        Return the default representation, followed by the instance ID
        if the instance has a truthy `id` attribute.
        """
        base = super().__repr__()
        if pk := getattr(self, "id", None):
            return base + f" (ID: {pk})"
        return base

    @property
    def content_type(self):
        """Return the `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the `apis_core:generic:list` view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the `apis_core:generic:create` view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the `apis_core:generic:import` view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every item yielded by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """
        Return the namespace prefix of this model: the configured RDF
        namespace prefix and the content type's model name, joined by `-`.
        """
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the APIS base URI followed by the list view URL of this model."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; the base implementation has none."""
        return []

    def get_edit_url(self):
        """Return the URL of the `apis_core:generic:update` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the `apis_core:generic:duplicate` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the `apis_core:generic:enrich` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the `apis_core:generic:detail` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the `apis_core:generic:delete` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """
        Return the URL of the `apis_core:generic:merge` view for this
        instance and the instance identified by `other_id`.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """
        Return the URL of the `apis_core:generic:selectmergeorenrich`
        view for this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to go to after creating this instance (the detail view)."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to go to after updating this instance (the edit view)."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """
        Return the URL of the `apis_core:generic:genericmodelapi-detail`
        route for this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)` for this model."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)` for this model."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)` for this model."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)` for this model."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the `verbose_name_plural` from the model's `_meta`."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the `verbose_name` from the model's `_meta`."""
        return cls._meta.verbose_name

    @classmethod
    def valid_import_url(cls, uri: str):
        """
        Check if a URI can be imported.
        The exact fetching logic for a URI is defined in the
        `import_definitions` attribute of the class.
        `import_definitions` has to be a dict, mapping a regex
        matching the URI to a callable taking the URI as an argument.
        This method checks if there is a callable defined for this URI.

        Returns the first callable whose regex matches the normalized
        URI, or `False` if no regex matches (or if the class does not
        define `import_definitions`).
        """
        uri = get_normalized_uri(uri)
        for regex, fn in getattr(cls, "import_definitions", {}).items():
            if re.match(regex, uri):
                return fn
        return False

    @classmethod
    def fetch_from(cls, uri: str):
        """
        Fetch data from a URI. Check if there is import logic
        configured for this URI and if so, use that import logic
        to fetch the data.

        Returns the result of the configured callable, or an empty dict
        if that result is falsy. Raises `ImproperlyConfigured` if no
        import logic is configured for the URI.
        """
        uri = get_normalized_uri(uri)
        if fn := cls.valid_import_url(uri):
            return fn(uri) or {}
        raise ImproperlyConfigured(f"Import not configured for URI {uri}")

    @classmethod
    def import_from(cls, uri: str, allow_empty: bool = True):
        """
        Fetch data from a URI and create a model instance using
        that data. If the `allow_empty` argument is set, this also
        creates a model instance if the data fetched was empty. This
        might make sense if you still want to create an instance and
        attach the URI to it.

        Raises `ValueError` if the fetched data is empty and
        `allow_empty` is not set.
        """
        uri = get_normalized_uri(uri)
        # We allow other apps to intercept the import: the first truthy
        # response of a `pre_import_from` receiver is returned instead
        # of creating a new object.
        for _receiver, response in pre_import_from.send(sender=cls, uri=uri):
            if response:
                return response
        data = cls.fetch_from(uri) or {}
        if allow_empty or data:
            instance = cls()
            instance.import_data(data)
            instance._uris = [uri]
            instance.save()
            return instance
        raise ValueError(f"Could not fetch data to import from {uri}")

    def import_from_dict_subset(self, **data) -> dict:
        """
        Import attributes of this instance from data in a dict.
        We iterate through the model fields and, for every field that
        has a truthy entry in `data`, take the first element of that
        entry, convert it to a string and use the field's `clean` method
        to check if the value validates. If it validates, the value is
        set on the instance; if it does not, the error message is stored
        in the returned errors dict under the field name.
        If `data` is not empty, the instance is saved afterwards.
        """
        errors = {}
        if not data:
            return errors
        for field in self._meta.fields:
            if not data.get(field.name, False):
                continue
            # the entries of `data` are indexed, i.e. expected to be sequences
            value = str(data[field.name][0])
            try:
                # Django's signature is `Field.clean(value, model_instance)`
                field.clean(value, self)
            except Exception as e:
                # best effort: a value that does not validate is reported
                # back to the caller instead of aborting the whole import
                logger.info(
                    "Could not set %s on %s: %s", field.name, str(self), str(e)
                )
                errors[field.name] = str(e)
            else:
                setattr(self, field.name, value)
        self.save()
        return errors

    def import_data(self, data) -> dict:
        """
        Import `data` into this instance using `import_from_dict_subset`
        and return the errors dict of that method.
        """
        return self.import_from_dict_subset(**data)

    def get_merge_charfield_value(self, other: "GenericModel", field: CharField):
        """
        Return the value of the char field `field` after merging `other`
        into `self`.
        Fields with choices keep the value of `self`. Otherwise, if
        `other` has a value that differs from the one in `self`, it is
        appended in parentheses; if `self` has no value, the value of
        `other` is used as is.
        """
        res = getattr(self, field.name)
        if field.choices:
            return res
        otherres = getattr(other, field.name, res)
        if not otherres or otherres == res:
            return res
        if not res:
            # own value is None or empty: there is nothing to append to
            return otherres
        return f"{res} ({otherres})"

    def get_merge_textfield_value(self, other: "GenericModel", field: TextField):
        """
        Return the value of the text field `field` after merging `other`
        into `self`: if `other` has a value, it is appended to the value
        of `self`, introduced by a "Merged from ..." line.
        """
        res = getattr(self, field.name)
        othertext = getattr(other, field.name)
        if othertext:
            # if own value is None, fallback to empty string
            res = res or ""
            res += "\n" + f"Merged from {other}:\n" + othertext
        return res

    # NOTE(review): `get_field_value_after_merge` looks up
    # `get_merge_{fieldtype}_value`; this method's name lacks the `_value`
    # suffix, so that lookup does not find it. Confirm the intention before
    # renaming - that would change the merge result of boolean fields.
    def get_merge_booleanfield(self, other: "GenericModel", field: BooleanField):
        """Return the value `other` has for the boolean field `field`."""
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`) and then tries to find a method that is
        specific to the type of the field (`get_merge_{fieldtype}_value`, with
        the lowercased internal type of the field).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        own = getattr(self, field.name)
        if not own:
            return getattr(other, field.name)
        return own

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It saves `self` afterwards. It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge other instances into this one and delete them afterwards.

        `entities` can be a single instance, a single primary key, or a
        list or QuerySet of instances and/or primary keys. Primary keys
        are resolved using the model class of `self`; `self` itself is
        dropped from the entities to merge.

        The `pre_merge_with` signal is sent before, the `post_merge_with`
        signal after the values were merged; the merged entities are
        deleted after `post_merge_with` was sent.
        """
        model_class = type(self)
        # Normalize the argument first, so that all the following steps
        # (and the signal receivers) can rely on a list of instances.
        if isinstance(entities, (list, QuerySet)):
            candidates = list(entities)
        else:
            candidates = [entities]
        resolved = [
            model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in candidates
        ]
        # an instance must never be merged into (and then deleted as) itself
        entities = [ent for ent in resolved if ent != self]

        origin = self.__class__
        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = model_class.__name__
        for ent in entities:
            # many to many values are only taken over from instances whose
            # class has the same name as the class of `self`
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                own_manager = getattr(self, m2m.name)
                existing = list(own_manager.all())
                for related in getattr(ent, m2m.name).all():
                    if related not in existing:
                        own_manager.add(related)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create a copy of this instance and return it.

        Unique fields and relations are left out when the copy is
        created; the values of foreign keys and many to many fields are
        set on the copy afterwards. The `pre_duplicate` signal is sent
        before, the `post_duplicate` signal after the duplication.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        for field in self._meta.fields:
            if field.unique:
                logger.info(
                    "Duplicating %s: ignoring unique field %s", self, field.name
                )
                objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            if type(field) is ManyToManyField:
                getattr(newobj, field.name).set(getattr(self, field.name).all())

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a queryset of the objects of the `uri` model of the `uris`
        app whose `content_type` and `object_id` point to this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(app_label="uris", model="uri").model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)