Coverage for apis_core/generic/abc.py: 56%

153 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-02-19 16:54 +0000

1import logging 

2 

3from django.contrib.contenttypes.models import ContentType 

4from django.db.models import BooleanField, CharField, TextField 

5from django.db.models.fields.related import ForeignKey, ManyToManyField 

6from django.db.models.query import QuerySet 

7from django.forms import model_to_dict 

8from django.urls import reverse 

9 

10from apis_core.generic.helpers import mro_paths, permission_fullname 

11from apis_core.generic.signals import ( 

12 post_duplicate, 

13 post_merge_with, 

14 pre_duplicate, 

15 pre_merge_with, 

16) 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21class GenericModel: 

22 def __repr__(self): 

23 if id := getattr(self, "id", None): 

24 return super().__repr__() + f" (ID: {id})" 

25 return super().__repr__() 

26 

27 @classmethod 

28 def get_listview_url(cls): 

29 ct = ContentType.objects.get_for_model(cls) 

30 return reverse("apis_core:generic:list", args=[ct]) 

31 

32 @classmethod 

33 def get_createview_url(cls): 

34 ct = ContentType.objects.get_for_model(cls) 

35 return reverse("apis_core:generic:create", args=[ct]) 

36 

37 @classmethod 

38 def get_importview_url(cls): 

39 ct = ContentType.objects.get_for_model(cls) 

40 return reverse("apis_core:generic:import", args=[ct]) 

41 

42 @classmethod 

43 def get_openapi_tags(cls): 

44 return [item[-1] for item in mro_paths(cls)] 

45 

46 def get_edit_url(self): 

47 ct = ContentType.objects.get_for_model(self) 

48 return reverse("apis_core:generic:update", args=[ct, self.id]) 

49 

50 def get_enrich_url(self): 

51 ct = ContentType.objects.get_for_model(self) 

52 return reverse("apis_core:generic:enrich", args=[ct, self.id]) 

53 

54 def get_absolute_url(self): 

55 ct = ContentType.objects.get_for_model(self) 

56 return reverse("apis_core:generic:detail", args=[ct, self.id]) 

57 

58 def get_delete_url(self): 

59 ct = ContentType.objects.get_for_model(self) 

60 return reverse("apis_core:generic:delete", args=[ct, self.id]) 

61 

62 def get_merge_url(self, other_id): 

63 ct = ContentType.objects.get_for_model(self) 

64 return reverse("apis_core:generic:merge", args=[ct, self.id, other_id]) 

65 

66 def get_select_merge_or_enrich_url(self): 

67 ct = ContentType.objects.get_for_model(self) 

68 return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id]) 

69 

70 def get_create_success_url(self): 

71 return self.get_absolute_url() 

72 

73 def get_update_success_url(self): 

74 return self.get_edit_url() 

75 

76 def get_api_detail_endpoint(self): 

77 ct = ContentType.objects.get_for_model(self) 

78 return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id]) 

79 

80 @classmethod 

81 def get_change_permission(self): 

82 return permission_fullname("change", self) 

83 

84 @classmethod 

85 def get_add_permission(self): 

86 return permission_fullname("add", self) 

87 

88 @classmethod 

89 def get_delete_permission(self): 

90 return permission_fullname("delete", self) 

91 

92 @classmethod 

93 def get_view_permission(self): 

94 return permission_fullname("view", self) 

95 

96 def get_merge_charfield_value(self, other: CharField, field: CharField): 

97 res = getattr(self, field.name) 

98 if not field.choices: 

99 otherres = getattr(other, field.name, res) 

100 if otherres != res: 

101 res += f" ({otherres})" 

102 return res 

103 

104 def get_merge_textfield_value(self, other: TextField, field: TextField): 

105 res = getattr(self, field.name) 

106 if getattr(other, field.name): 

107 res += "\n" + f"Merged from {other}:\n" + getattr(other, field.name) 

108 return res 

109 

110 def get_merge_booleanfield(self, other: BooleanField, field: BooleanField): 

111 return getattr(other, field.name) 

112 

113 def get_field_value_after_merge(self, other, field): 

114 """ 

115 This method finds the value of a field after merging `other` into `self`. 

116 It first tries to find a merge method that is specific to that field 

117 (merge_{fieldname}) and then tries to find a method that is specific to 

118 the type of the field (merge_{fieldtype}) 

119 If neither of those exist, it uses the others field value if the field 

120 in self is not set, otherwise it keeps the value in self. 

121 """ 

