Coverage for apis_core/apis_relations/models.py: 65%

206 statements  

« prev     ^ index     » next       coverage.py v7.6.8, created at 2024-12-20 09:24 +0000

1import copy 

2import unicodedata 

3 

4from crum import get_current_request 

5from django.conf import settings 

6from django.contrib.contenttypes.models import ContentType 

7from django.db import models 

8from django.db.models import Q 

9from django.db.models.fields.related_descriptors import ForwardManyToOneDescriptor 

10from django.db.models.signals import m2m_changed 

11from model_utils.managers import InheritanceManager 

12 

13from apis_core.apis_metainfo.models import RootObject 

14from apis_core.generic import signals 

15from apis_core.generic.abc import GenericModel 

16from apis_core.history.models import VersionMixin 

17from apis_core.utils import DateParser 

18 

19 

20def find_if_user_accepted(): 

21 request = get_current_request() 

22 if request is not None: 

23 if request.user.is_authenticated: 

24 return {} 

25 else: 

26 return {"published": True} 

27 else: 

28 return {} 

29 

30 

31class BaseRelationManager(models.Manager): 

32 def get_queryset(self): 

33 return RelationPublishedQueryset(self.model, using=self._db) 

34 

35 def filter_ann_proj(self, request=None, ann_proj=1, include_all=True): 

36 return self.get_queryset().filter_ann_proj( 

37 request=request, ann_proj=ann_proj, include_all=include_all 

38 ) 

39 

40 def filter_for_user(self): 

41 if hasattr(settings, "APIS_SHOW_ONLY_PUBLISHED"): 

42 return self.get_queryset().filter_for_user() 

43 else: 

44 return self.get_queryset() 

45 

46 

47class Property(RootObject): 

48 class Meta: 

49 verbose_name_plural = "Properties" 

50 

51 objects = BaseRelationManager() 

52 property_class_uri = models.CharField( 

53 max_length=255, verbose_name="Property Class URI", blank=True 

54 ) 

55 # TODO RDF: Redundancy between name_forward and name, solve this. 

56 name_forward = models.CharField( 

57 max_length=255, 

58 verbose_name="Name forward", 

59 help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".', 

60 blank=True, 

61 ) 

62 # TODO RDF: Maybe rename name to name_subj_to_obj and name_reverse to name_obj_to_subj 

63 name_reverse = models.CharField( 

64 max_length=255, 

65 verbose_name="Name reverse", 

66 help_text='Inverse relation like: "is sub-class of" vs. "is super-class of".', 

67 blank=True, 

68 ) 

69 subj_class = models.ManyToManyField( 

70 ContentType, 

71 related_name="property_set_subj", 

72 ) 

73 obj_class = models.ManyToManyField( 

74 ContentType, 

75 related_name="property_set_obj", 

76 ) 

77 

78 def __str__(self): 

79 return self.name_forward 

80 

81 def save(self, *args, **kwargs): 

82 if self.name_reverse == "": 

83 self.name_reverse = f"{self.name_forward} [INVERSE]" 

84 

85 self.name_forward = unicodedata.normalize("NFC", str(self.name_forward)) 

86 self.name_reverse = unicodedata.normalize("NFC", str(self.name_reverse)) 

87 

88 if (update_fields := kwargs.get("update_fields")) is not None: 

89 if "name_forward" in update_fields and "name_reverse" not in update_fields: 

90 modified_update_fields = set(update_fields) 

91 modified_update_fields.add("name_reverse") 

92 kwargs["update_fields"] = modified_update_fields 

93 

94 super(Property, self).save(*args, **kwargs) 

95 return self 

96 

97 

98# TODO: comment and explain 

99def subj_or_obj_class_changed(sender, is_subj, **kwargs): 

100 def cascade_subj_obj_class_to_children( 

101 contenttype_to_add_or_remove, 

102 contenttype_already_saved_list, 

103 subj_or_obj_field_function, 

104 ): 

105 def get_all_parents(contenttype_current): 

106 parent_list = [] 

107 class_current = contenttype_current.model_class() 

108 for class_parent in class_current.__bases__: 

109 # TODO: Avoid ContentType DB fetch 

110 contenttype_parent = ContentType.objects.filter( 

111 model=class_parent.__name__ 

112 ) 

113 if len(contenttype_parent) == 1: 

114 contenttype_parent = contenttype_parent[0] 

115 parent_list.append(contenttype_parent) 

116 parent_list.extend(get_all_parents(contenttype_parent)) 

117 

118 return parent_list 

119 

120 def get_all_children(contenttype_current): 

121 child_list = [] 

122 class_current = contenttype_current.model_class() 

123 for class_child in class_current.__subclasses__(): 

124 # TODO: Avoid ContentType DB fetch 

125 contenttype_child = ContentType.objects.get_for_model(class_child) 

126 child_list.append(contenttype_child) 

127 child_list.extend(get_all_children(contenttype_child)) 

128 

129 return child_list 

130 

131 parent_contenttype_list = get_all_parents(contenttype_to_add_or_remove) 

