Coverage for apis_core/generic/abc.py: 56%
153 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-19 16:54 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-02-19 16:54 +0000
1import logging
3from django.contrib.contenttypes.models import ContentType
4from django.db.models import BooleanField, CharField, TextField
5from django.db.models.fields.related import ForeignKey, ManyToManyField
6from django.db.models.query import QuerySet
7from django.forms import model_to_dict
8from django.urls import reverse
10from apis_core.generic.helpers import mro_paths, permission_fullname
11from apis_core.generic.signals import (
12 post_duplicate,
13 post_merge_with,
14 pre_duplicate,
15 pre_merge_with,
16)
18logger = logging.getLogger(__name__)
21class GenericModel:
22 def __repr__(self):
23 if id := getattr(self, "id", None):
24 return super().__repr__() + f" (ID: {id})"
25 return super().__repr__()
27 @classmethod
28 def get_listview_url(cls):
29 ct = ContentType.objects.get_for_model(cls)
30 return reverse("apis_core:generic:list", args=[ct])
32 @classmethod
33 def get_createview_url(cls):
34 ct = ContentType.objects.get_for_model(cls)
35 return reverse("apis_core:generic:create", args=[ct])
37 @classmethod
38 def get_importview_url(cls):
39 ct = ContentType.objects.get_for_model(cls)
40 return reverse("apis_core:generic:import", args=[ct])
42 @classmethod
43 def get_openapi_tags(cls):
44 return [item[-1] for item in mro_paths(cls)]
46 def get_edit_url(self):
47 ct = ContentType.objects.get_for_model(self)
48 return reverse("apis_core:generic:update", args=[ct, self.id])
50 def get_enrich_url(self):
51 ct = ContentType.objects.get_for_model(self)
52 return reverse("apis_core:generic:enrich", args=[ct, self.id])
54 def get_absolute_url(self):
55 ct = ContentType.objects.get_for_model(self)
56 return reverse("apis_core:generic:detail", args=[ct, self.id])
58 def get_delete_url(self):
59 ct = ContentType.objects.get_for_model(self)
60 return reverse("apis_core:generic:delete", args=[ct, self.id])
62 def get_merge_url(self, other_id):
63 ct = ContentType.objects.get_for_model(self)
64 return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])
66 def get_select_merge_or_enrich_url(self):
67 ct = ContentType.objects.get_for_model(self)
68 return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])
70 def get_create_success_url(self):
71 return self.get_absolute_url()
73 def get_update_success_url(self):
74 return self.get_edit_url()
76 def get_api_detail_endpoint(self):
77 ct = ContentType.objects.get_for_model(self)
78 return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])
80 @classmethod
81 def get_change_permission(self):
82 return permission_fullname("change", self)
84 @classmethod
85 def get_add_permission(self):
86 return permission_fullname("add", self)
88 @classmethod
89 def get_delete_permission(self):
90 return permission_fullname("delete", self)
92 @classmethod
93 def get_view_permission(self):
94 return permission_fullname("view", self)
96 def get_merge_charfield_value(self, other: CharField, field: CharField):
97 res = getattr(self, field.name)
98 if not field.choices:
99 otherres = getattr(other, field.name, res)
100 if otherres != res:
101 res += f" ({otherres})"
102 return res
104 def get_merge_textfield_value(self, other: TextField, field: TextField):
105 res = getattr(self, field.name)
106 if getattr(other, field.name):
107 res += "\n" + f"Merged from {other}:\n" + getattr(other, field.name)
108 return res
110 def get_merge_booleanfield(self, other: BooleanField, field: BooleanField):
111 return getattr(other, field.name)
113 def get_field_value_after_merge(self, other, field):
114 """
115 This method finds the value of a field after merging `other` into `self`.
116 It first tries to find a merge method that is specific to that field
117 (merge_{fieldname}) and then tries to find a method that is specific to
118 the type of the field (merge_{fieldtype})
119 If neither of those exist, it uses the others field value if the field
120 in self is not set, otherwise it keeps the value in self.
121 """
122 fieldtype = field.get_internal_type().lower()
123 # if there is a `get_merge_{fieldname}` method in this model, use that one
124 if callable(getattr(self, f"get_merge_{field.name}_value", None)):
125 return getattr(self, f"get_merge_{field.name}_value")(other)
126 # otherwise we check if there is a method for the field type and use that one
127 elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)):
128 return getattr(self, f"get_merge_{fieldtype}_value")(other, field)
129 else:
130 if not getattr(self, field.name):
131 return getattr(other, field.name)
132 return getattr(self, field.name)
134 def merge_fields(self, other):
135 """
136 This method iterates through the model fields and uses the
137 `get_field_value_after_merge` method to copy values from `other` to `self`.
138 It is called by the `merge_with` method.
139 """
140 for field in self._meta.fields:
141 newval = self.get_field_value_after_merge(other, field)
142 if newval != getattr(self, field.name):
143 setattr(self, field.name, newval)
144 self.save()
146 def merge_with(self, entities):
147 if self in entities:
148 entities.remove(self)
149 origin = self.__class__
150 pre_merge_with.send(sender=origin, instance=self, entities=entities)
152 # TODO: check if these imports can be put to top of module without
153 # causing circular import issues.
154 from apis_core.apis_metainfo.models import Uri
156 e_a = type(self).__name__
157 self_model_class = ContentType.objects.get(model__iexact=e_a).model_class()
158 if isinstance(entities, int):
159 entities = self_model_class.objects.get(pk=entities)
160 if not isinstance(entities, list) and not isinstance(entities, QuerySet):
161 entities = [entities]
162 entities = [
163 self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
164 for ent in entities
165 ]
166 for ent in entities:
167 e_b = type(ent).__name__
168 if e_a != e_b:
169 continue
170 for f in ent._meta.local_many_to_many:
171 if not f.name.endswith("_set"):
172 sl = list(getattr(self, f.name).all())
173 for s in getattr(ent, f.name).all():
174 if s not in sl:
175 getattr(self, f.name).add(s)
176 self_content_type = ContentType.objects.get_for_model(self)
177 ent_content_type = ContentType.objects.get_for_model(ent)
178 Uri.objects.filter(content_type=ent_content_type, object_id=ent.id).update(
179 content_type=self_content_type, object_id=self.id
180 )
182 for ent in entities:
183 self.merge_fields(ent)
185 post_merge_with.send(sender=origin, instance=self, entities=entities)
187 for ent in entities:
188 ent.delete()
190 def duplicate(self):
191 origin = self.__class__
192 pre_duplicate.send(sender=origin, instance=self)
193 # usually, copying instances would work like
194 # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
195 # but we are working with abstract classes,
196 # so we have to do it by hand using model_to_dict:(
197 objdict = model_to_dict(self)
199 # remove unique fields from dict representation
200 unique_fields = [field for field in self._meta.fields if field.unique]
201 for field in unique_fields:
202 logger.info(f"Duplicating {self}: ignoring unique field {field.name}")
203 objdict.pop(field.name, None)
205 # remove related fields from dict representation
206 related_fields = [
207 field for field in self._meta.get_fields() if field.is_relation
208 ]
209 for field in related_fields:
210 objdict.pop(field.name, None)
212 newobj = type(self).objects.create(**objdict)
214 for field in related_fields:
215 # we are not using `isinstance` because we want to
216 # differentiate between different levels of inheritance
217 if type(field) is ForeignKey:
218 setattr(newobj, field.name, getattr(self, field.name))
219 if type(field) is ManyToManyField:
220 objfield = getattr(newobj, field.name)
221 values = getattr(self, field.name).all()
222 objfield.set(values)
224 newobj.save()
225 post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
226 return newobj
228 duplicate.alters_data = True
230 def uri_set(self):
231 ct = ContentType.objects.get_for_model(self)
232 return (
233 ContentType.objects.get(app_label="apis_metainfo", model="uri")
234 .model_class()
235 .objects.filter(content_type=ct, object_id=self.id)
236 .all()
237 )