Coverage for apis_core/generic/abc.py: 56%

142 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-20 09:24 +0000

1import logging 

2 

3from django.contrib.contenttypes.models import ContentType 

4from django.db.models import BooleanField, CharField, TextField 

5from django.db.models.fields.related import ForeignKey, ManyToManyField 

6from django.db.models.query import QuerySet 

7from django.forms import model_to_dict 

8from django.urls import reverse 

9 

10from apis_core.generic.helpers import permission_fullname 

11from apis_core.generic.signals import ( 

12 post_duplicate, 

13 post_merge_with, 

14 pre_duplicate, 

15 pre_merge_with, 

16) 

17 

18logger = logging.getLogger(__name__) 

19 

20 

21class GenericModel: 

22 def __repr__(self): 

23 if id := getattr(self, "id", None): 

24 return super().__repr__() + f" (ID: {id})" 

25 return super().__repr__() 

26 

27 @classmethod 

28 def get_listview_url(cls): 

29 ct = ContentType.objects.get_for_model(cls) 

30 return reverse("apis_core:generic:list", args=[ct]) 

31 

32 @classmethod 

33 def get_createview_url(cls): 

34 ct = ContentType.objects.get_for_model(cls) 

35 return reverse("apis_core:generic:create", args=[ct]) 

36 

37 @classmethod 

38 def get_importview_url(cls): 

39 ct = ContentType.objects.get_for_model(cls) 

40 return reverse("apis_core:generic:import", args=[ct]) 

41 

42 def get_edit_url(self): 

43 ct = ContentType.objects.get_for_model(self) 

44 return reverse("apis_core:generic:update", args=[ct, self.id]) 

45 

46 def get_enrich_url(self): 

47 ct = ContentType.objects.get_for_model(self) 

48 return reverse("apis_core:generic:enrich", args=[ct, self.id]) 

49 

50 def get_absolute_url(self): 

51 ct = ContentType.objects.get_for_model(self) 

52 return reverse("apis_core:generic:detail", args=[ct, self.id]) 

53 

54 def get_delete_url(self): 

55 ct = ContentType.objects.get_for_model(self) 

56 return reverse("apis_core:generic:delete", args=[ct, self.id]) 

57 

58 def get_merge_url(self, other_id): 

59 ct = ContentType.objects.get_for_model(self) 

60 return reverse("apis_core:generic:merge", args=[ct, self.id, other_id]) 

61 

62 def get_select_merge_or_enrich_url(self): 

63 ct = ContentType.objects.get_for_model(self) 

64 return reverse("apis_core:generic:selectmergeorenrich", args=[ct, self.id]) 

65 

66 def get_create_success_url(self): 

67 return self.get_absolute_url() 

68 

69 def get_update_success_url(self): 

70 return self.get_edit_url() 

71 

72 def get_api_detail_endpoint(self): 

73 ct = ContentType.objects.get_for_model(self) 

74 return reverse("apis_core:generic:genericmodelapi-detail", args=[ct, self.id]) 

75 

76 @classmethod 

77 def get_change_permission(self): 

78 return permission_fullname("change", self) 

79 

80 @classmethod 

81 def get_add_permission(self): 

82 return permission_fullname("add", self) 

83 

84 @classmethod 

85 def get_delete_permission(self): 

86 return permission_fullname("delete", self) 

87 

88 def get_merge_charfield_value(self, other: CharField, field: CharField): 

89 res = getattr(self, field.name) 

90 if not field.choices: 

91 otherres = getattr(other, field.name, res) 

92 if otherres != res: 

93 res += f" ({otherres})" 

94 return res 

95 

96 def get_merge_textfield_value(self, other: TextField, field: TextField): 

97 res = getattr(self, field.name) 

98 if getattr(other, field.name): 

99 res += "\n" + f"Merged from {other}:\n" + getattr(other, field.name) 

100 return res 

101 

102 def get_merge_booleanfield(self, other: BooleanField, field: BooleanField): 

103 return getattr(other, field.name) 

104 

105 def get_field_value_after_merge(self, other, field): 

106 """ 

107 This method finds the value of a field after merging `other` into `self`. 

108 It first tries to find a merge method that is specific to that field 

109 (merge_{fieldname}) and then tries to find a method that is specific to 

110 the type of the field (merge_{fieldtype}) 

111 If neither of those exist, it uses the others field value if the field 

112 in self is not set, otherwise it keeps the value in self. 

113 """ 

