# Coverage for apis_core/generic/abc.py: 56% (142 statements)
# Report generated by coverage.py v7.6.8, created at 2024-12-20 09:24 +0000
import logging

from django.contrib.contenttypes.models import ContentType
from django.db.models import BooleanField, CharField, TextField
from django.db.models.fields.related import ForeignKey, ManyToManyField
from django.db.models.query import QuerySet
from django.forms import model_to_dict
from django.urls import reverse

from apis_core.generic.helpers import permission_fullname
from apis_core.generic.signals import (
    post_duplicate,
    post_merge_with,
    pre_duplicate,
    pre_merge_with,
)
# Module-level logger, named after this module; used by `GenericModel.duplicate`.
logger = logging.getLogger(__name__)
class GenericModel:
    """
    Mixin with generic URL, permission, merge and duplicate helpers.

    The methods use attributes such as `id`, `_meta`, `objects`, `save()`
    and `delete()`, so the class is meant to be combined with a Django model
    that provides them.
    """

    def __repr__(self):
        """Return the parent `repr`, followed by the ID if one is set."""
        base = super().__repr__()
        if pk := getattr(self, "id", None):
            return f"{base} (ID: {pk})"
        return base

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    def get_edit_url(self):
        """Return the URL of the generic update view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL of the view merging `other_id` into this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the "select merge or enrich" view."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to redirect to after creation (the detail view)."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to redirect to after an update (the edit view)."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the generic API detail endpoint."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return the full name of the "change" permission for this model."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return the full name of the "add" permission for this model."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return the full name of the "delete" permission for this model."""
        return permission_fullname("delete", cls)

    def get_merge_charfield_value(self, other: "GenericModel", field: "CharField"):
        """
        Return the merged value of a char field.

        Fields with `choices` keep the value of `self`. Otherwise a differing,
        non-empty value of `other` is appended in parentheses; if `self` has
        no value, the value of `other` is used as is.
        """
        res = getattr(self, field.name)
        if field.choices:
            return res
        otherres = getattr(other, field.name, res)
        # Nothing to add: avoids results such as "foo ()" or "foo (None)".
        if not otherres or otherres == res:
            return res
        # `self` is empty or None: take the other value instead of building
        # " (bar)" or failing on `None += str`.
        if not res:
            return otherres
        return f"{res} ({otherres})"

    def get_merge_textfield_value(self, other: "GenericModel", field: "TextField"):
        """
        Return the merged value of a text field.

        A non-empty text of `other` is appended to the text of `self`,
        introduced by a "Merged from ..." line. A missing (`None`) text in
        `self` is treated as empty.
        """
        res = getattr(self, field.name)
        othertext = getattr(other, field.name)
        if not othertext:
            return res
        return f"{res or ''}\nMerged from {other}:\n{othertext}"

    def get_merge_booleanfield(self, other: "GenericModel", field: "BooleanField"):
        """Return the value `other` has for the boolean `field`."""
        return getattr(other, field.name)

    def get_merge_booleanfield_value(
        self, other: "GenericModel", field: "BooleanField"
    ):
        """
        Merge hook for boolean fields, found by `get_field_value_after_merge`.

        The dispatcher looks for `get_merge_{fieldtype}_value`, so the
        historic `get_merge_booleanfield` was never reached. This method
        delegates to it, which also keeps overrides of the old name working.
        """
        return self.get_merge_booleanfield(other, field)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        fieldtype = field.get_internal_type().lower()
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        own = getattr(self, field.name)
        return own if own else getattr(other, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into `self` and delete them afterwards.

        `entities` may be a primary key, an instance, or a list, tuple or
        QuerySet of primary keys and/or instances. `self` is dropped from the
        entities if present. The `pre_merge_with` and `post_merge_with`
        signals are sent with the normalised list of instances.
        """
        origin = self.__class__

        # Normalise first, so the membership test and the signals below always
        # work on a list of instances (an int or a QuerySet supports neither
        # `in` nor `.remove()` the way a list does).
        if isinstance(entities, (list, tuple, QuerySet)):
            candidates = list(entities)
        else:
            candidates = [entities]
        # `origin` is the model class of `self`; no ContentType lookup by bare
        # model name is needed (that lookup is ambiguous across apps).
        candidates = [
            origin.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in candidates
        ]
        entities = [ent for ent in candidates if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        # TODO: check if these imports can be put to top of module without
        # causing circular import issues.
        from apis_core.apis_metainfo.models import Uri

        own_name = type(self).__name__
        for ent in entities:
            # NOTE(review): entities of another class skip the relation and
            # Uri transfer, but their fields are still merged and they are
            # still deleted below -- confirm that this is intended.
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                target = getattr(self, m2m.name)
                present = list(target.all())
                missing = [
                    obj for obj in getattr(ent, m2m.name).all() if obj not in present
                ]
                if missing:
                    target.add(*missing)
            Uri.objects.filter(root_object=ent).update(root_object=self)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are left out of the copy. Plain foreign keys and
        many-to-many relations are copied. The `pre_duplicate` and
        `post_duplicate` signals are sent before and after.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        for field in self._meta.fields:
            if field.unique:
                logger.info(
                    "Duplicating %s: ignoring unique field %s", self, field.name
                )
                objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        # we are not using `isinstance` because we want to
        # differentiate between different levels of inheritance
        for field in related_fields:
            # Plain foreign keys are handed to `create()` so that required
            # (NOT NULL) foreign keys do not make the insert fail. Unique
            # ones are skipped like every other unique field.
            if type(field) is ForeignKey and not field.unique:
                objdict[field.attname] = getattr(self, field.attname)

        newobj = origin.objects.create(**objdict)

        for field in related_fields:
            if type(field) is ManyToManyField:
                getattr(newobj, field.name).set(getattr(self, field.name).all())

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True