Coverage for apis_core/generic/abc.py: 56%

176 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-09-03 06:15 +0000

1import logging 

2 

3from django.contrib.contenttypes.models import ContentType 

4from django.db import models 

5from django.db.models import BooleanField, CharField, TextField 

6from django.db.models.fields.related import ForeignKey, ManyToManyField 

7from django.db.models.query import QuerySet 

8from django.forms import model_to_dict 

9from django.urls import reverse 

10 

11from apis_core.generic.helpers import mro_paths, permission_fullname 

12from apis_core.generic.signals import ( 

13 post_duplicate, 

14 post_merge_with, 

15 pre_duplicate, 

16 pre_merge_with, 

17) 

18from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix 

19 

20logger = logging.getLogger(__name__) 

21 

22 

class GenericModel(models.Model):
    """
    Abstract base model for everything handled by the generic views.

    It bundles URL helpers for the ``apis_core:generic:*`` routes, permission
    name helpers, and the logic for merging and duplicating instances.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default representation, suffixed with the ID if it is set."""
        base = super().__repr__()
        # named `pk_value` so the builtin `id` is not shadowed
        if pk_value := getattr(self, "id", None):
            return f"{base} (ID: {pk_value})"
        return base

    @property
    def content_type(self):
        """The ``ContentType`` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def _reverse_generic(cls, viewname, *args):
        """
        Reverse the ``apis_core:generic:<viewname>`` route for this model.

        The model's content type is always passed as first URL argument,
        followed by any additional ``args`` (e.g. the instance ID).
        """
        ct = ContentType.objects.get_for_model(cls)
        return reverse(f"apis_core:generic:{viewname}", args=[ct, *args])

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the list view of this model."""
        return cls._reverse_generic("list")

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the create view of this model."""
        return cls._reverse_generic("create")

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the import view of this model."""
        return cls._reverse_generic("import")

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every entry produced by ``mro_paths(cls)``."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return ``<rdf namespace prefix>-<content type model name>``."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the APIS base URI concatenated with the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; empty in the base class."""
        return []

    def get_edit_url(self):
        """Return the URL of the update view of this instance."""
        return self._reverse_generic("update", self.id)

    def get_duplicate_url(self):
        """Return the URL of the duplicate view of this instance."""
        return self._reverse_generic("duplicate", self.id)

    def get_enrich_url(self):
        """Return the URL of the enrich view of this instance."""
        return self._reverse_generic("enrich", self.id)

    def get_absolute_url(self):
        """Return the URL of the detail view of this instance."""
        return self._reverse_generic("detail", self.id)

    def get_delete_url(self):
        """Return the URL of the delete view of this instance."""
        return self._reverse_generic("delete", self.id)

    def get_merge_url(self, other_id):
        """Return the URL of the view merging ``other_id`` into this instance."""
        return self._reverse_generic("merge", self.id, other_id)

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the "select merge or enrich" view of this instance."""
        return self._reverse_generic("selectmergeorenrich", self.id)

    def get_create_success_url(self):
        """Return the URL to go to after creation: the detail view."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to go to after an update: the edit view."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the API detail endpoint of this instance."""
        return self._reverse_generic("genericmodelapi-detail", self.id)

    @classmethod
    def get_change_permission(cls):
        """Return the full name of the ``change`` permission of this model."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return the full name of the ``add`` permission of this model."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return the full name of the ``delete`` permission of this model."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return the full name of the ``view`` permission of this model."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return ``Meta.verbose_name_plural`` of this model."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return ``Meta.verbose_name`` of this model."""
        return cls._meta.verbose_name

    def get_merge_charfield_value(self, other: "GenericModel", field: CharField):
        """
        Return the value of a ``CharField`` after merging ``other`` into ``self``.

        Fields with ``choices`` keep the own value. Otherwise a differing,
        non-empty value of ``other`` is appended in parentheses; if the own
        value is empty (``None`` or ``""``) the value of ``other`` is used as is.
        """
        own = getattr(self, field.name)
        if field.choices:
            return own
        theirs = getattr(other, field.name, own)
        if not theirs or theirs == own:
            return own
        if not own:
            # nothing to append to; also avoids `None + str`
            return theirs
        return f"{own} ({theirs})"

    def get_merge_textfield_value(self, other: "GenericModel", field: TextField):
        """
        Return the value of a ``TextField`` after merging ``other`` into ``self``.

        A non-empty value of ``other`` is appended to the own value, separated
        by a ``Merged from <other>:`` line.
        """
        own = getattr(self, field.name)
        theirs = getattr(other, field.name)
        if not theirs:
            return own
        # if own value is None, fallback to empty string
        return (own or "") + "\n" + f"Merged from {other}:\n" + theirs

    def get_merge_booleanfield(self, other: "GenericModel", field: BooleanField):
        """
        Return the value ``other`` has for ``field``.

        NOTE(review): `get_field_value_after_merge` looks up
        `get_merge_<fieldtype>_value`; this method lacks the `_value` suffix
        and is therefore not picked up by that lookup. Confirm whether a
        rename is intended before relying on it.
        """
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        fieldtype = field.get_internal_type().lower()
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        return getattr(self, field.name) or getattr(other, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge one or more entities into `self` and delete them afterwards.

        `entities` may be a single instance, a single primary key, or a
        list/tuple/set/QuerySet of instances and/or primary keys. Primary keys
        are resolved using the model class of `self`. `self` is dropped from
        the entities if it is part of them; the passed container is not
        modified.

        The `pre_merge_with` signal is sent before anything is changed and the
        `post_merge_with` signal after the fields were merged, but before the
        merged entities are deleted.
        """
        origin = model_class = type(self)

        # normalize the argument first, so that single values and
        # primary keys can be handled the same way as lists
        if not isinstance(entities, (list, tuple, set, QuerySet)):
            entities = [entities]
        resolved = [
            model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        entities = [ent for ent in resolved if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = model_class.__name__
        for ent in entities:
            # many to many relations are only copied between
            # instances of classes with the same name
            # NOTE(review): such entities are still merged field-wise and
            # deleted below - confirm that this is intended
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                own_manager = getattr(self, m2m.name)
                already_related = list(own_manager.all())
                for related in getattr(ent, m2m.name).all():
                    if related not in already_related:
                        own_manager.add(related)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are not copied. `ForeignKey` and `ManyToManyField`
        relations are copied to the new instance after it was created. The
        `pre_duplicate` and `post_duplicate` signals are sent before and
        after the duplication.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        for field in self._meta.fields:
            if field.unique:
                logger.info(
                    "Duplicating %s: ignoring unique field %s", self, field.name
                )
                objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            elif type(field) is ManyToManyField:
                getattr(newobj, field.name).set(getattr(self, field.name).all())

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a queryset of the `uris.Uri` objects pointing to this instance.

        The Uri model is looked up via its content type instead of being
        imported.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(app_label="uris", model="uri").model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)