122 fieldtype = field.get_internal_type().lower() 

123 # if there is a `get_merge_{fieldname}` method in this model, use that one 

124 if callable(getattr(self, f"get_merge_{field.name}_value", None)): 

125 return getattr(self, f"get_merge_{field.name}_value")(other) 

126 # otherwise we check if there is a method for the field type and use that one 

127 elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)): 

128 return getattr(self, f"get_merge_{fieldtype}_value")(other, field) 

129 else: 

130 if not getattr(self, field.name): 

131 return getattr(other, field.name) 

132 return getattr(self, field.name) 

133 

134 def merge_fields(self, other): 

135 """ 

136 This method iterates through the model fields and uses the 

137 `get_field_value_after_merge` method to copy values from `other` to `self`. 

138 It is called by the `merge_with` method. 

139 """ 

140 for field in self._meta.fields: 

141 newval = self.get_field_value_after_merge(other, field) 

142 if newval != getattr(self, field.name): 

143 setattr(self, field.name, newval) 

144 self.save() 

145 

146 def merge_with(self, entities): 

147 if self in entities: 

148 entities.remove(self) 

149 origin = self.__class__ 

150 pre_merge_with.send(sender=origin, instance=self, entities=entities) 

151 

152 # TODO: check if these imports can be put to top of module without 

153 # causing circular import issues. 

154 from apis_core.apis_metainfo.models import Uri 

155 

156 e_a = type(self).__name__ 

157 self_model_class = ContentType.objects.get(model__iexact=e_a).model_class() 

158 if isinstance(entities, int): 

159 entities = self_model_class.objects.get(pk=entities) 

160 if not isinstance(entities, list) and not isinstance(entities, QuerySet): 

161 entities = [entities] 

162 entities = [ 

163 self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent 

164 for ent in entities 

165 ] 

166 for ent in entities: 

167 e_b = type(ent).__name__ 

168 if e_a != e_b: 

169 continue 

170 for f in ent._meta.local_many_to_many: 

171 if not f.name.endswith("_set"): 

172 sl = list(getattr(self, f.name).all()) 

173 for s in getattr(ent, f.name).all(): 

174 if s not in sl: 

175 getattr(self, f.name).add(s) 

176 self_content_type = ContentType.objects.get_for_model(self) 

177 ent_content_type = ContentType.objects.get_for_model(ent) 

178 Uri.objects.filter(content_type=ent_content_type, object_id=ent.id).update( 

179 content_type=self_content_type, object_id=self.id 

180 ) 

181 

182 for ent in entities: 

183 self.merge_fields(ent) 

184 

185 post_merge_with.send(sender=origin, instance=self, entities=entities) 

186 

187 for ent in entities: 

188 ent.delete() 

189 

190 def duplicate(self): 

191 origin = self.__class__ 

192 pre_duplicate.send(sender=origin, instance=self) 

193 # usually, copying instances would work like 

194 # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances 

195 # but we are working with abstract classes, 

196 # so we have to do it by hand using model_to_dict:( 

197 objdict = model_to_dict(self) 

198 

199 # remove unique fields from dict representation 

200 unique_fields = [field for field in self._meta.fields if field.unique] 

201 for field in unique_fields: 

202 logger.info(f"Duplicating {self}: ignoring unique field {field.name}") 

203 objdict.pop(field.name, None) 

204 

205 # remove related fields from dict representation 

206 related_fields = [ 

207 field for field in self._meta.get_fields() if field.is_relation 

208 ] 

209 for field in related_fields: 

210 objdict.pop(field.name, None) 

211 

212 newobj = type(self).objects.create(**objdict) 

213 

214 for field in related_fields: 

215 # we are not using `isinstance` because we want to 

216 # differentiate between different levels of inheritance 

217 if type(field) is ForeignKey: 

218 setattr(newobj, field.name, getattr(self, field.name)) 

219 if type(field) is ManyToManyField: 

220 objfield = getattr(newobj, field.name) 

221 values = getattr(self, field.name).all() 

222 objfield.set(values) 

223 

224 newobj.save() 

225 post_duplicate.send(sender=origin, instance=self, duplicate=newobj) 

226 return newobj 

227 

228 duplicate.alters_data = True 

229 

230 def uri_set(self): 

231 ct = ContentType.objects.get_for_model(self) 

232 return ( 

233 ContentType.objects.get(app_label="apis_metainfo", model="uri") 

234 .model_class() 

235 .objects.filter(content_type=ct, object_id=self.id) 

236 .all() 

237 )