Coverage for shps/models.py: 73%

164 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-27 11:19 +0000

1import os 

2import hashlib 

3from datetime import datetime 

4 

5from rdflib import Namespace 

6from django.conf import settings 

7from django.contrib.gis.db import models 

8from django.contrib.postgres.fields import DateRangeField 

9from django.core.files.storage import FileSystemStorage 

10from django.core.serializers import serialize 

11from django.urls import reverse 

12from django.utils.text import slugify 

13from next_prev import next_in_order, prev_in_order 

14 

15from idprovider.models import IdProvider 

16from vocabs.models import SkosConcept 

17 

18 

# RDF namespaces (rdflib ``Namespace`` objects) for the ARCHE schema
# vocabulary and for ACDH identifiers.
ARCHE = Namespace("https://vocabs.acdh.oeaw.ac.at/schema#")
ACDH = Namespace("https://id.acdh.oeaw.ac.at/")

# ISO formatted date (YYYY-MM-DD) captured once, when this module is imported.
# NOTE(review): the name is misspelled ("curent") but is kept as is because
# other modules may import it; in a long-running process the value goes stale.
curent_date = datetime.now().date().isoformat()

23 

24 

class OverwriteStorage(FileSystemStorage):
    """File system storage that overwrites an existing file of the same name.

    Instead of letting Django derive an alternative, unused file name, an
    already stored file with the requested name is removed so the new upload
    can take its place.

    Based on http://djangosnippets.org/snippets/976/. The alternative discussed
    in https://code.djangoproject.com/ticket/11663 (deleting the old file in
    the model's ``save`` method) is not used here.
    """

    def get_available_name(self, name, max_length=None):
        """Return ``name`` unchanged, deleting any stored file of that name.

        :param name: the storage-relative file name requested for the upload
        :param max_length: accepted for API compatibility; not evaluated
        :return: the unmodified ``name``
        """
        if self.exists(name):
            # Delete through the storage API so the path is resolved against
            # this storage's own location (not a hard-coded MEDIA_ROOT) and a
            # file that vanished in the meantime does not raise.
            self.delete(name)
        return name

48 

49 

# Choices describing how precise a stored date is: (stored value, label).
DATE_ACCURACY = (
    ("Y", "Year"),
    ("YM", "Month"),
    ("DMY", "Day"),
)

# Traffic-light quality choices; stored value and label are identical.
QUALITY = tuple((color, color) for color in ("red", "yellow", "green"))

57 

58 

class Source(models.Model):
    """A data source, optionally with an uploaded zipped shape file.

    TempSpatial objects reference their source through the ``source_of``
    reverse relation.
    """

    name = models.CharField(
        max_length=255, verbose_name="Name", help_text="Name of the source"
    )
    description = models.TextField(
        blank=True,
        null=True,
        verbose_name="Description",
        help_text="Some verbose description of the source",
    )
    quote = models.TextField(
        blank=True, null=True, verbose_name="Quote", help_text="How to quote."
    )
    original_url = models.TextField(
        blank=True,
        null=True,
        verbose_name="URLs",
        help_text="URLs from where the data was downloaded, use '; ' as separator",
    )
    upload = models.FileField(
        max_length=250,
        blank=True,
        verbose_name="A zipped ESRI Shape File",
        help_text="A shape file following the HistoGIS data convention",
        upload_to="data/",
        storage=OverwriteStorage(),
    )

    class Meta:
        ordering = ["id"]

    def __str__(self):
        """Return the name, or a generic label with the ID if no name is set."""
        if self.name:
            return f"{self.name}"
        # The name is empty in this branch, so identify the object by its ID.
        return f"Source ID: {self.id}"

    def get_absolute_url(self):
        """Return the URL of this source's detail view."""
        return reverse("shapes:source_detail", kwargs={"pk": self.id})

    def delete(self, using=None, keep_parents=False):
        """Delete the file from disk because Django doesn't do it.

        Kudos to AlexanderWatzinger.

        :return: whatever ``Model.delete`` returns, so callers relying on the
            standard return value keep working
        """
        # save=False: the row is removed right afterwards, so persisting the
        # emptied file field first would only cost an extra UPDATE.
        self.upload.delete(save=False)
        return super().delete(using=using, keep_parents=keep_parents)

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the list view of all sources."""
        return reverse("shapes:browse_sources")

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the view used to create a source."""
        return reverse("shapes:source_create")

    def get_next(self):
        """Return the ID of the next source in model order, or False."""
        successor = next_in_order(self)
        if successor:
            return successor.id
        return False

    def get_prev(self):
        """Return the ID of the previous source in model order, or False."""
        predecessor = prev_in_order(self)
        if predecessor:
            return predecessor.id
        return False

    def get_file_size(self):
        """Return the size of the uploaded file as a string, or None.

        None is returned whenever the size cannot be determined, e.g. because
        no file is attached.
        """
        try:
            return f"{self.upload.size}"
        except Exception:  # best effort: any failure means "size unknown"
            return None

    @property
    def end_date(self):
        """The latest end date of all related shapes as string, or None."""
        try:
            return f"{self.source_of.all().order_by('end_date').last().end_date}"
        except AttributeError:
            # last() returned None: no related objects.
            return None

    @property
    def start_date(self):
        """The earliest start date of all related shapes as string, or None."""
        try:
            # first() of the ascending order is the earliest start date;
            # last() would yield the latest start instead.
            return f"{self.source_of.all().order_by('start_date').first().start_date}"
        except AttributeError:
            # first() returned None: no related objects.
            return None

    def slug_name(self):
        """Return a slug built from the name and the start and end date."""
        return f"{slugify(self.name)}__{self.start_date}_{self.end_date}"

