Coverage for apis_core/apis_entities/models.py: 65%

130 statements  

« prev     ^ index     » next       coverage.py v7.6.1, created at 2024-09-16 07:42 +0000

1import functools 

2import re 

3 

4from django.conf import settings 

5from django.contrib.contenttypes.models import ContentType 

6from django.db.models.query import QuerySet 

7from django.db.models.signals import post_save 

8from django.dispatch import receiver 

9from django.urls import NoReverseMatch, reverse 

10 

11from apis_core.apis_entities import signals 

12from apis_core.apis_metainfo.models import RootObject, Uri 

13from apis_core.apis_relations.models import TempTriple 

14 

# Prefix used when building the default URI of a newly created entity;
# projects can override it with the ``APIS_BASE_URI`` setting.
BASE_URI: str = getattr(settings, "APIS_BASE_URI", "http://apis.info/")

# When falsy, ``AbstractEntity.get_prev_id`` / ``get_next_id`` skip their
# database lookup and return ``False``; set via the ``APIS_NEXT_PREV`` setting.
NEXT_PREV: bool = getattr(settings, "APIS_NEXT_PREV", True)

17 

18 

class AbstractEntity(RootObject):
    """
    Abstract super class which encapsulates common logic between the
    different entity kinds and provides various methods relating to either
    all or one specific entity kind.

    Most of the class methods are designed to be used in the subclass as they
    are considering contexts which depend on the subclass entity type.
    So they are to be understood in that dynamic context.
    """

    class Meta:
        abstract = True

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    @classmethod
    def get_or_create_uri(cls, uri):
        """
        Look up an instance of this class by primary key or by URI.

        If `uri` (converted to a string) consists only of digits it is used
        as primary key, otherwise it is matched against the `uri` of the
        related Uri objects. Despite the name, nothing is ever created.

        Returns the instance that was found, or `False` if the lookup failed
        for any reason.
        """
        uri = str(uri)
        try:
            if re.match(r"^[0-9]*$", uri):
                return cls.objects.get(pk=uri)
            return cls.objects.get(uri__uri=uri)
        except Exception as e:
            # The exception must be formatted into the message; concatenating
            # it to a str raises a TypeError and would hide the `False` result.
            print(f"Found no object corresponding to given uri. {e}")
            return False

    # TODO
    @classmethod
    def get_entity_list_filter(cls):
        """Placeholder hook; currently always returns `None`."""
        return None

    def get_edit_url(self):
        """
        We override the edit url, because entities have a
        custom view that includes the relations
        """
        ct = ContentType.objects.get_for_model(self)
        return reverse(
            "apis_core:apis_entities:generic_entities_edit_view",
            args=[ct.model, self.id],
        )

    @functools.cached_property
    def get_prev_id(self):
        """
        Id of the instance of the same class with the next lower id.

        This is a cached property, so it is read without calling it.
        It is `False` if `NEXT_PREV` is disabled or no such instance exists.
        """
        if NEXT_PREV:
            prev_instance = (
                type(self)
                .objects.filter(id__lt=self.id)
                .order_by("-id")
                .only("id")
                .first()
            )
            if prev_instance is not None:
                return prev_instance.id
        return False

    @functools.cached_property
    def get_next_id(self):
        """
        Id of the instance of the same class with the next higher id.

        This is a cached property, so it is read without calling it.
        It is `False` if `NEXT_PREV` is disabled or no such instance exists.
        """
        if NEXT_PREV:
            next_instance = (
                type(self)
                .objects.filter(id__gt=self.id)
                .order_by("id")
                .only("id")
                .first()
            )
            if next_instance is not None:
                return next_instance.id
        return False

    def get_duplicate_url(self):
        """Return the url of the duplicate view for this entity."""
        entity = self.__class__.__name__.lower()
        return reverse(
            "apis_core:apis_entities:generic_entities_duplicate_view",
            kwargs={"contenttype": entity, "pk": self.id},
        )

    def get_merge_url(self):
        """Return the url of the merge view for this entity."""
        entity = self.__class__.__name__.lower()
        return reverse(
            "apis_core:apis_entities:generic_entities_merge_view",
            kwargs={"contenttype": entity, "pk": self.id},
        )

    def merge_charfield(self, other, field):
        """
        Merge the value of a char field of `other` into self.

        Fields with choices are left untouched. Otherwise a differing,
        non-empty value of `other` is appended in parentheses; if self has
        no value yet, the value of `other` is taken over as it is.
        """
        res = getattr(self, field.name)
        if not field.choices:
            otherres = getattr(other, field.name, res)
            # An empty value of `other` carries no information, so it must
            # not end up as " ()" or " (None)" in the merged value.
            if otherres and otherres != res:
                res = f"{res} ({otherres})" if res else otherres
        setattr(self, field.name, res)

    def merge_textfield(self, other, field):
        """
        Append the text of `other` to the text of self.

        The appended text is introduced by a "Merged from ..." line. Nothing
        is appended if `other` has no text in this field.
        """
        res = getattr(self, field.name)
        otherres = getattr(other, field.name)
        if otherres:
            # `res or ""` keeps the concatenation working if self holds None
            res = (res or "") + "\n" + f"Merged from {other}:\n" + otherres
        setattr(self, field.name, res)

    def merge_booleanfield(self, other, field):
        """Set the field to the `and` combination of the values of self and `other`."""
        setattr(
            self, field.name, getattr(self, field.name) and getattr(other, field.name)
        )

    def merge_start_date_written(self, other):
        """Keep the own `start_date_written`, fall back to the one of `other`."""
        self.start_date_written = self.start_date_written or other.start_date_written

    def merge_end_date_written(self, other):
        """Keep the own `end_date_written`, fall back to the one of `other`."""
        self.end_date_written = self.end_date_written or other.end_date_written

    def merge_fields(self, other):
        """
        This method iterates through the model fields and copies
        data from other to self. It first tries to find a merge method
        that is specific to that field (merge_{fieldname}) and then tries
        to find a method that is specific to the type of the field (merge_{fieldtype})
        If neither exists, the value of other is only copied if self has
        no (truthy) value in that field. Finally self is saved.
        It is called by the `merge_with` method.
        """
        for field in self._meta.fields:
            fieldtype = field.get_internal_type().lower()
            # if there is a `merge_{fieldname}` method in this model, use that one
            if callable(getattr(self, f"merge_{field.name}", None)):
                getattr(self, f"merge_{field.name}")(other)
            # otherwise we check if there is a method for the field type and use that one
            elif callable(getattr(self, f"merge_{fieldtype}", None)):
                getattr(self, f"merge_{fieldtype}")(other, field)
            else:
                if not getattr(self, field.name):
                    setattr(self, field.name, getattr(other, field.name))
        self.save()

    def merge_with(self, entities):
        """
        Merge one or more entities into self and delete them afterwards.

        `entities` can be a single entity, a single primary key, or a list
        or QuerySet of entities and/or primary keys. Primary keys are resolved
        to instances of the model class of self. Self is never merged into
        itself, even if it is part of `entities`.

        For every entity the many-to-many relations, the Uris and the
        TempTriples are moved over to self and the fields are merged using
        `merge_fields`. The `pre_merge_with` and `post_merge_with` signals are
        sent with the resolved list of entities before and after merging.
        """
        origin = self.__class__
        e_a = type(self).__name__
        self_model_class = ContentType.objects.get(model__iexact=e_a).model_class()

        # Normalise the input before anything else touches it: a membership
        # test or `.remove()` on an int, a single entity or a QuerySet fails.
        if not isinstance(entities, (list, QuerySet)):
            entities = [entities]
        entities = [
            self_model_class.objects.get(pk=ent) if isinstance(ent, int) else ent
            for ent in entities
        ]
        # Drop every occurrence of self (also those that were passed as
        # primary key), otherwise self would be deleted at the end.
        entities = [ent for ent in entities if ent != self]

        signals.pre_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            e_b = type(ent).__name__
            # NOTE(review): entities of another class skip the transfer of
            # relations here, but are still passed to `merge_fields` and
            # deleted below - confirm that this is intended.
            if e_a != e_b:
                continue
            for f in ent._meta.local_many_to_many:
                if not f.name.endswith("_set"):
                    sl = list(getattr(self, f.name).all())
                    for s in getattr(ent, f.name).all():
                        if s not in sl:
                            getattr(self, f.name).add(s)
            Uri.objects.filter(root_object=ent).update(root_object=self)
            TempTriple.objects.filter(obj__id=ent.id).update(obj=self)
            TempTriple.objects.filter(subj__id=ent.id).update(subj=self)

        for ent in entities:
            self.merge_fields(ent)

        signals.post_merge_with.send(sender=origin, instance=self, entities=entities)

        for ent in entities:
            ent.delete()

    def get_serialization(self):
        """Return the data of this entity as produced by the `EntitySerializer`."""
        # imported here instead of at module level - presumably to avoid a
        # circular import; verify before moving it
        from apis_core.apis_entities.serializers_generic import EntitySerializer

        return EntitySerializer(self).data

199 

200 

@receiver(post_save, dispatch_uid="create_default_uri")
def create_default_uri(sender, instance, created, raw, using, update_fields, **kwargs):
    """
    Create a default Uri for every newly created entity.

    The receiver is connected to `post_save` without a sender, so it is
    called for every saved model instance and filters for entities itself.
    Nothing is created if the `CREATE_DEFAULT_URI` setting is falsy or if
    the instance has a truthy `skip_default_uri` attribute.
    """
    enabled = getattr(settings, "CREATE_DEFAULT_URI", True)
    opted_out = getattr(instance, "skip_default_uri", False)
    if not enabled or opted_out:
        return
    if not (isinstance(instance, AbstractEntity) and created):
        return

    # prefer the root level route and fall back to the namespaced one
    lookup = {"pk": instance.pk}
    try:
        path = reverse("GetEntityGenericRoot", kwargs=lookup)
    except NoReverseMatch:
        path = reverse("apis_core:GetEntityGeneric", kwargs=lookup)

    prefix = BASE_URI.strip("/")
    Uri.objects.create(
        uri=f"{prefix}{path}",
        domain="apis default",
        root_object=instance,
    )