Coverage for vocabs/skos.py: 8%
223 statements
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-27 11:19 +0000
« prev ^ index » next coverage.py v7.6.0, created at 2024-07-27 11:19 +0000
1import lxml.etree as ET
2import csv
3from .models import SkosConcept, SkosConceptScheme, SkosLabel
class Csv2SkosReader(object):
    """
    Extract SKOS-like objects from specially structured CSV sheets.

    Layout as read by this class:

    * row 0 is the header row; if its second cell contains an ``@``, the
      text after the first ``@`` is used as the language code of all
      alternative labels
    * column 0 of every further row names the concept scheme
    * column 1 holds the first-order concept as ``prefLabel|altLabel``
    * column 2 (optional, may be empty) holds a narrower concept in the
      same ``prefLabel|altLabel`` form

    Blank lines in the sheet are ignored.
    """

    def __init__(self, csv_file):
        """
        :param csv_file: anything ``csv.reader`` accepts, i.e. an iterable
            yielding lines of text (open text file, ``io.StringIO``, ...)
        """
        self.csv_file = csv_file
        self.data = list(csv.reader(self.csv_file))
        # An empty sheet has no header row at all.
        self.headers = self.data[0] if self.data else []
        try:
            label_header = self.headers[1]
            self.alt_lang = label_header[label_header.index("@") + 1:]
        except (IndexError, ValueError):
            # Fewer than two header cells, or no "@" marker in the header.
            self.alt_lang = None
        self.schemes = {row[0] for row in self._rows()}
        self.number_of_schemes = len(self.schemes)

    def _rows(self):
        """Return the data rows (header excluded) without blank lines."""
        # csv.reader yields an empty list for a blank line.
        return [row for row in self.data[1:] if row]

    @staticmethod
    def _split_label(cell):
        """Split ``pref|alt`` into ``(pref, alt)``; ``alt`` is None if absent."""
        parts = cell.split("|")
        return parts[0], (parts[1] if len(parts) > 1 else None)

    def get_concepts(self):
        """
        Return one dictionary per data row.

        Each entry has the keys ``scheme`` and ``concept``; ``concept``
        carries ``pref_label``, ``pref_label_lang`` (always ``"eng"``),
        ``alt_label`` and ``alt_label_lang``. If the row has a non-empty
        third column, ``concept`` also gets a ``narrower`` entry of the
        same ``{"scheme": ..., "concept": {...}}`` shape.
        """
        concepts = []
        for row in self._rows():
            scheme = row[0]
            pref_label, alt_label = self._split_label(row[1])
            concept = {
                "pref_label": pref_label,
                "pref_label_lang": "eng",
                # The label text from the cell, not the language code.
                "alt_label": alt_label,
                "alt_label_lang": self.alt_lang,
            }
            # Rows may omit the trailing narrower column entirely.
            narrower_cell = row[2] if len(row) > 2 else ""
            if narrower_cell != "":
                narrower_pref, narrower_alt = self._split_label(narrower_cell)
                concept["narrower"] = {
                    "scheme": scheme,
                    "concept": {
                        "pref_label": narrower_pref,
                        "pref_label_lang": "eng",
                        "alt_label": narrower_alt,
                        "alt_label_lang": self.alt_lang,
                    },
                }
            concepts.append({"scheme": scheme, "concept": concept})
        return concepts