132 for parent_contenttype in parent_contenttype_list: 

133 if parent_contenttype in contenttype_already_saved_list: 

134 raise Exception( 

135 f"Pre-existing parent class found when trying to save or remove a property subject or object class." 

136 f" The current class to be saved is '{contenttype_to_add_or_remove.model_class().__name__}'," 

137 f" but already saved is '{parent_contenttype.model_class().__name__}'." 

138 f" Such a save could potentially be in conflict with an ontology." 

139 f" Better save or remove the respective top parent subject or object class from this property." 

140 ) 

141 children_contenttype_list = get_all_children(contenttype_to_add_or_remove) 

142 for child_contenttype in children_contenttype_list: 

143 subj_or_obj_field_function(child_contenttype) 

144 

145 if kwargs["pk_set"] is not None and len(kwargs["pk_set"]) == 1: 

146 sending_property = kwargs["instance"] 

147 if sender == Property.subj_class.through: 

148 subj_or_obj_field = sending_property.subj_class 

149 elif sender == Property.obj_class.through: 

150 subj_or_obj_field = sending_property.obj_class 

151 else: 

152 raise Exception 

153 subj_or_obj_field_function = None 

154 if kwargs["action"] == "pre_add": 

155 subj_or_obj_field_function = subj_or_obj_field.add 

156 elif kwargs["action"] == "post_remove": 

157 subj_or_obj_field_function = subj_or_obj_field.remove 

158 if subj_or_obj_field_function is not None: 

159 cascade_subj_obj_class_to_children( 

160 contenttype_to_add_or_remove=ContentType.objects.get( 

161 pk=min(kwargs["pk_set"]) 

162 ), 

163 contenttype_already_saved_list=subj_or_obj_field.all(), 

164 subj_or_obj_field_function=subj_or_obj_field_function, 

165 ) 

166 

167 

168def subj_class_changed(sender, **kwargs): 

169 subj_or_obj_class_changed(sender, is_subj=True, **kwargs) 

170 

171 

172def obj_class_changed(sender, **kwargs): 

173 subj_or_obj_class_changed(sender, is_subj=False, **kwargs) 

174 

175 

176m2m_changed.connect(subj_class_changed, sender=Property.subj_class.through) 

177m2m_changed.connect(obj_class_changed, sender=Property.obj_class.through) 

178 

179 

180class RelationPublishedQueryset(models.QuerySet): 

181 def filter_for_user(self, *args, **kwargs): 

182 if getattr(settings, "APIS_SHOW_ONLY_PUBLISHED", False): 

183 request = get_current_request() 

184 if request is not None: 

185 if request.user.is_authenticated: 

186 return self 

187 else: 

188 return self.filter(published=True) 

189 else: 

190 return self.filter(published=True) 

191 else: 

192 return self 

193 

194 def filter_ann_proj(self, request=None, ann_proj=1, include_all=True): 

195 """The filter function provided by the manager class. 

196 

197 :param request: `django.request` object 

198 :return: queryset that contains only objects that are shown in the highlighted text or those not connected 

199 to an annotation at all. 

200 """ 

201 qs = self 

202 users_show = None 

203 if request: 

204 ann_proj = request.session.get("annotation_project", False) 

205 if not ann_proj: 

206 return qs 

207 users_show = request.session.get("users_show_highlighter", None) 

208 query = Q(annotation__annotation_project_id=ann_proj) 

209 if users_show is not None: 

210 query.add(Q(annotation__user_added_id__in=users_show), Q.AND) 

211 if include_all: 

212 query.add(Q(annotation__annotation_project__isnull=True), Q.OR) 

213 return qs.filter(query) 

214 

215 

216class InheritanceForwardManyToOneDescriptor(ForwardManyToOneDescriptor): 

217 def get_queryset(self, **hints): 

218 return self.field.remote_field.model.objects_inheritance.db_manager( 

219 hints=hints 

220 ).select_subclasses() 

221 

222 

223class InheritanceForeignKey(models.ForeignKey): 

224 forward_related_accessor_class = InheritanceForwardManyToOneDescriptor 

225 

226 

227class Triple(models.Model, GenericModel): 

228 subj = InheritanceForeignKey( 

229 RootObject, 

230 blank=True, 

231 null=True, 

232 on_delete=models.CASCADE, 

233 related_name="triple_set_from_subj", 

234 verbose_name="Subject", 

235 ) 

236 obj = InheritanceForeignKey( 

237 RootObject, 

238 blank=True, 

239 null=True, 

240 on_delete=models.CASCADE, 

241 related_name="triple_set_from_obj", 

242 verbose_name="Object", 

243 ) 

244 prop = models.ForeignKey( 

245 Property, 

246 blank=True, 

247 null=True, 

248 on_delete=models.CASCADE, 

249 related_name="triple_set_from_prop", 

250 verbose_name="Property", 

251 ) 

252 

253 objects = BaseRelationManager() 

254 objects_inheritance = InheritanceManager() 

255 

256 def __repr__(self): 

