Coverage for apis_core/generic/abc.py: 57%

173 statements  

« prev     ^ index     » next       coverage.py v7.5.3, created at 2025-06-25 10:00 +0000

1import logging 

2 

3from django.contrib.contenttypes.models import ContentType 

4from django.db.models import BooleanField, CharField, TextField 

5from django.db.models.fields.related import ForeignKey, ManyToManyField 

6from django.db.models.query import QuerySet 

7from django.forms import model_to_dict 

8from django.urls import reverse 

9 

10from apis_core.generic.helpers import mro_paths, permission_fullname 

11from apis_core.generic.signals import ( 

12 post_duplicate, 

13 post_merge_with, 

14 pre_duplicate, 

15 pre_merge_with, 

16) 

17from apis_core.utils.settings import apis_base_uri, rdf_namespace_prefix 

18 

19logger = logging.getLogger(__name__) 

20 

21 

class GenericModel:
    """
    Mixin with generic URL, permission, merge and duplicate helpers.

    The methods use Django model machinery (`_meta`, `objects`, `save()`,
    `id`), so they only work when this class is combined with a Django model.
    """

    def __repr__(self):
        """
        Return the parent representation, suffixed with ` (ID: <id>)` when
        the instance has a truthy `id`.
        """
        base = super().__repr__()
        # named `pk_value` so the builtin `id` is not shadowed
        if pk_value := getattr(self, "id", None):
            return base + f" (ID: {pk_value})"
        return base

    @property
    def content_type(self):
        """The `ContentType` of this instance's model."""
        return ContentType.objects.get_for_model(self)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the generic list view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:list", args=[ct])

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the generic create view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:create", args=[ct])

    @classmethod
    def get_importview_url(cls):
        """Return the URL of the generic import view for this model."""
        ct = ContentType.objects.get_for_model(cls)
        return reverse("apis_core:generic:import", args=[ct])

    @classmethod
    def get_openapi_tags(cls):
        """Return the last element of every item returned by `mro_paths(cls)`."""
        return [item[-1] for item in mro_paths(cls)]

    @classmethod
    def get_namespace_prefix(cls):
        """Return `<rdf_namespace_prefix()>-<content type model name>`."""
        ct = ContentType.objects.get_for_model(cls)
        return f"{rdf_namespace_prefix()}-{ct.model}"

    @classmethod
    def get_namespace_uri(cls):
        """Return `apis_base_uri()` concatenated with the list view URL."""
        return apis_base_uri() + cls.get_listview_url()

    @classmethod
    def get_rdf_types(cls):
        """Return the RDF types of this model; an empty list by default."""
        return []

    def get_edit_url(self):
        """Return the URL of the generic update view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:update", args=[ct, self.id])

    def get_duplicate_url(self):
        """Return the URL of the generic duplicate view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:duplicate", args=[ct, self.id])

    def get_enrich_url(self):
        """Return the URL of the generic enrich view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:enrich", args=[ct, self.id])

    def get_absolute_url(self):
        """Return the URL of the generic detail view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:detail", args=[ct, self.id])

    def get_delete_url(self):
        """Return the URL of the generic delete view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:delete", args=[ct, self.id])

    def get_merge_url(self, other_id):
        """Return the URL of the view merging `other_id` into this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:merge", args=[ct, self.id, other_id])

    def get_select_merge_or_enrich_url(self):
        """Return the URL of the `selectmergeorenrich` view for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id])

    def get_create_success_url(self):
        """Return the URL to go to after creation: the detail view."""
        return self.get_absolute_url()

    def get_update_success_url(self):
        """Return the URL to go to after an update: the edit view."""
        return self.get_edit_url()

    def get_api_detail_endpoint(self):
        """Return the URL of the `genericmodelapi-detail` route for this instance."""
        ct = ContentType.objects.get_for_model(self)
        return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id])

    @classmethod
    def get_change_permission(cls):
        """Return `permission_fullname("change", cls)`."""
        return permission_fullname("change", cls)

    @classmethod
    def get_add_permission(cls):
        """Return `permission_fullname("add", cls)`."""
        return permission_fullname("add", cls)

    @classmethod
    def get_delete_permission(cls):
        """Return `permission_fullname("delete", cls)`."""
        return permission_fullname("delete", cls)

    @classmethod
    def get_view_permission(cls):
        """Return `permission_fullname("view", cls)`."""
        return permission_fullname("view", cls)

    @classmethod
    def get_verbose_name_plural(cls):
        """Return the `verbose_name_plural` from the model's `_meta`."""
        return cls._meta.verbose_name_plural

    def get_merge_charfield_value(self, other: "GenericModel", field: "CharField"):
        """
        Return the merged value of a char field.

        Fields with `choices` always keep the value of `self`. Otherwise a
        differing, non-empty value of `other` is appended in parentheses. If
        `self` has no value, the value of `other` is used on its own, which
        avoids a `TypeError` on `None` and results such as `" (x)"`.
        """
        own = getattr(self, field.name)
        if field.choices:
            return own
        theirs = getattr(other, field.name, own)
        # nothing to add: `other` is empty or carries the same value
        if not theirs or theirs == own:
            return own
        if not own:
            return theirs
        return f"{own} ({theirs})"

    def get_merge_textfield_value(self, other: "GenericModel", field: "TextField"):
        """
        Return the merged value of a text field.

        If `other` has a value it is appended to the value of `self`, below a
        `Merged from <other>:` header. A `None` value in `self` is treated as
        an empty string when appending.
        """
        own = getattr(self, field.name)
        theirs = getattr(other, field.name)
        if not theirs:
            return own
        return f"{own or ''}\nMerged from {other}:\n{theirs}"

    def get_merge_booleanfield(self, other: "GenericModel", field: "BooleanField"):
        """
        Return the value `other` has for the boolean field.

        NOTE(review): `get_field_value_after_merge` looks for
        `get_merge_{fieldtype}_value`; this method lacks the `_value` suffix
        and is therefore never picked up by it. Renaming would change the
        result of merges for boolean fields - confirm the intended behaviour.
        """
        return getattr(other, field.name)

    def get_field_value_after_merge(self, other, field):
        """
        This method finds the value of a field after merging `other` into `self`.
        It first tries to find a merge method that is specific to that field
        (`get_merge_{fieldname}_value`, called with `other`) and then tries to
        find a method that is specific to the type of the field
        (`get_merge_{fieldtype}_value`, called with `other` and `field`).
        If neither of those exist, it uses the others field value if the field
        in self is not set (falsy), otherwise it keeps the value in self.
        """
        fieldtype = field.get_internal_type().lower()
        # if there is a `get_merge_{fieldname}_value` method in this model, use that one
        by_name = getattr(self, f"get_merge_{field.name}_value", None)
        if callable(by_name):
            return by_name(other)
        # otherwise we check if there is a method for the field type and use that one
        by_type = getattr(self, f"get_merge_{fieldtype}_value", None)
        if callable(by_type):
            return by_type(other, field)
        own = getattr(self, field.name)
        return own if own else getattr(other, field.name)

    def merge_fields(self, other):
        """
        This method iterates through the model fields and uses the
        `get_field_value_after_merge` method to copy values from `other` to `self`.
        It is called by the `merge_with` method. The instance is saved afterwards.
        """
        for field in self._meta.fields:
            newval = self.get_field_value_after_merge(other, field)
            if newval != getattr(self, field.name):
                setattr(self, field.name, newval)
        self.save()

    def merge_with(self, entities):
        """
        Merge `entities` into `self` and delete them afterwards.

        `entities` may be a single instance, a single primary key, or a list,
        tuple or `QuerySet` of instances and/or primary keys. Primary keys are
        resolved using the model class of `self`. `self` is dropped from the
        resolved entities, so an instance is never merged into (and deleted
        because of) itself. The argument passed in is not modified.

        For entities whose class name matches the one of `self`, the local
        many-to-many relations and the `Uri` objects are moved to `self`.
        Then the field values are merged using `merge_fields`.

        The `pre_merge_with` and `post_merge_with` signals both receive the
        resolved list of entities.
        """
        # TODO: check if these imports can be put to top of module without
        # causing circular import issues.
        from apis_core.apis_metainfo.models import Uri

        origin = self.__class__

        # normalise the argument *before* touching it: membership tests and
        # `.remove()` do not work on ints, single instances or querysets
        if isinstance(entities, (list, tuple, QuerySet)):
            candidates = list(entities)
        else:
            candidates = [entities]
        resolved = [
            origin.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in candidates
        ]
        entities = [ent for ent in resolved if ent != self]

        pre_merge_with.send(sender=origin, instance=self, entities=entities)

        own_name = type(self).__name__
        self_content_type = ContentType.objects.get_for_model(self)
        for ent in entities:
            # NOTE(review): entities of another class are skipped here, but
            # are still passed to `merge_fields` and deleted below - confirm
            # that this is intended.
            if type(ent).__name__ != own_name:
                continue
            for f in ent._meta.local_many_to_many:
                if f.name.endswith("_set"):
                    continue
                own_manager = getattr(self, f.name)
                already_linked = list(own_manager.all())
                for related in getattr(ent, f.name).all():
                    if related not in already_linked:
                        own_manager.add(related)
            ent_content_type = ContentType.objects.get_for_model(ent)
            Uri.objects.filter(content_type=ent_content_type, object_id=ent.id).update(
                content_type=self_content_type, object_id=self.id
            )

        for ent in entities:
            self.merge_fields(ent)

        post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def duplicate(self):
        """
        Create, save and return a copy of this instance.

        Unique fields are left out of the copy. Foreign keys and many-to-many
        relations are copied from `self` after the new object was created.
        The `pre_duplicate` and `post_duplicate` signals are sent before and
        after the copy is made.
        """
        origin = self.__class__
        pre_duplicate.send(sender=origin, instance=self)
        # usually, copying instances would work like
        # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances
        # but we are working with abstract classes,
        # so we have to do it by hand using model_to_dict:(
        objdict = model_to_dict(self)

        # remove unique fields from dict representation
        unique_fields = [field for field in self._meta.fields if field.unique]
        for field in unique_fields:
            logger.info(f"Duplicating {self}: ignoring unique field {field.name}")
            objdict.pop(field.name, None)

        # remove related fields from dict representation
        related_fields = [
            field for field in self._meta.get_fields() if field.is_relation
        ]
        for field in related_fields:
            objdict.pop(field.name, None)

        newobj = type(self).objects.create(**objdict)

        for field in related_fields:
            # we are not using `isinstance` because we want to
            # differentiate between different levels of inheritance
            if type(field) is ForeignKey:
                setattr(newobj, field.name, getattr(self, field.name))
            elif type(field) is ManyToManyField:
                objfield = getattr(newobj, field.name)
                values = getattr(self, field.name).all()
                objfield.set(values)

        newobj.save()
        post_duplicate.send(sender=origin, instance=self, duplicate=newobj)
        return newobj

    # keeps the Django template engine from calling this method
    duplicate.alters_data = True

    def uri_set(self):
        """
        Return a queryset of the `apis_metainfo.uri` objects that point to
        this instance via `content_type` and `object_id`.
        """
        ct = ContentType.objects.get_for_model(self)
        uri_model = ContentType.objects.get(
            app_label="apis_metainfo", model="uri"
        ).model_class()
        return uri_model.objects.filter(content_type=ct, object_id=self.id)

263 )