Coverage for apis_core / generic / abc.py: 59%

232 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-30 14:27 +0000

1import logging 

2import re 

3from typing import Tuple 

4 

5from django.contrib.contenttypes.models import ContentType 

6from django.core.exceptions import ImproperlyConfigured 

7from django.db import models 

8from django.db.models import BooleanField, CharField, TextField 

9from django.db.models.fields.related import ForeignKey, ManyToManyField 

10from django.db.models.query import QuerySet 

11from django.forms import model_to_dict 

12from django.urls import reverse 

13 

14from apis_core.generic.helpers import mro_paths, permission_fullname 

15from apis_core.generic.signals import ( 

16 post_duplicate, 

17 post_merge_with, 

18 pre_duplicate, 

19 pre_import_from, 

20 pre_merge_with, 

21) 

22from apis_core.generic.utils import get_autocomplete_data_and_normalized_uri 

23from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix 

24 

25logger = logging.getLogger(__name__) 

26 

27 

class GenericModel(models.Model):
    """
    Abstract base model bundling the generic helpers shared by all models:
    URL and permission lookups, importing data from external URIs, merging
    instances into each other and duplicating instances.
    """

    class Meta:
        abstract = True

    def __repr__(self):
        """Return the default repr, extended by the instance ID if it is set."""
        base = super().__repr__()
        if pk := getattr(self, "id", None):
            return base + f" (ID: {pk})"
        return base

    @property
    def content_type(self):
        """The `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every item produced by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return the RDF namespace prefix: `<rdf_namespace_prefix()>-<model name>`."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return the namespace URI: the APIS base URI followed by the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; empty unless a subclass overrides it."""
        return []

    def get_edit_url(self):
        """Return the URL of the generic update view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the generic duplicate view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL of the view merging the instance `other_id` into this one."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the generic "select merge or enrich" view."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to redirect to after creation (the detail view)."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to redirect to after an update (the edit view)."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the generic API detail endpoint for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)`."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)`."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)`."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)`."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the plural verbose name from the model's `_meta`."""
        return cls._meta.verbose_name_plural

    @classmethod
    def get_verbose_name(cls):
        """Return the verbose name from the model's `_meta`."""
        return cls._meta.verbose_name

    @classmethod
    def valid_import_url(cls, uri: str):
        """
        Check if a URI can be imported.
        The exact fetching logic for a URI is defined in the
        `import_definitions` attribute of the class.
        `import_definitions` has to be a dict, mapping a regex
        matching the URI to a callable taking the URI as an argument.
        This method checks if there is a callable defined for this URI.

        Returns the first callable whose regex matches the normalized
        URI, or `False` if none matches.
        """
        _, uri = get_autocomplete_data_and_normalized_uri(uri)
        for regex, fn in getattr(cls, "import_definitions", {}).items():
            if re.match(regex, uri):
                return fn
        return False

    @classmethod
    def get_data_and_normalized_uri(cls, uri: str) -> Tuple[dict, str]:
        """
        Return the pair `(data, uri)` computed by
        `get_autocomplete_data_and_normalized_uri` for the given URI.
        """
        data, uri = get_autocomplete_data_and_normalized_uri(uri)
        return data, uri

    @classmethod
    def fetch_from(cls, uri: str):
        """
        Normalize the URI and extract the autocomplete data.
        Then try to fetch data from a URI:
        Check if there is import logic configured for this URI and if
        so, use that import logic to fetch the data.
        Finally, combine the fetched data and the autocomplete data.

        Raises `ImproperlyConfigured` if no import logic matches the URI.
        """
        logger.debug("Fetch from %s", uri)
        data, nuri = cls.get_data_and_normalized_uri(uri)
        if fn := cls.valid_import_url(nuri):
            fetcheddata = fn(nuri) or {}
            # merge the two dicts
            ret = fetcheddata | data
            # combine values that exist in both dicts; this relies on the
            # values supporting `+` (presumably lists - verify with importers)
            for key in set(fetcheddata).intersection(data):
                ret[key] = fetcheddata[key] + data[key]
            return ret
        raise ImproperlyConfigured(f"Import not configured for URI {uri}")

    @classmethod
    def import_from(cls, uri: str, allow_empty: bool = True):
        """
        Fetch data from a URI and create a model instance using
        that data. If the `allow_empty` argument is set, this also
        creates a model instance if the data fetched was empty. This
        might make sense if you still want to create an instance and
        attach the URI to it.

        Raises `ValueError` if no data was fetched and `allow_empty`
        is not set.
        """
        # we allow other apps to intercept the import
        # whatever they return will be used instead of
        # creating a new object
        _, nuri = cls.get_data_and_normalized_uri(uri)
        for _receiver, response in pre_import_from.send(sender=cls, uri=nuri):
            if response:
                return response
        data = cls.fetch_from(uri) or {}
        if allow_empty or data:
            instance = cls()
            instance.save()
            instance.import_data(data)
            return instance
        raise ValueError(f"Could not fetch data to import from {uri}")

    def import_from_dict_subset(self, **data):
        """
        Import attributes of this instance from data in a dict.
        We iterate through the model fields and
        a) only set a field if the dict has a truthy entry for the
        field name and b) use the field's `clean` method to check if
        the value validates. If it does not validate, the error is
        logged and stored in `self._import_errors`, keyed by field name.
        Only the first element of every dict value is used, converted
        to a string.
        """
        self._import_errors = {}
        if data:
            for field in self._meta.fields:
                if data.get(field.name, False):
                    value = str(data[field.name][0])
                    try:
                        field.clean(value, self)
                    except Exception as e:
                        logger.info(
                            "Could not set %s on %s: %s", field.name, str(self), str(e)
                        )
                        self._import_errors[field.name] = str(e)
                    else:
                        setattr(self, field.name, value)
            self.save()

    def import_data(self, data):
        """Import the given dict using `import_from_dict_subset`."""
        self.import_from_dict_subset(**data)

    def get_merge_charfield_value(self, other: "GenericModel", field: CharField):
        """
        Return the value of a CharField after merging `other` into `self`.

        Fields with choices keep their own value. Otherwise, a differing,
        non-empty value of `other` is appended in parentheses.
        """
        res = getattr(self, str(field.name))
        if field.choices:
            return res
        otherres = getattr(other, str(field.name), res)
        if otherres and otherres != res:
            # a nullable field may hold None, which cannot be concatenated
            if res is None:
                return otherres
            res += f" ({otherres})"
        return res

    def get_merge_textfield_value(self, other: "GenericModel", field: TextField):
        """
        Return the value of a TextField after merging `other` into `self`:
        a non-empty value of `other` is appended below a "Merged from" line.
        """
        res = getattr(self, str(field.name))
        othertext = getattr(other, str(field.name))
        if othertext:
            # if own value is None, fallback to empty string
            res = res or ""
            res += "\n" + f"Merged from {other}:\n" + othertext
        return res

    def get_merge_booleanfield(self, other: "GenericModel", field: BooleanField):
        """Return the value `other` holds for the given field."""
        # NOTE(review): `get_field_value_after_merge` looks for
        # `get_merge_{fieldtype}_value`, so this method (missing the `_value`
        # suffix) is never picked up by the dispatch - confirm whether it
        # should be renamed; doing so changes how boolean fields are merged.
        return getattr(other, str(field.name))

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries
        to find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`).
        If neither of those exist, it uses the others field value if the field
        in self is not set, otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        if not getattr(self, str(field.name)):
            return getattr(other, str(field.name))
        return getattr(self, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, str(field.name)):
                setattr(self, str(field.name), newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge other instances into this one and delete them afterwards.

        `entities` may be a primary key, a single instance, or a list or
        QuerySet of primary keys and/or instances. Primary keys are looked
        up in this instance's own model. The instance itself is dropped
        from the entities, so it is never merged into (and deleted with)
        itself.

        The `pre_merge_with` and `post_merge_with` signals are sent with the
        resolved list of instances.
        """
        origin = self.__class__

        # Normalize the argument *before* inspecting it: membership tests and
        # `remove` only work on lists, not on ints, instances or QuerySets.
        if not isinstance(entities, (list, QuerySet)):
            entities = [entities]
        # Resolve primary keys using the model class of this instance; a
        # lookup by model name alone is ambiguous across apps.
        entities = [
            origin.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        entities = [ent for ent in entities if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = type(self).__name__
        for ent in entities:
            # many-to-many values are only taken over from same-named models
            if type(ent).__name__ != own_name:
                continue
            for f in ent._meta.local_many_to_many:
                if f.name.endswith("_set"):
                    continue
                own_manager = getattr(self, f.name)
                existing = list(own_manager.all())
                for related in getattr(ent, f.name).all():
                    if related not in existing:
                        own_manager.add(related)

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are not copied. Foreign keys and many-to-many
        relations are taken over from this instance. The `pre_duplicate`
        and `post_duplicate` signals are sent before and after copying.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        for field in self._meta.fields:
            if field.unique:
                logger.info(
                    "Duplicating %s: ignoring unique field %s", self, field.name
                )
                objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            if type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    # prevent the template engine from calling this data-altering method
    duplicate.alters_data = True

    def uri_set(self):
        """
        Return the objects of the `uris.uri` model whose `content_type` and
        `object_id` point to this instance.
        """
        ct = ContentType.objects.get_for_model(self)
        return (
            ContentType.objects.get(app_label="uris", model="uri")
            .model_class()
            .objects.filter(content_type=ct, object_id=self.id)
            .all()
        )

    def uri_set_with_importer(self):
        """Return the URIs of this instance for which import logic is configured."""
        return [uri for uri in self.uri_set() if self.valid_import_url(uri.uri)]