Coverage for apis_core/generic/abc.py: 57%

228 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-12-04 11:32 +0000

1import logging 

2import re 

3 

4from django.contrib.contenttypes.models import ContentType 

5from django.core.exceptions import ImproperlyConfigured 

6from django.db import models 

7from django.db.models import BooleanField, CharField, TextField 

8from django.db.models.fields.related import ForeignKey, ManyToManyField 

9from django.db.models.query import QuerySet 

10from django.forms import model_to_dict 

11from django.urls import reverse 

12 

13from apis_core.generic.helpers import mro_paths, permission_fullname 

14from apis_core.generic.signals import ( 

15 post_duplicate, 

16 post_merge_with, 

17 pre_duplicate, 

18 pre_import_from, 

19 pre_merge_with, 

20) 

21from apis_core.generic.utils import get_autocomplete_data_and_normalized_uri 

22from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix 

23 

24logger = logging.getLogger(__name__) 

25 

26 

class GenericModel(models.Model):
    """
    Abstract base model bundling helpers shared by concrete models:
    URL lookups for the generic views, permission names, import of
    data from external URIs, merging of instances and duplication.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default representation, suffixed with the ID if one is set."""
        # named `pk_value` to avoid shadowing the builtin `id`
        if pk_value := getattr(self, "id", None):
            return super().__repr__() + f" (ID: {pk_value})"
        return super().__repr__()

    @property
    def content_type(self):
        """The `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the `apis_core:generic:list` route for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the `apis_core:generic:create` route for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the `apis_core:generic:import` route for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every entry produced by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return the RDF namespace prefix followed by `-` and the model name."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the APIS base URI concatenated with the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; empty by default."""
        return []

    def get_edit_url(self):
        """Return the URL of the `apis_core:generic:update` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the `apis_core:generic:duplicate` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the `apis_core:generic:enrich` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the `apis_core:generic:detail` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the `apis_core:generic:delete` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """
        Return the URL of the `apis_core:generic:merge` route for this
        instance and the instance identified by `other_id`.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """
        Return the URL of the `apis_core:generic:selectmergeorenrich`
        route for this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to go to after creation: the detail URL."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to go to after an update: the edit URL."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """
        Return the URL of the `apis_core:generic:genericmodelapi-detail`
        route for this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)`."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)`."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)`."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)`."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the `verbose_name_plural` from the model's `_meta`."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the `verbose_name` from the model's `_meta`."""
        return cls._meta.verbose_name

    @classmethod
    def valid_import_url(cls, uri: str):
        """
        Check if a URI can be imported.
        The exact fetching logic for a URI is defined in the
        `import_definitions` attribute of the class.
        `import_definitions` has to be a dict, mapping a regex
        matching the URI to a callable taking the URI as an argument.
        This method checks if there is a callable defined for this URI.

        Returns the first callable whose regex matches the normalized
        URI, or `False` if none matches.
        """
        _, uri = get_autocomplete_data_and_normalized_uri(uri)
        for regex, fn in getattr(cls, "import_definitions", {}).items():
            if re.match(regex, uri):
                return fn
        return False

    @classmethod
    def fetch_from(cls, uri: str):
        """
        Normalize the URI and extract the autocomplete data.
        Then try to fetch data from the URI:
        Check if there is import logic configured for this URI and if
        so, use that import logic to fetch the data.
        Finally, combine the fetched data and the autocomplete data.

        Raises `ImproperlyConfigured` if no import logic matches the URI.
        """
        logger.debug("Fetch from %s", uri)
        data, nuri = get_autocomplete_data_and_normalized_uri(uri)
        if fn := cls.valid_import_url(nuri):
            fetcheddata = fn(nuri) or {}
            # merge the two dicts
            ret = fetcheddata | data
            # combine values that exist in both dicts; this relies on the
            # values supporting `+` (presumably lists - verify with importers)
            for key in set(fetcheddata).intersection(data):
                ret[key] = fetcheddata[key] + data[key]
            return ret
        raise ImproperlyConfigured(f"Import not configured for URI {uri}")

    @classmethod
    def import_from(cls, uri: str, allow_empty: bool = True):
        """
        Fetch data from a URI and create a model instance using
        that data. If the `allow_empty` argument is set, this also
        creates a model instance if the data fetched was empty. This
        might make sense if you still want to create an instance and
        attach the URI to it.

        Raises `ValueError` if no data was fetched and `allow_empty`
        is not set.
        """
        # we allow other apps to intercept the import
        # whatever they return will be used instead of
        # creating a new object
        _, nuri = get_autocomplete_data_and_normalized_uri(uri)
        for _receiver, response in pre_import_from.send(sender=cls, uri=nuri):
            if response:
                return response
        data = cls.fetch_from(uri) or {}
        if allow_empty or data:
            instance = cls()
            instance._uris = [data.get("uri", nuri)]
            instance.save()
            instance.import_data(data)
            return instance
        raise ValueError(f"Could not fetch data to import from {uri}")

    def import_from_dict_subset(self, **data):
        """
        Import attributes of this instance from data in a dict.
        We iterate through the model fields and a) only set a value
        if the dict has a truthy entry for the field name and b) use
        the field's `clean` method to check if the value validates.
        If it does not validate, the error message is stored in
        `self._import_errors` under the field name.

        Only the first element of each dict entry is used, converted
        to `str`. The instance is saved if `data` is not empty.
        """
        self._import_errors = {}
        if data:
            for field in self._meta.fields:
                if data.get(field.name):
                    value = str(data[field.name][0])
                    try:
                        field.clean(value, self)
                    except Exception as e:
                        # deliberate best-effort: a value that does not
                        # validate is recorded and skipped, not fatal
                        logger.info(
                            "Could not set %s on %s: %s", field.name, str(self), str(e)
                        )
                        self._import_errors[field.name] = str(e)
                    else:
                        setattr(self, field.name, value)
            self.save()

    def import_data(self, data):
        """Import the dict `data` using `import_from_dict_subset`."""
        self.import_from_dict_subset(**data)

    def get_merge_charfield_value(self, other: CharField, field: CharField):
        """
        Return the merged value of a char field: the own value, with
        the value of `other` (the instance being merged) appended in
        parentheses if it is set and differs. Fields with choices keep
        the own value.
        """
        res = getattr(self, field.name)
        if not field.choices:
            otherres = getattr(other, field.name, res)
            if otherres and otherres != res:
                # if own value is None, fallback to empty string
                res = res or ""
                res += f" ({otherres})"
        return res

    def get_merge_textfield_value(self, other: TextField, field: TextField):
        """
        Return the merged value of a text field: the own value followed
        by a `Merged from ...` line and the value of `other` (the
        instance being merged), if `other` has a value.
        """
        res = getattr(self, field.name)
        if getattr(other, field.name):
            # if own value is None, fallback to empty string
            res = res or ""
            res += "\n" + f"Merged from {other}:\n" + getattr(other, field.name)
        return res

    def get_merge_booleanfield(self, other: BooleanField, field: BooleanField):
        """
        Return the value `other` has for `field`.

        NOTE(review): `get_field_value_after_merge` looks up
        `get_merge_{fieldtype}_value`; this name lacks the `_value`
        suffix, so it does not look like that dispatch picks it up -
        confirm whether this is intended.
        """
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (get_merge_{fieldname}_value) and then tries to find a method that is
        specific to the type of the field (get_merge_{fieldtype}_value).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        if callable(getattr(self, f"get_merge_{field.name}_value", None)):
            return getattr(self, f"get_merge_{field.name}_value")(other)
        # otherwise we check if there is a method for the field type and use that one
        elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)):
            return getattr(self, f"get_merge_{fieldtype}_value")(other, field)
        else:
            if not getattr(self, field.name):
                return getattr(other, field.name)
            return getattr(self, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into this instance and delete them afterwards.

        `entities` may be a single primary key, a single instance, or a
        list or QuerySet of primary keys and/or instances. Many-to-many
        values and field values are copied to `self`; the
        `pre_merge_with` and `post_merge_with` signals are sent before
        and after. `self` is never merged into itself.
        """
        origin = self.__class__

        # Drop `self` from the candidates. Only containers support `in`;
        # a bare pk or instance is handled by the normalization below.
        if isinstance(entities, list):
            if self in entities:
                entities.remove(self)
        elif isinstance(entities, QuerySet):
            if self in entities:
                # QuerySets have no `remove`, build a list without `self`
                entities = [ent for ent in entities if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        e_a = type(self).__name__
        # use the class directly: a ContentType lookup by model name alone
        # is ambiguous if several apps define a model with the same name
        self_model_class = type(self)
        if not isinstance(entities, (list, QuerySet)):
            entities = [entities]
        entities = [
            self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        # a pk might have resolved to `self`; merging (and then deleting)
        # an instance with itself must not happen
        entities = [ent for ent in entities if ent != self]

        for ent in entities:
            e_b = type(ent).__name__
            if e_a != e_b:
                continue
            for f in ent._meta.local_many_to_many:
                if not f.name.endswith("_set"):
                    sl = list(getattr(self, f.name).all())
                    for s in getattr(ent, f.name).all():
                        if s not in sl:
                            getattr(self, f.name).add(s)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create and return a copy of this instance.

        Unique fields are not copied. Foreign keys and many-to-many
        values are copied after the new object has been created. The
        `pre_duplicate` and `post_duplicate` signals are sent before
        and after.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(
                "Duplicating %s: ignoring unique field %s", self, field.name
            )
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            elif type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    duplicate.alters_data = True

    def uri_set(self):
        """
        Return the objects of the `uris.uri` model whose `content_type`
        and `object_id` point to this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(app_label="uris", model="uri").model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)

    def uri_set_with_importer(self):
        """Return the URIs of this instance for which import logic is configured."""
        return [uri for uri in self.uri_set() if self.valid_import_url(uri.uri)]