Coverage for apis_core/generic/abc.py: 57%
228 statements
« prev ^ index » next coverage.py v7.5.3, created at 2025-12-04 11:32 +0000
« prev ^ index » next coverage.py v7.5.3, created at 2025-12-04 11:32 +0000
1import logging
2import re
4from django.contrib.contenttypes.models import ContentType
5from django.core.exceptions import ImproperlyConfigured
6from django.db import models
7from django.db.models import BooleanField, CharField, TextField
8from django.db.models.fields.related import ForeignKey, ManyToManyField
9from django.db.models.query import QuerySet
10from django.forms import model_to_dict
11from django.urls import reverse
13from apis_core.generic.helpers import mro_paths, permission_fullname
14from apis_core.generic.signals import (
15 post_duplicate,
16 post_merge_with,
17 pre_duplicate,
18 pre_import_from,
19 pre_merge_with,
20)
21from apis_core.generic.utils import get_autocomplete_data_and_normalized_uri
22from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix
24logger = logging.getLogger(__name__)
class GenericModel(models.Model):
    """
    Abstract base model bundling helpers shared by all generic models.

    It provides URL helpers for the ``apis_core:generic`` views, permission
    name helpers, import of data from external URIs, merging of several
    instances into one and duplication of an instance.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default repr, with the instance ID appended if it is set."""
        base = super().__repr__()
        # named `instance_id` so the builtin `id` is not shadowed
        if instance_id := getattr(self, "id", None):
            return base + f" (ID: {instance_id})"
        return base

    @property
    def content_type(self):
        """The `ContentType` registered for this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every entry yielded by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return `<rdf_namespace_prefix()>-<content type model name>`."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return `apis_base_uri()` followed by the list view URL of this model."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; empty list by default."""
        return []

    def get_edit_url(self):
        """Return the URL of the generic update view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the generic duplicate view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL of the view merging instance `other_id` with this one."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the generic `selectmergeorenrich` view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to go to after creation: the detail view."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to go to after an update: the edit view."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the generic API detail endpoint of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)`."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)`."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)`."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)`."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the plural verbose name from the model's `_meta`."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name from the model's `_meta`."""
        return cls._meta.verbose_name

    @classmethod
    def valid_import_url(cls, uri: str):
        """
        Check if a URI can be imported.
        The exact fetching logic for a URI is defined in the
        `import_definitions` attribute of the class.
        `import_definitions` has to be a dict, mapping a regex
        matching the URI to a callable taking the URI as an argument.
        This method checks if there is a callable defined for this URI.

        Returns the first callable whose regex matches the normalized
        URI, or `False` if none matches.
        """
        _, uri = get_autocomplete_data_and_normalized_uri(uri)
        for regex, fn in getattr(cls, "import_definitions", {}).items():
            if re.match(regex, uri):
                return fn
        return False

    @classmethod
    def fetch_from(cls, uri: str):
        """
        Normalize the URI and extract the autocomplete data.
        Then try to fetch data from the URI:
        Check if there is import logic configured for this URI and if
        so, use that import logic to fetch the data.
        Finally, combine the fetched data and the autocomplete data.

        Raises `ImproperlyConfigured` if no import logic matches the URI.
        """
        logger.debug("Fetch from %s", uri)
        data, nuri = get_autocomplete_data_and_normalized_uri(uri)
        fn = cls.valid_import_url(nuri)
        if not fn:
            raise ImproperlyConfigured(f"Import not configured for URI {uri}")
        fetcheddata = fn(nuri) or {}
        # merge the two dicts; on key clashes the autocomplete data wins ...
        ret = fetcheddata | data
        # ... unless we can combine the values that exist in both dicts
        for key in set(fetcheddata).intersection(data):
            ret[key] = fetcheddata[key] + data[key]
        return ret

    @classmethod
    def import_from(cls, uri: str, allow_empty: bool = True):
        """
        Fetch data from a URI and create a model instance using
        that data. If the `allow_empty` argument is set, this also
        creates a model instance if the data fetched was empty. This
        might make sense if you still want to create an instance and
        attach the URI to it.

        Raises `ValueError` if no data was fetched and `allow_empty`
        is not set.
        """
        # we allow other apps to intercept the import
        # whatever they return will be used instead of
        # creating a new object
        _, nuri = get_autocomplete_data_and_normalized_uri(uri)
        for _, response in pre_import_from.send(sender=cls, uri=nuri):
            if response:
                return response
        data = cls.fetch_from(uri) or {}
        if allow_empty or data:
            instance = cls()
            instance._uris = [data.get("uri", nuri)]
            instance.save()
            instance.import_data(data)
            return instance
        raise ValueError(f"Could not fetch data to import from {uri}")

    def import_from_dict_subset(self, **data):
        """
        Import attributes of this instance from data in a dict.
        We iterate through the model fields and a) only set a field
        if the dict has a truthy value for the field's name and
        b) use the field's `clean` method to check if the value
        validates. The first element of the dict value, converted to
        a string, is the value that is validated and set.
        If it does not validate, the error message is stored in the
        `_import_errors` dict of the instance, keyed by field name.
        The instance is saved after the fields have been processed.
        """
        self._import_errors = {}
        if not data:
            return
        for field in self._meta.fields:
            if not data.get(field.name, False):
                continue
            value = str(data[field.name][0])
            try:
                field.clean(value, self)
            except Exception as e:
                # best effort: a failing field is recorded and skipped
                # instead of aborting the whole import
                logger.info(
                    "Could not set %s on %s: %s", field.name, str(self), str(e)
                )
                self._import_errors[field.name] = str(e)
            else:
                setattr(self, field.name, value)
        self.save()

    def import_data(self, data):
        """Import `data` (a dict) using `import_from_dict_subset`."""
        self.import_from_dict_subset(**data)

    def get_merge_charfield_value(self, other: "GenericModel", field: CharField):
        """
        Return the value of the char field `field` after merging `other`.
        For fields without choices, a differing, non-empty value of
        `other` is appended in parentheses to the own value. Fields
        with choices keep the own value.
        """
        res = getattr(self, field.name)
        if not field.choices:
            otherres = getattr(other, field.name, res)
            if otherres and otherres != res:
                # if own value is None, fallback to empty string
                res = (res or "") + f" ({otherres})"
        return res

    def get_merge_textfield_value(self, other: "GenericModel", field: TextField):
        """
        Return the value of the text field `field` after merging `other`.
        A non-empty value of `other` is appended to the own value,
        introduced by a "Merged from ..." line.
        """
        res = getattr(self, field.name)
        othertext = getattr(other, field.name)
        if othertext:
            # if own value is None, fallback to empty string
            res = res or ""
            res += "\n" + f"Merged from {other}:\n" + othertext
        return res

    # NOTE(review): `get_field_value_after_merge` looks up
    # `get_merge_{fieldtype}_value`; this method lacks the `_value` suffix
    # and is therefore not picked up by that dispatch. Renaming it would
    # change merge results for boolean fields, so it is left as is - confirm
    # the intended behaviour.
    def get_merge_booleanfield(self, other: "GenericModel", field: BooleanField):
        """Return the value `other` has for the boolean field `field`."""
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`, where
        fieldtype is the lowercased internal type of the field).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        own = getattr(self, field.name)
        if not own:
            return getattr(other, field.name)
        return own

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        The instance is saved afterwards.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge one or more other instances into this one and delete them.

        `entities` may be a single instance, a single primary key, or a
        list or `QuerySet` of instances and/or primary keys. Primary keys
        are resolved using the model class of `self`. The instance itself
        is never merged with itself.

        The `pre_merge_with` signal is sent before and the
        `post_merge_with` signal after the data has been merged; the
        merged instances are deleted after `post_merge_with` was sent.
        """
        origin = self.__class__
        model_class = type(self)

        # normalize the argument first, so that all later steps (including
        # the signals) work on a list of model instances
        if not isinstance(entities, (list, QuerySet)):
            entities = [entities]
        entities = [
            model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        # never merge (and then delete) the instance itself; this builds a
        # new list, so the list passed in by the caller is not modified
        entities = [ent for ent in entities if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = model_class.__name__
        for ent in entities:
            # many to many relations are only taken over from instances
            # whose class has the same name
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                own_manager = getattr(self, m2m.name)
                existing = list(own_manager.all())
                for related in getattr(ent, m2m.name).all():
                    if related not in existing:
                        own_manager.add(related)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are not copied. Values of `ForeignKey` and
        `ManyToManyField` fields (exactly those types, not subclasses)
        are taken over from this instance. The `pre_duplicate` signal is
        sent before and the `post_duplicate` signal after the copy was
        created.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(
                "Duplicating %s: ignoring unique field %s", self, field.name
            )
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            if type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    # `duplicate` writes to the database
    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a queryset of the objects of the `uris.uri` model whose
        `content_type` and `object_id` point to this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(
            app_label="uris", model="uri"
        ).model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)

    def uri_set_with_importer(self):
        """Return the URIs from `uri_set` for which import logic is configured."""
        return [uri for uri in self.uri_set() if self.valid_import_url(uri.uri)]