Coverage for apis_core / generic / abc.py: 59%
232 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 14:27 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-30 14:27 +0000
1import logging
2import re
3from typing import Tuple
5from django.contrib.contenttypes.models import ContentType
6from django.core.exceptions import ImproperlyConfigured
7from django.db import models
8from django.db.models import BooleanField, CharField, TextField
9from django.db.models.fields.related import ForeignKey, ManyToManyField
10from django.db.models.query import QuerySet
11from django.forms import model_to_dict
12from django.urls import reverse
14from apis_core.generic.helpers import mro_paths, permission_fullname
15from apis_core.generic.signals import (
16 post_duplicate,
17 post_merge_with,
18 pre_duplicate,
19 pre_import_from,
20 pre_merge_with,
21)
22from apis_core.generic.utils import get_autocomplete_data_and_normalized_uri
23from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix
25logger = logging.getLogger(__name__)
28class GenericModel(models.Model):
29 class Meta:
30 abstract = True
32 def __repr__(self):
33 if id := getattr(self, "id", None):
34 return super().__repr__() + f" (ID: {id})"
35 return super().__repr__()
37 @property
38 def content_type(self):
39 return ContentType.objects.get_for_model(self)
41 @classmethod
42 def get_listview_url(cls):
43 ct = ContentType.objects.get_for_model(cls)
44 return reverse("apis_core:generic:list", args=[ct])
46 @classmethod
47 def get_createview_url(cls):
48 ct = ContentType.objects.get_for_model(cls)
49 return reverse("apis_core:generic:create", args=[ct])
51 @classmethod
52 def get_importview_url(cls):
53 ct = ContentType.objects.get_for_model(cls)
54 return reverse("apis_core:generic:import", args=[ct])
56 @classmethod
57 def get_openapi_tags(cls):
58 return [item[-1] for item in mro_paths(cls)]
60 @classmethod
61 def get_namespace_prefix(cls):
62 ct = ContentType.objects.get_for_model(cls)
63 return f"{rdf_namespace_prefix()}-{ct.model}"
65 @classmethod
66 def get_namespace_uri(cls):
67 return apis_base_uri() + cls.get_listview_url()
69 @classmethod
70 def get_rdf_types(cls):
71 return []
73 def get_edit_url(self):
74 ct = ContentType.objects.get_for_model(self)
75 return reverse("apis_core:generic:update", args=[ct, self.id])
77 def get_duplicate_url(self):
78 ct = ContentType.objects.get_for_model(self)
79 return reverse("apis_core:generic:duplicate", args=[ct, self.id])
81 def get_enrich_url(self):
82 ct = ContentType.objects.get_for_model(self)
83 return reverse("apis_core:generic:enrich", args=[ct, self.id])
85 def get_absolute_url(self):
86 ct = ContentType.objects.get_for_model(self)
87 return reverse("apis_core:generic:detail", args=[ct, self.id])
89 def get_delete_url(self):
90 ct = ContentType.objects.get_for_model(self)
91 return reverse("apis_core:generic:delete", args=[ct, self.id])
93 def get_merge_url(self, other_id):
94 ct = ContentType.objects.get_for_model(self)
95 return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])
97 def get_select_merge_or_enrich_url(self):
98 ct = ContentType.objects.get_for_model(self)
99 return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])
101 def get_create_success_url(self):
102 return self.get_absolute_url()
104 def get_update_success_url(self):
105 return self.get_edit_url()
107 def get_api_detail_endpoint(self):
108 ct = ContentType.objects.get_for_model(self)
109 return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])
111 @classmethod
112 def get_change_permission(self):
113 return permission_fullname("change", self)
115 @classmethod
116 def get_add_permission(self):
117 return permission_fullname("add", self)
119 @classmethod
120 def get_delete_permission(self):
121 return permission_fullname("delete", self)
123 @classmethod
124 def get_view_permission(self):
125 return permission_fullname("view", self)
127 @classmethod
128 def get_verbose_name_plural(cls):
129 return cls._meta.verbose_name_plural
131 @classmethod
132 def get_verbose_name(cls):
133 return cls._meta.verbose_name
135 @classmethod
136 def valid_import_url(cls, uri: str):
137 """
138 Check if an URI is a can be imported.
139 The exact fetching logic for an URI is defined in the
140 `import_definitions` attribute of the class.
141 `import_definitions` has to be a dict, mapping a regex
142 matching the URI to a callable taking the URI as an argument.
143 This method check if there is a callable defined for this URI.
144 """
145 _, uri = get_autocomplete_data_and_normalized_uri(uri)
146 for regex, fn in getattr(cls, "import_definitions", {}).items():
147 if re.match(regex, uri):
148 return fn
149 return False
151 @classmethod
152 def get_data_and_normalized_uri(cls, uri: str) -> Tuple[dict, str]:
153 data, uri = get_autocomplete_data_and_normalized_uri(uri)
154 return data, uri
156 @classmethod
157 def fetch_from(cls, uri: str):
158 """
159 Normalize the URI and extract the autocomplete data.
160 Then try to fetch data from an URI:
161 Check if there is import logic configured for this URI and if
162 so, use that import logic to fetch the data.
163 Finally, combine the fetched data and the autocomplete data.
164 """
165 logger.debug("Fetch from %s", uri)
166 data, nuri = cls.get_data_and_normalized_uri(uri)
167 if fn := cls.valid_import_url(nuri):
168 fetcheddata = fn(nuri) or {}
169 # merge the two dicts
170 ret = fetcheddata | data
171 # combine values that exist in both dicts
172 for key in set(fetcheddata).intersection(data):
173 ret[key] = fetcheddata[key] + data[key]
174 return ret
175 raise ImproperlyConfigured(f"Import not configured for URI {uri}")
177 @classmethod
178 def import_from(cls, uri: str, allow_empty: bool = True):
179 """
180 Fetch data from an URI and create a model instance using
181 that data. If the `allow_empty` argument is set, this also
182 creates a model instance if the data fetched was empty. This
183 might make sense if you still want to create an instance and
184 attach the URI to it.
185 """
186 # we allow other apps to injercept the import
187 # whatever they return will be used instead of
188 # creating a new object
189 _, nuri = cls.get_data_and_normalized_uri(uri)
190 for receiver, response in pre_import_from.send(sender=cls, uri=nuri):
191 if response:
192 return response
193 data = cls.fetch_from(uri) or {}
194 if allow_empty or data:
195 instance = cls()
196 instance.save()
197 instance.import_data(data)
198 return instance
199 raise ValueError(f"Could not fetch data to import from {uri}")
201 def import_from_dict_subset(self, **data):
202 """
203 Import attributes of this instance from data in a dict.
204 We iterate through the individual values of the dict and
205 a) only set them if the instance has an attribute matching
206 the key and b) use the fields `clean` method to check if
207 the value validates. If it does not validate, we return
208 the validation error in the errors dict.
209 """
210 self._import_errors = {}
211 if data:
212 for field in self._meta.fields:
213 if data.get(field.name, False):
214 value = str(data[field.name][0])
215 try:
216 field.clean(value, self)
217 except Exception as e:
218 logger.info(
219 "Could not set %s on %s: %s", field.name, str(self), str(e)
220 )
221 self._import_errors[field.name] = str(e)
222 else:
223 setattr(self, field.name, value)
224 self.save()
226 def import_data(self, data):
227 self.import_from_dict_subset(**data)
229 def get_merge_charfield_value(self, other: CharField, field: CharField):
230 res = getattr(self, str(field.name))
231 if not field.choices:
232 otherres = getattr(other, str(field.name), res)
233 if otherres and otherres != res:
234 res += f" ({otherres})"
235 return res
237 def get_merge_textfield_value(self, other: TextField, field: TextField):
238 res = getattr(self, str(field.name))
239 if getattr(other, str(field.name)):
240 # if own value is None, fallback to empty string
241 res = res or ""
242 res += "\n" + f"Merged from {other}:\n" + getattr(other, str(field.name))
243 return res
245 def get_merge_booleanfield(self, other: BooleanField, field: BooleanField):
246 return getattr(other, str(field.name))
248 def get_field_value_after_merge(self, other, field):
249 """
250 This method finds the value of a field after merging `other` into `self`.
251 It first tries to find a merge method that is specific to that field
252 (merge_{fieldname}) and then tries to find a method that is specific to
253 the type of the field (merge_{fieldtype})
254 If neither of those exist, it uses the others field value if the field
255 in self is not set, otherwise it keeps the value in self.
256 """
257 fieldtype = field.get_internal_type().lower()
258 # if there is a `get_merge_{fieldname}` method in this model, use that one
259 if callable(getattr(self, f"get_merge_{field.name}_value", None)):
260 return getattr(self, f"get_merge_{field.name}_value")(other)
261 # otherwise we check if there is a method for the field type and use that one
262 elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)):
263 return getattr(self, f"get_merge_{fieldtype}_value")(other, field)
264 else:
265 if not getattr(self, str(field.name)):
266 return getattr(other, str(field.name))
267 return getattr(self, field.name)
269 def merge_fields(self, other):
270 """
271 This method iterates through the model fields and uses the
272 `get_field_value_after_merge` method to copy values from `other` to `self`.
273 It is called by the `merge_with` method.
274 """
275 for field in self._meta.fields:
276 newval = self.get_field_value_after_merge(other, field)
277 if newval != getattr(self, str(field.name)):
278 setattr(self, str(field.name), newval)
279 self.save()
281 def merge_with(self, entities):
282 if self in entities:
283 entities.remove(self)
284 origin = self.__class__
285 pre_merge_with.send(sender=origin, instance=self, entities=entities)
287 e_a = type(self).__name__
288 self_model_class = ContentType.objects.get(model__iexact=e_a).model_class()
289 if isinstance(entities, int):
290 entities = self_model_class.objects.get(pk=entities)
291 if not isinstance(entities, list) and not isinstance(entities, QuerySet):
292 entities = [entities]
293 entities = [
294 self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
295 for ent in entities
296 ]
297 for ent in entities:
298 e_b = type(ent).__name__
299 if e_a != e_b:
300 continue
301 for f in ent._meta.local_many_to_many:
302 if not f.name.endswith("_set"):
303 sl = list(getattr(self, f.name).all())
304 for s in getattr(ent, f.name).all():
305 if s not in sl:
306 getattr(self, f.name).add(s)
308 for ent in entities:
309 self.merge_fields(ent)
311 post_merge_with.send(sender=origin, instance=self, entities=entities)
313 for ent in entities:
314 ent.delete()
316 def duplicate(self):
317 origin = self.__class__
318 pre_duplicate.send(sender=origin, instance=self)
319 # usually, copying instances would work like
320 # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
321 # but we are working with abstract classes,
322 # so we have to do it by hand using model_to_dict:(
323 objdict = model_to_dict(self)
325 # remove unique fields from dict representation
326 unique_fields = [field for field in self._meta.fields if field.unique]
327 for field in unique_fields:
328 logger.info(f"Duplicating {self}: ignoring unique field {field.name}")
329 objdict.pop(field.name, None)
331 # remove related fields from dict representation
332 related_fields = [
333 field for field in self._meta.get_fields() if field.is_relation
334 ]
335 for field in related_fields:
336 objdict.pop(field.name, None)
338 newobj = type(self).objects.create(**objdict)
340 for field in related_fields:
341 # we are not using `isinstance` because we want to
342 # differentiate between different levels of inheritance
343 if type(field) is ForeignKey:
344 setattr(newobj, field.name, getattr(self, field.name))
345 if type(field) is ManyToManyField:
346 objfield = getattr(newobj, field.name)
347 values = getattr(self, field.name).all()
348 objfield.set(values)
350 newobj.save()
351 post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
352 return newobj
354 duplicate.alters_data = True
356 def uri_set(self):
357 ct = ContentType.objects.get_for_model(self)
358 return (
359 ContentType.objects.get(app_label="uris", model="uri")
360 .model_class()
361 .objects.filter(content_type=ct, object_id=self.id)
362 .all()
363 )
365 def uri_set_with_importer(self):
366 return [uri for uri in self.uri_set() if self.valid_import_url(uri.uri)]