Coverage for shps/process_upload.py: 19%

69 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-27 11:19 +0000

1import json 

2import os 

3import glob 

4import zipfile 

5import geopandas as gp 

6 

7from django.contrib.gis import geos 

8from django.contrib.gis.geos import fromstr 

9from django.db import IntegrityError 

10 

11from vocabs.models import SkosConcept, SkosConceptScheme 

12from .models import TempSpatial 

13 

# Shapefile columns that import_shapes treats as known fields; every other
# column of a shapefile is copied into TempSpatial.additional_data.
mandatory_keys = [
    "id", "name", "name_alt",
    "start_date", "end_date", "date_acc",
    "adm_type", "geometry",
]

24 

25 

def unzip_shapes(path_to_zipfile, shape_temp_dir):
    """Extract a zip archive and list the shapefiles at its top level.

    Args:
        path_to_zipfile: Path of the zip archive to read.
        shape_temp_dir: Directory the archive is extracted into.

    Returns:
        A list of paths matching ``<shape_temp_dir>/*.shp``. Only the top
        level of the directory is searched, so ``.shp`` files in
        sub-directories of the archive are not returned, while ``.shp``
        files that were already in the directory are.
    """
    # The context manager closes the archive handle even when extraction
    # raises; the previous version left the file open.
    with zipfile.ZipFile(path_to_zipfile, "r") as archive:
        archive.extractall(shape_temp_dir)
    pattern = os.path.join(shape_temp_dir, "*.shp")
    return list(glob.iglob(pattern, recursive=False))

32 

33 

def import_shapes(shapefiles, source):
    """Import every feature of the given shapefiles as ``TempSpatial`` objects.

    Each shapefile is read with GeoPandas and reprojected to WGS84
    longitude/latitude. For every row a ``TempSpatial`` is fetched or created
    from its start date, end date, date accuracy and geometry, then enriched
    with an administrative-unit concept, optional name/id columns, the
    remaining columns as a JSON blob, and ``source``.

    Args:
        shapefiles: Iterable of shapefile paths readable by
            ``geopandas.read_file``.
        source: Value stored on ``TempSpatial.source`` for every imported row.

    Returns:
        A list with the ``id`` of every ``TempSpatial`` that was saved
        successfully. Rows whose ``save()`` raises are printed and skipped.
    """
    adm_scheme, _ = SkosConceptScheme.objects.get_or_create(
        dc_title="administrative_unit"
    )
    # (shapefile column, TempSpatial attribute) pairs for values that are
    # copied only when the column exists and holds a truthy value.
    optional_fields = (
        ("name", "name"),
        ("name_alt", "alt_name"),
        ("id", "orig_id"),
        ("wiki_id", "wikidata_id"),
    )
    temp_spatial_ids = []
    for shapefile in shapefiles:
        df = gp.read_file(shapefile).to_crs(
            {"proj": "longlat", "ellps": "WGS84", "datum": "WGS84"}
        )
        # Columns outside mandatory_keys end up in additional_data; the set of
        # columns is the same for every row, so compute it once per file.
        extra_columns = [col for col in df.keys() if col not in mandatory_keys]
        for _, row in df.iterrows():
            shape = row["geometry"]
            # Non-MultiPolygon geometries are wrapped in a MultiPolygon;
            # MultiPolygons are passed on as WKT.
            if shape.geom_type == "MultiPolygon":
                geom = shape.wkt
            else:
                geom = geos.MultiPolygon(fromstr(shape.wkt))
            spat, _ = TempSpatial.objects.get_or_create(
                start_date=row["start_date"],
                end_date=row["end_date"],
                date_accuracy=row["date_acc"],
                geom=geom,
            )

            # Fall back to a placeholder concept when the column is missing
            # or the concept cannot be created from the row's value.
            try:
                adm, _ = SkosConcept.objects.get_or_create(
                    pref_label=row["adm_type"]
                )
            except (KeyError, IntegrityError):
                adm, _ = SkosConcept.objects.get_or_create(pref_label="unknown adm")
            adm.scheme.add(adm_scheme)
            spat.administrative_unit = adm

            for column, attribute in optional_fields:
                try:
                    value = row[column]
                except KeyError:
                    # Column not present in this shapefile.
                    continue
                if value:
                    setattr(spat, attribute, value)

            add_data = {col: row[col] for col in extra_columns}
            spat.additional_data = json.dumps(add_data)
            spat.source = source
            try:
                spat.save()
                temp_spatial_ids.append(spat.id)
            except Exception as e:
                # Deliberate best-effort: one failing row must not abort the
                # import of the remaining rows.
                print(e)

    return temp_spatial_ids