Coverage for apis_core/generic/abc.py: 58%

222 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-10-10 13:36 +0000

1import logging 

2import re 

3 

4from AcdhArcheAssets.uri_norm_rules import get_normalized_uri 

5from django.contrib.contenttypes.models import ContentType 

6from django.core.exceptions import ImproperlyConfigured 

7from django.db import models 

8from django.db.models import BooleanField, CharField, TextField 

9from django.db.models.fields.related import ForeignKey, ManyToManyField 

10from django.db.models.query import QuerySet 

11from django.forms import model_to_dict 

12from django.urls import reverse 

13 

14from apis_core.generic.helpers import mro_paths, permission_fullname 

15from apis_core.generic.signals import ( 

16 post_duplicate, 

17 post_merge_with, 

18 pre_duplicate, 

19 pre_import_from, 

20 pre_merge_with, 

21) 

22from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix 

23 

24logger = logging.getLogger(__name__) 

25 

26 

class GenericModel(models.Model):
    """
    Abstract base model bundling helpers shared by the models deriving
    from it: URL lookups for the generic views, permission names,
    importing data from URIs, merging instances and duplicating them.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default representation, suffixed with the ID if set."""
        base = super().__repr__()
        # named `obj_id` so that the builtin `id` is not shadowed
        if obj_id := getattr(self, "id", None):
            return base + f" (ID: {obj_id})"
        return base

    @property
    def content_type(self):
        """The `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view of this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every path yielded by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return the RDF namespace prefix: `<configured prefix>-<model name>`."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the base URI of the project followed by the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; empty in this base class."""
        return []

    def get_edit_url(self):
        """Return the URL of the generic update view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the generic duplicate view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL for merging the instance `other_id` into this one."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the "select merge or enrich" view of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """URL to go to after creating this instance (its detail view)."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """URL to go to after updating this instance (its edit view)."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the API detail endpoint of this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return the full name of the `change` permission of this model."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return the full name of the `add` permission of this model."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return the full name of the `delete` permission of this model."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return the full name of the `view` permission of this model."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the plural verbose name from the model's meta options."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name from the model's meta options."""
        return cls._meta.verbose_name

    @classmethod
    def valid_import_url(cls, uri: str):
        """
        Check if an URI can be imported.
        The exact fetching logic for an URI is defined in the
        `import_definitions` attribute of the class.
        `import_definitions` has to be a dict, mapping a regex
        matching the URI to a callable taking the URI as an argument.
        This method checks if there is a callable defined for this URI.

        Returns the callable of the first regex matching the normalized
        URI, or `False` if no regex matches.
        """
        uri = get_normalized_uri(uri)
        for regex, fn in getattr(cls, "import_definitions", {}).items():
            if re.match(regex, uri):
                return fn
        return False

    @classmethod
    def fetch_from(cls, uri: str):
        """
        Fetch data from an URI. Check if there is import logic
        configured for this URI and if so, use that import logic
        to fetch the data.

        Returns whatever the import callable returns, or an empty dict
        if that result is falsy. Raises `ImproperlyConfigured` if no
        import logic is configured for the URI.
        """
        uri = get_normalized_uri(uri)
        if fn := cls.valid_import_url(uri):
            return fn(uri) or {}
        raise ImproperlyConfigured(f"Import not configured for URI {uri}")

    @classmethod
    def import_from(cls, uri: str, allow_empty: bool = True):
        """
        Fetch data from an URI and create a model instance using
        that data. If the `allow_empty` argument is set, this also
        creates a model instance if the data fetched was empty. This
        might make sense if you still want to create an instance and
        attach the URI to it.

        Raises `ValueError` if no data could be fetched and
        `allow_empty` is not set; `ImproperlyConfigured` propagates
        from `fetch_from` if no import logic exists for the URI.
        """
        uri = get_normalized_uri(uri)
        # we allow other apps to intercept the import
        # whatever they return will be used instead of
        # creating a new object
        for _receiver, response in pre_import_from.send(sender=cls, uri=uri):
            if response:
                return response
        data = cls.fetch_from(uri) or {}
        if not (allow_empty or data):
            raise ValueError(f"Could not fetch data to import from {uri}")
        instance = cls()
        instance.import_data(data)
        instance._uris = [uri]
        instance.save()
        return instance

    def import_from_dict_subset(self, **data) -> dict:
        """
        Import attributes of this instance from data in a dict.
        We iterate through the model fields and
        a) only set a value if the dict has a truthy entry for the
        field name and b) use the field's `clean` method to check if
        the value validates. If it does not validate, we return
        the validation error in the errors dict.

        Every entry is indexed with `[0]`, i.e. the values are expected
        to be sequences of which only the first element is used (as a
        string). The instance is saved afterwards in any case.
        """
        errors = {}
        if data:
            for field in self._meta.fields:
                if data.get(field.name, False):
                    value = str(data[field.name][0])
                    try:
                        # `Field.clean` takes the value first and the
                        # model instance second
                        field.clean(value, self)
                    except Exception as e:
                        logger.info(
                            "Could not set %s on %s: %s", field.name, str(self), str(e)
                        )
                        errors[field.name] = str(e)
                    else:
                        setattr(self, field.name, value)
        self.save()
        return errors

    def import_data(self, data) -> dict:
        """Import `data` (a dict) using `import_from_dict_subset`."""
        return self.import_from_dict_subset(**data)

    def get_merge_charfield_value(self, other: "GenericModel", field: CharField):
        """
        Return the value of the char field `field` after merging `other`
        into `self`. For fields without choices, a differing non-empty
        value of `other` is appended in parentheses; if the own value is
        not set, the value of `other` is used as is.
        """
        res = getattr(self, field.name)
        if not field.choices:
            otherres = getattr(other, field.name, res)
            if otherres and otherres != res:
                if not res:
                    # own value is None or empty: nothing to append to
                    return otherres
                res += f" ({otherres})"
        return res

    def get_merge_textfield_value(self, other: "GenericModel", field: TextField):
        """
        Return the value of the text field `field` after merging `other`
        into `self`: the own text followed by a "Merged from" line and
        the text of `other`, if `other` has any.
        """
        res = getattr(self, field.name)
        other_text = getattr(other, field.name)
        if other_text:
            # if own value is None, fallback to empty string
            res = res or ""
            res += "\n" + f"Merged from {other}:\n" + other_text
        return res

    def get_merge_booleanfield(self, other: "GenericModel", field: BooleanField):
        """
        Return the value `other` has for the boolean field `field`.

        NOTE(review): the name does not follow the
        `get_merge_{fieldtype}_value` pattern looked up by
        `get_field_value_after_merge`, so that method never calls this
        one - confirm whether that is intended.
        """
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`) and then tries to find a method that is
        specific to the type of the field (`get_merge_{fieldtype}_value`, with
        the lowercased internal type of the field).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        own_value = getattr(self, field.name)
        if not own_value:
            return getattr(other, field.name)
        return own_value

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into `self` and delete them afterwards.

        `entities` may be a single instance, a single primary key, or a
        list, tuple or queryset of instances and/or primary keys.
        Primary keys are resolved using the model of `self`; `self` is
        dropped from the entities. The passed in collection is not
        modified.

        The `pre_merge_with` and `post_merge_with` signals are sent with
        the list of resolved instances. Many to many relations are only
        copied from entities whose class name equals the one of `self`,
        fields are merged from all entities using `merge_fields`.
        """
        origin = type(self)

        # Normalise the input before looking at its members: a
        # membership test on a plain int or a single instance would fail.
        if not isinstance(entities, (list, tuple, QuerySet)):
            entities = [entities]
        entities = [
            origin.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        # never merge an instance into itself, it would be deleted below
        entities = [ent for ent in entities if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = origin.__name__
        for ent in entities:
            if type(ent).__name__ != own_name:
                continue
            for m2m in ent._meta.local_many_to_many:
                if m2m.name.endswith("_set"):
                    continue
                own_manager = getattr(self, m2m.name)
                existing = list(own_manager.all())
                for related in getattr(ent, m2m.name).all():
                    if related not in existing:
                        own_manager.add(related)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create and return a copy of this instance.

        Unique fields are not copied. Plain `ForeignKey` values are
        passed on when creating the copy, plain `ManyToManyField`
        relations are set on it afterwards. The `pre_duplicate` and
        `post_duplicate` signals are sent before and after copying.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(
                "Duplicating %s: ignoring unique field %s", self, field.name
            )
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        # we are not using `isinstance` because we want to
        # differentiate between different levels of inheritance
        # Foreign keys are handed to `create` directly, otherwise models
        # with a required foreign key could not be duplicated.
        foreign_keys = {
            field.name: getattr(self, field.name)
            for field in related_fields
            if type(field) is ForeignKey
        }
        newobj = type(self).objects.create(**objdict, **foreign_keys)

        for field in related_fields:
            if type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        # save once more, so that save hooks run with the relations in place
        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    # tell the Django template engine not to call this method
    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a queryset of the `uris.Uri` objects whose `content_type`
        and `object_id` point to this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(app_label="uris", model="uri").model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)