class Csv2SkosImporter(Csv2SkosReader):
    """Take a specially formatted CSV file, parse it and import the derived data into vocabs."""

    @staticmethod
    def _clean_scheme_title(raw):
        """Return the part of a scheme cell before the first ``|``, stripped."""
        return raw.split("|")[0].strip()

    @staticmethod
    def _get_or_create_concept(data, scheme):
        """
        Get or create the concept described by ``data`` and attach its
        altLabel and ``scheme`` to it.

        ``data`` is one ``concept`` dictionary as produced by
        ``get_concepts`` (keys ``pref_label``, ``pref_label_lang``,
        ``alt_label``, ``alt_label_lang``).
        """
        label, _ = SkosLabel.objects.get_or_create(
            label=data["alt_label"],
            label_type="altLabel",
            isoCode=data["alt_label_lang"],
        )
        concept, _ = SkosConcept.objects.get_or_create(
            pref_label=data["pref_label"],
            pref_label_lang=data["pref_label_lang"],
        )
        # NOTE(review): if `label`/`scheme` are many-to-many fields, recent
        # Django versions reject direct assignment and require `.set()`;
        # confirm against the model definitions.
        concept.label = [label]
        concept.scheme = [scheme]
        return concept

    def update_schemes(self):
        """
        Import/update all concept schemes found in the CSV.

        Returns a report dictionary with the scheme counts ``before`` and
        ``after`` and the lists of ``failed`` and ``success`` scheme cells.
        """
        report = {}
        report["before"] = SkosConceptScheme.objects.count()
        failed = []
        success = []
        for raw_scheme in self.schemes:
            clean = self._clean_scheme_title(raw_scheme)
            try:
                temp_scheme, _ = SkosConceptScheme.objects.get_or_create(dc_title=clean)
                temp_scheme.save()
                success.append(raw_scheme)
            except Exception:
                # Best effort: one bad scheme must not abort the import.
                failed.append(raw_scheme)
        report["failed"] = failed
        report["success"] = success
        report["after"] = SkosConceptScheme.objects.count()
        return report

    def importConcepts(self):
        """
        Import/update all concepts found in the CSV.

        Returns a report dictionary with the concept counts ``before`` and
        ``after``, the scheme counts ``schemes_before`` and
        ``schemes_after`` and the prefLabels that ``failed`` or were a
        ``success``.
        """
        report = {}
        report["before"] = SkosConcept.objects.count()
        report["schemes_before"] = SkosConceptScheme.objects.count()
        failed = []
        success = []
        for entry in self.get_concepts():
            clean = self._clean_scheme_title(entry["scheme"])
            temp_scheme, _ = SkosConceptScheme.objects.get_or_create(dc_title=clean)

            # create 1st order
            first = entry["concept"]
            # Reset per row so a failure never leaves the previous row's
            # concept in place as the parent of this row's narrower concept.
            temp_first = None
            try:
                temp_first = self._get_or_create_concept(first, temp_scheme)
                success.append(first["pref_label"])
            except Exception:
                failed.append(first["pref_label"])

            narrower = first.get("narrower")
            if narrower is None:
                continue

            # create 2nd order
            second = narrower["concept"]
            try:
                temp_second = self._get_or_create_concept(second, temp_scheme)
                if temp_first is None:
                    # No parent to link to; report instead of mis-linking.
                    failed.append(second["pref_label"])
                else:
                    temp_first.skos_narrower = [temp_second]
                    success.append(second["pref_label"])
            except Exception:
                failed.append(second["pref_label"])
        report["failed"] = failed
        report["success"] = success
        report["after"] = SkosConcept.objects.count()
        report["schemes_after"] = SkosConceptScheme.objects.count()
        return report

    def update_concepts(self):
        """
        Return the concept counts ``before`` and ``after``.

        No import is performed between the two counts.
        """
        report = {}
        report["before"] = SkosConcept.objects.count()
        report["after"] = SkosConcept.objects.count()
        return report