257 try: 

258 return f"<{self.__class__.__name__}: subj: {self.subj}, prop: {self.prop}, obj: {self.obj}>" 

259 except RootObject.DoesNotExist: 

260 return f"<{self.__class__.__name__}: None>" 

261 

262 def __str__(self): 

263 return self.__repr__() 

264 

265 def get_web_object(self): 

266 return { 

267 "relation_pk": self.pk, 

268 "subj": str(self.subj), 

269 "obj": str(self.obj), 

270 "prop": self.prop.name_forward, 

271 } 

272 

273 def save(self, *args, **kwargs): 

274 # TODO RDF: Integrate more proper check if subj and obj instances are of valid class as defined in prop.subj_class and prop.obj_class 

275 # def get_all_parents(cls_current): 

276 # parent_list = [] 

277 # for p in cls_current.__bases__: 

278 # parent_list.append(p) 

279 # parent_list.extend(get_all_parents(p)) 

280 # return parent_list 

281 

282 def get_all_childs(cls_current): 

283 child_list = [] 

284 for p in cls_current.__subclasses__(): 

285 child_list.append(p) 

286 child_list.extend(get_all_childs(p)) 

287 

288 return child_list 

289 

290 if self.subj is None or self.obj is None or self.prop is None: 

291 raise Exception("subj, obj, or prop is None") 

292 if self.subj is not None: 

293 subj_class_name = self.subj.__class__.__name__ 

294 if ( 

295 ContentType.objects.get_for_model(self.subj.__class__) 

296 not in self.prop.subj_class.all() 

297 ): 

298 raise Exception( 

299 f"Subject class '{subj_class_name}' is not in valid subject class list of property '{self.prop}'" 

300 ) 

301 if self.obj is not None: 

302 obj_class_name = self.obj.__class__.__name__ 

303 if ( 

304 ContentType.objects.get_for_model(model=self.obj.__class__) 

305 not in self.prop.obj_class.all() 

306 ): 

307 raise Exception( 

308 f"Object class '{obj_class_name}' is not in valid object class list of property '{self.prop}'" 

309 ) 

310 

311 super().save(*args, **kwargs) 

312 

313 def duplicate(self): 

314 origin = self.__class__ 

315 signals.pre_duplicate.send(sender=origin, instance=self) 

316 

317 instance = copy.copy(self) 

318 

319 self.pk = None 

320 self.id = None 

321 self._state.adding = True 

322 duplicate = self.save() 

323 

324 signals.post_duplicate.send( 

325 sender=origin, instance=instance, duplicate=duplicate 

326 ) 

327 return duplicate 

328 

329 

330class TempTriple(Triple, VersionMixin): 

331 review = models.BooleanField( 

332 default=False, 

333 help_text="Should be set to True, if the data record holds up quality standards.", 

334 ) 

335 start_date = models.DateField(blank=True, null=True) 

336 start_start_date = models.DateField(blank=True, null=True) 

337 start_end_date = models.DateField(blank=True, null=True) 

338 end_date = models.DateField(blank=True, null=True) 

339 end_start_date = models.DateField(blank=True, null=True) 

340 end_end_date = models.DateField(blank=True, null=True) 

341 start_date_written = models.CharField( 

342 max_length=255, 

343 blank=True, 

344 null=True, 

345 verbose_name="Start", 

346 ) 

347 end_date_written = models.CharField( 

348 max_length=255, 

349 blank=True, 

350 null=True, 

351 verbose_name="End", 

352 ) 

353 # text = models.ManyToManyField("Text", blank=True) 

354 # collection = models.ManyToManyField("Collection") 

355 status = models.CharField(max_length=100) 

356 # source = models.ForeignKey( 

357 # "Source", blank=True, null=True, on_delete=models.SET_NULL 

358 # ) 

359 references = models.TextField(blank=True, null=True) 

360 notes = models.TextField(blank=True, null=True) 

361 

362 def save(self, parse_dates=True, *args, **kwargs): 

363 """Adaption of the save() method of the class to automatically parse string-dates into date objects""" 

364 

365 if parse_dates: 

366 # overwrite every field with None as default 

367 start_date = None 

368 start_start_date = None 

369 start_end_date = None 

370 end_date = None 

371 end_start_date = None 

372 end_end_date = None 

373 

374 if self.start_date_written: 

375 # If some textual user input of a start date is there, then parse it 

376 

377 start_date, start_start_date, start_end_date = DateParser.parse_date( 

378 self.start_date_written 

379 ) 

380 

381 if self.end_date_written: 

382 # If some textual user input of an end date is there, then parse it 

383 

384 end_date, end_start_date, end_end_date = DateParser.parse_date( 

385 self.end_date_written 

386 ) 

387 

388 self.start_date = start_date 

389 self.start_start_date = start_start_date 

390 self.start_end_date = start_end_date 

391 self.end_date = end_date 

392 self.end_start_date = end_start_date 

393 self.end_end_date = end_end_date 

394 

395 super().save(*args, **kwargs) 

396 

397 return self