114 fieldtype = field.get_internal_type().lower() 

115 # if there is a `get_merge_{fieldname}` method in this model, use that one 

116 if callable(getattr(self, f"get_merge_{field.name}_value", None)): 

117 return getattr(self, f"get_merge_{field.name}_value")(other) 

118 # otherwise we check if there is a method for the field type and use that one 

119 elif callable(getattr(self, f"get_merge_{fieldtype}_value", None)): 

120 return getattr(self, f"get_merge_{fieldtype}_value")(other, field) 

121 else: 

122 if not getattr(self, field.name): 

123 return getattr(other, field.name) 

124 return getattr(self, field.name) 

125 

126 def merge_fields(self, other): 

127 """ 

128 This method iterates through the model fields and uses the 

129 `get_field_value_after_merge` method to copy values from `other` to `self`. 

130 It is called by the `merge_with` method. 

131 """ 

132 for field in self._meta.fields: 

133 newval = self.get_field_value_after_merge(other, field) 

134 if newval != getattr(self, field.name): 

135 setattr(self, field.name, newval) 

136 self.save() 

137 

138 def merge_with(self, entities): 

139 if self in entities: 

140 entities.remove(self) 

141 origin = self.__class__ 

142 pre_merge_with.send(sender=origin, instance=self, entities=entities) 

143 

144 # TODO: check if these imports can be put to top of module without 

145 # causing circular import issues. 

146 from apis_core.apis_metainfo.models import Uri 

147 

148 e_a = type(self).__name__ 

149 self_model_class = ContentType.objects.get(model__iexact=e_a).model_class() 

150 if isinstance(entities, int): 

151 entities = self_model_class.objects.get(pk=entities) 

152 if not isinstance(entities, list) and not isinstance(entities, QuerySet): 

153 entities = [entities] 

154 entities = [ 

155 self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent 

156 for ent in entities 

157 ] 

158 for ent in entities: 

159 e_b = type(ent).__name__ 

160 if e_a != e_b: 

161 continue 

162 for f in ent._meta.local_many_to_many: 

163 if not f.name.endswith("_set"): 

164 sl = list(getattr(self, f.name).all()) 

165 for s in getattr(ent, f.name).all(): 

166 if s not in sl: 

167 getattr(self, f.name).add(s) 

168 Uri.objects.filter(root_object=ent).update(root_object=self) 

169 

170 for ent in entities: 

171 self.merge_fields(ent) 

172 

173 post_merge_with.send(sender=origin, instance=self, entities=entities) 

174 

175 for ent in entities: 

176 ent.delete() 

177 

178 def duplicate(self): 

179 origin = self.__class__ 

180 pre_duplicate.send(sender=origin, instance=self) 

181 # usually, copying instances would work like 

182 # https://docs.djangoproject.com/en/4.2/topics/db/queries/#copying-model-instances 

183 # but we are working with abstract classes, 

184 # so we have to do it by hand using model_to_dict:( 

185 objdict = model_to_dict(self) 

186 

187 # remove unique fields from dict representation 

188 unique_fields = [field for field in self._meta.fields if field.unique] 

189 for field in unique_fields: 

190 logger.info(f"Duplicating {self}: ignoring unique field {field.name}") 

191 objdict.pop(field.name, None) 

192 

193 # remove related fields from dict representation 

194 related_fields = [ 

195 field for field in self._meta.get_fields() if field.is_relation 

196 ] 

197 for field in related_fields: 

198 objdict.pop(field.name, None) 

199 

200 newobj = type(self).objects.create(**objdict) 

201 

202 for field in related_fields: 

203 # we are not using `isinstance` because we want to 

204 # differentiate between different levels of inheritance 

205 if type(field) is ForeignKey: 

206 setattr(newobj, field.name, getattr(self, field.name)) 

207 if type(field) is ManyToManyField: 

208 objfield = getattr(newobj, field.name) 

209 values = getattr(self, field.name).all() 

210 objfield.set(values) 

211 

212 newobj.save() 

213 post_duplicate.send(sender=origin, instance=self, duplicate=newobj) 

214 return newobj 

215 

216 duplicate.alters_data = True