146 

147 

class TempSpatial(IdProvider):
    """A class for temporalized spatial objects"""

    name = models.CharField(
        max_length=250,
        blank=True,
        verbose_name="Name",
        help_text="Usually the object's contemporary name",
    )
    alt_name = models.CharField(
        max_length=500,
        blank=True,
        verbose_name="Alternative Names",
        help_text="Alternative Names, use '; ' as separator in case of more names",
    )
    # NOTE(review): the backslash continuation embeds the leading whitespace of
    # the second line into help_text; confirm the exact whitespace against the
    # repository before applying, to avoid a spurious migration.
    wikidata_id = models.CharField(
        max_length=500,
        blank=True,
        verbose_name="Wikidata ID",
        help_text="The ID of a wiki data entry which can be\
            reasonable associated with the current object.",
    )
    start_date = models.DateField(
        verbose_name="Start Date.", help_text="Earliest date this entity captures"
    )
    end_date = models.DateField(
        verbose_name="End Date.", help_text="Latest date this entity captures"
    )
    date_accuracy = models.CharField(
        verbose_name="How accurate is the given date",
        help_text="The value indicates if the date is accurate per YEAR, MONTH or DAY",
        choices=DATE_ACCURACY,
        default=DATE_ACCURACY[0][0],
        max_length=3,
    )
    source = models.ForeignKey(
        Source,
        null=True,
        blank=True,
        related_name="source_of",
        verbose_name="Source",
        help_text="The source of this data.",
        on_delete=models.CASCADE,
    )
    geom = models.MultiPolygonField(blank=True, null=True, srid=4326)
    administrative_unit = models.ForeignKey(
        SkosConcept,
        null=True,
        related_name="adm_unit",
        on_delete=models.SET_NULL,
        blank=True,
        verbose_name="Contemporary Administrative Unit",
        help_text="A contemporary name of the administrative unit.",
    )
    orig_id = models.CharField(
        max_length=255,
        null=True,
        blank=True,
        verbose_name="Any legacy Identifier",
        help_text="The ID of this object from the dataset used to import this data.",
    )
    quality = models.CharField(
        verbose_name="Quality of this dataset",
        help_text="An estimation of the HistoGis Team upon the quality of this dataset",
        max_length=25,
        null=True,
        choices=QUALITY,
        default=QUALITY[1][1],
    )
    additional_data = models.JSONField(
        verbose_name="Additional data",
        help_text="Additional data provided from the object's source.",
        blank=True,
        null=True,
    )
    unique = models.CharField(blank=True, null=True, max_length=300, unique=True)
    centroid = models.PointField(
        blank=True,
        null=True,
        verbose_name="Centroid",
        help_text="The object's centroid",
    )
    temp_extent = DateRangeField(
        blank=True,
        null=True,
        verbose_name="Temporal Extent",
        help_text="The objects temporal extent (Start and end date)",
    )
    spatial_extent = models.FloatField(
        blank=True,
        null=True,
        verbose_name="Spatial Extent",
        help_text="Saves the area of the object",
    )

    class Meta:
        ordering = ["id"]

    def __str__(self):
        """Return name and date range, or a generic label with the ID."""
        if self.name:
            return f"{self.name} ({self.start_date}–{self.end_date})"
        return f"TempStatial ID: {self.id}"

    def alt_name_list(self):
        """
        returns a list of alt names
        :return: a python list of the non-empty, stripped alt names; an empty
            list if no alternative name is stored
        """
        # Skip empty fragments so a blank field yields [] instead of [""].
        return [
            part.strip() for part in (self.alt_name or "").split(";") if part.strip()
        ]

    def save(self, *args, **kwargs):
        """customized save function stores
        centroid, a hash, temp_extent and spatial_extent on save

        ``geom`` is nullable; without a geometry no hash is computed (``unique``
        is left untouched) and ``spatial_extent`` is cleared.
        """
        # An explicitly stored centroid is never overwritten.
        if self.geom and not self.centroid:
            self.centroid = self.geom.centroid
        if self.geom is not None:
            unique_str = "".join(
                [
                    str(self.start_date),
                    str(self.end_date),
                    str(self.geom.wkt),
                    str(self.date_accuracy),
                ]
            ).encode("utf-8")
            # md5 serves as a fingerprint of dates + geometry here, not as a
            # security measure.
            self.unique = hashlib.md5(unique_str).hexdigest()
            self.spatial_extent = self.geom.area
        else:
            self.spatial_extent = None
        if self.start_date and self.end_date:
            self.temp_extent = (self.start_date, self.end_date)
        try:
            super().save(*args, **kwargs)
        except Exception as e:
            # NOTE(review): a failed save is only printed, so callers cannot
            # detect it. Kept as is; presumably intended for bulk imports.
            print(e)

    def get_geojson(self):
        """Return this object serialized as GeoJSON (geometry plus name)."""
        return serialize(
            "geojson",
            TempSpatial.objects.filter(id=self.id),
            geometry_field="geom",
            fields=("name",),
        )

    def get_absolute_url(self):
        """Return the URL of this object's detail view."""
        return reverse("shapes:shape_detail", kwargs={"pk": self.id})

    def get_arche_url(self):
        """Return the URL of this object's ``shapes:arche_md`` view."""
        return reverse("shapes:arche_md", kwargs={"pk": self.id})

    def get_json_url(self):
        """Return the URL of this object's ``tempspatial-detail`` view."""
        return reverse("tempspatial-detail", kwargs={"pk": self.id})

    def get_permalink_url(self):
        """Return the permalink URL, which is keyed by the ``unique`` hash."""
        return reverse("shapes:permalink-view", kwargs={"unique": self.unique})

    @classmethod
    def get_listview_url(cls):
        """Return the URL of the list view of all shapes."""
        return reverse("shapes:browse_shapes")

    @classmethod
    def get_createview_url(cls):
        """Return the URL of the view used to create a shape."""
        return reverse("shapes:shape_create")

    def get_next(self):
        """Return the ID of the next object in model order, or False."""
        successor = next_in_order(self)
        if successor:
            return successor.id
        return False

    def get_prev(self):
        """Return the ID of the previous object in model order, or False."""
        predecessor = prev_in_order(self)
        if predecessor:
            return predecessor.id
        return False

    def sq_km(self, ct=3035):
        """returns the size of the spatial extent in square km

        :param ct: target coordinate system the geometry is transformed to
            before measuring (default 3035; presumably a metre-based
            projection, since the area is divided by 1,000,000)
        :return: the area in square km, or None if there is no geometry
        """
        if self.geom is None:
            return None
        # Work on a clone: an in-place transform would permanently change the
        # SRID and coordinates of this instance's geometry.
        projected = self.geom.transform(ct, clone=True)
        return projected.area / 1000000

    def slug_name(self):
        """Return a slug built from the name and the start and end date."""
        return f"{slugify(self.name)}__{self.start_date}_{self.end_date}"

    def sanitize_wikidataid(self):
        """Return the Wikidata reference as a full URL.

        :return: the stored value if it already starts with "http", otherwise
            the ID prefixed with the Wikidata base URL; None if nothing is
            stored
        """
        # The field is blank=True, so "no value" is usually an empty string.
        if not self.wikidata_id:
            return None
        if self.wikidata_id.startswith("http"):
            return self.wikidata_id
        return f"https://www.wikidata.org/wiki/{self.wikidata_id}"