class SkosReader(object):
    """
    Read a SKOS file (RDF/XML) and return its ``rdf:Description`` entries
    as a list of dictionaries with these keys:

    type: rdf:resource of the rdf:type child, or "no type"
    id: value of the rdf:about attribute
    notation: the skos:notation child element (not its text), or None
    pref_labels / alt_labels: lists of {"text": ..., "lang": ...};
        "lang" is None when the label carries no xml:lang
    definitions: list of skos:definition texts
    broader / narrower / closeMatch: lists of {"uri": ..., "notation": ...}
        where "notation" is the last "/"-separated segment of the uri
    schemes: list of skos:inScheme resource URIs
    """

    _XML_LANG = "{http://www.w3.org/XML/1998/namespace}lang"

    def __init__(self, skosfile):
        """
        :param skosfile: path or file object handed to ``ET.parse``

        Parsing problems do not raise; they are recorded as message
        strings in ``parsed_file`` / ``extractedDescriptions``.
        """
        self.ns_skos = "http://www.w3.org/2004/02/skos/core#"
        self.ns_rdf = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        self.skosfile = skosfile

        # Always defined so later code can rely on the attribute existing.
        self.tree = None
        try:
            self.tree = ET.parse(skosfile)
            self.parsed_file = ET.tostring(self.tree, encoding="utf-8")
        except Exception:
            self.parsed_file = "parsing didn't work"

        try:
            self.extractedDescriptions = self.tree.findall(
                "rdf:Description", namespaces={"rdf": self.ns_rdf}
            )
            self.numberOfextractedDescriptions = len(self.extractedDescriptions)
        except Exception:
            # Also reached when parsing failed and self.tree is None.
            self.extractedDescriptions = "rdf:Descriptions could not be extracted."
            self.numberOfextractedDescriptions = 0

    def _labels(self, node, tag):
        """Return text and language of every ``skos:<tag>`` child of ``node``."""
        return [
            {"text": child.text, "lang": child.attrib.get(self._XML_LANG)}
            for child in node.findall("skos:" + tag, namespaces={"skos": self.ns_skos})
        ]

    def _resources(self, node, tag):
        """Return the rdf:resource URI of every ``skos:<tag>`` child of ``node``."""
        rdf_resource = "{%s}resource" % self.ns_rdf
        return [
            child.attrib[rdf_resource]
            for child in node.findall("skos:" + tag, namespaces={"skos": self.ns_skos})
        ]

    def _relations(self, node, tag):
        """Return ``{"uri", "notation"}`` for every ``skos:<tag>`` child of ``node``."""
        return [
            {"uri": uri, "notation": uri.split("/")[-1]}
            for uri in self._resources(node, tag)
        ]

    def returnDescriptions(self):
        """
        Return one dictionary per extracted ``rdf:Description`` (see the
        class docstring for the keys), or an empty list when extraction
        failed in ``__init__``.
        """
        # __init__ stores a message string instead of elements on failure;
        # iterating that string would treat each character as an element.
        if isinstance(self.extractedDescriptions, str):
            return []

        rdf_ns = {"rdf": self.ns_rdf}
        rdf_resource = "{%s}resource" % self.ns_rdf
        rdf_about = "{%s}about" % self.ns_rdf

        descriptions = []
        for node in self.extractedDescriptions:
            description = {}
            type_node = node.find("rdf:type", namespaces=rdf_ns)
            if type_node is not None:
                description["type"] = type_node.attrib[rdf_resource]
            else:
                description["type"] = "no type"
            description["id"] = node.attrib[rdf_about]
            # NOTE(review): this stores the element itself, not its text;
            # kept as is because callers may rely on it.
            description["notation"] = node.find(
                "skos:notation", namespaces={"skos": self.ns_skos}
            )
            description["pref_labels"] = self._labels(node, "prefLabel")
            description["definitions"] = [
                child.text
                for child in node.findall(
                    "skos:definition", namespaces={"skos": self.ns_skos}
                )
            ]
            description["alt_labels"] = self._labels(node, "altLabel")
            description["broader"] = self._relations(node, "broader")
            description["narrower"] = self._relations(node, "narrower")
            description["closeMatch"] = self._relations(node, "closeMatch")
            description["schemes"] = self._resources(node, "inScheme")
            descriptions.append(description)
        return descriptions

    def countConcepts(self):
        """Return the number of descriptions, whatever their rdf:type."""
        return len(self.returnDescriptions())