342 

343 

class TempStatialRel(IdProvider):
    """A typed, dated relation linking one TempSpatial object to another."""

    # The two related objects; set to NULL if the referenced object is deleted.
    instance_a = models.ForeignKey(
        TempSpatial,
        related_name="related_instance_a",
        on_delete=models.SET_NULL,
        null=True,
    )
    instance_b = models.ForeignKey(
        TempSpatial,
        related_name="related_instance_b",
        on_delete=models.SET_NULL,
        null=True,
    )
    # The kind of relation, taken from the SKOS vocabulary.
    relation_type = models.ForeignKey(
        SkosConcept,
        related_name="tmp_spatial_rel_relation",
        on_delete=models.SET_NULL,
        null=True,
    )
    start_date = models.DateField(
        help_text="Earliest date this relation captures",
        verbose_name="Start Date.",
    )
    end_date = models.DateField(
        help_text="Latest date this relation captures",
        verbose_name="End Date.",
    )
    date_accuracy = models.CharField(
        choices=DATE_ACCURACY,
        default="Y",
        max_length=3,
        verbose_name="Date Accuracy",
    )

    def __str__(self):
        """Return "<a> <relation> <b>", or a generic label with the ID."""
        is_complete = self.instance_a and self.instance_b and self.relation_type
        if not is_complete:
            return f"TempStatialRel ID: {self.id}"
        return f"{self.instance_a} {self.relation_type} {self.instance_b}"