class SkosImporter(SkosReader):
    """Import concepts and concept schemes into the Django application/database."""

    _CONCEPT_SCHEME_TYPE = "http://www.w3.org/2004/02/skos/core#ConceptScheme"

    @staticmethod
    def _get_or_create_labels(entries, label_type):
        """Return a ``SkosLabel`` of ``label_type`` for every entry."""
        labels = []
        for entry in entries:
            label, _ = SkosLabel.objects.get_or_create(
                label=entry["text"], isoCode=entry["lang"], label_type=label_type
            )
            labels.append(label)
        return labels

    @staticmethod
    def _get_or_create_related(references):
        """Return a saved ``SkosConcept`` for every ``{"uri", "notation"}`` entry."""
        related = []
        for reference in references:
            concept, _ = SkosConcept.objects.get_or_create(
                legacy_id=reference["uri"], notation=reference["notation"]
            )
            concept.save()
            related.append(concept)
        return related

    def importConcepts(self):
        """
        Import every description returned by ``returnDescriptions``.

        Descriptions typed as skos:ConceptScheme become
        ``SkosConceptScheme`` objects; every other description is
        imported as a ``SkosConcept`` with its labels, schemes and
        broader/narrower/closeMatch relations.

        Returns a report dictionary with a textual ``summary`` and the
        number of concepts ``before`` and ``after`` the import.
        """
        concepts_before = SkosConcept.objects.count()
        num_description_type_concept = 0
        num_description_type_concept_scheme = 0
        for x in self.returnDescriptions():
            if x["type"] == self._CONCEPT_SCHEME_TYPE:
                temp_concept_scheme, _ = SkosConceptScheme.objects.get_or_create(
                    legacy_id=x["id"]
                )
                temp_concept_scheme.save()
                num_description_type_concept_scheme += 1
                continue

            temp_uri = x["id"]
            temp_concept, _ = SkosConcept.objects.get_or_create(
                legacy_id=temp_uri, notation=temp_uri.split("/")[-1]
            )
            # The first prefLabel becomes the concept's own label.
            if x["pref_labels"]:
                main_label = x["pref_labels"][0]
                temp_concept.pref_label = main_label["text"]
                temp_concept.pref_label_lang = main_label["lang"]
            if x["definitions"]:
                temp_concept.definition = x["definitions"][0]
                temp_concept.definition_lang = "eng"
            temp_concept.save()

            # Related objects are collected first and assigned once, so a
            # later item never replaces an earlier one.
            # NOTE(review): if these are many-to-many fields, recent Django
            # versions reject direct assignment and require `.set()`;
            # confirm against the model definitions.
            labels = self._get_or_create_labels(x["pref_labels"][1:], "prefLabel")
            # Every altLabel is imported; none of them is used elsewhere.
            labels += self._get_or_create_labels(x["alt_labels"], "altLabel")
            if labels:
                temp_concept.label = labels

            schemes = []
            for scheme_uri in x["schemes"]:
                temp_scheme, _ = SkosConceptScheme.objects.get_or_create(
                    legacy_id=scheme_uri
                )
                # The title is the last "/"-separated segment of the URI.
                temp_scheme.dc_title = scheme_uri.split("/")[-1]
                temp_scheme.save()
                schemes.append(temp_scheme)
            if schemes:
                temp_concept.scheme = schemes

            broader = self._get_or_create_related(x["broader"])
            if broader:
                temp_concept.skos_broader = broader

            narrower = self._get_or_create_related(x["narrower"])
            if narrower:
                temp_concept.skos_narrower = narrower

            close_matches = self._get_or_create_related(x["closeMatch"])
            if close_matches:
                temp_concept.skos_closematch = close_matches

            temp_concept.save()
            num_description_type_concept += 1

        concepts_after = SkosConcept.objects.count()
        summary = (
            "#descr. type 'concept': {} | #descr. type 'conceptSchemes': {}".format(
                num_description_type_concept, num_description_type_concept_scheme
            )
        )

        report = {
            "summary": summary,
            "before": concepts_before,
            "after": concepts_after,
        }
        return report

    def test_if_class_works(self):
        """Return the fixed string ``"Works!"``."""
        check = "Works!"
        return check