Coverage for vocabs/skos.py: 8%

223 statements  

« prev     ^ index     » next       coverage.py v7.6.0, created at 2024-07-27 11:19 +0000

import csv

import lxml.etree as ET

from .models import SkosConcept, SkosConceptScheme, SkosLabel

4 

5 

class Csv2SkosReader(object):
    """Extract SKOS-like objects from a specially structured CSV sheet.

    Layout as consumed by this class:

    * row 0 is a header row; the text following ``@`` in its second cell is
      used as the language code of all alternative labels,
    * column 0 of every data row names the concept scheme,
    * column 1 holds the first-order concept as ``prefLabel|altLabel``,
    * column 2 (optional / may be empty) holds a narrower concept in the
      same ``prefLabel|altLabel`` format.

    ``get_concepts`` returns a list of dictionaries containing the data
    needed to create vocabs entries.
    """

    def __init__(self, csv_file):
        """Read every row of ``csv_file`` and derive headers, language and schemes.

        :param csv_file: any iterable of CSV lines accepted by ``csv.reader``
            (for example an open text file).
        """
        self.csv_file = csv_file
        # csv.reader yields [] for blank lines; drop them so they can neither
        # be mistaken for the header nor crash the column lookups below.
        self.data = [row for row in csv.reader(self.csv_file) if row]
        self.headers = self.data[0] if self.data else []
        try:
            label_header = self.headers[1]
            self.alt_lang = label_header[label_header.index("@") + 1:]
        except (IndexError, ValueError):
            # No second header cell, or no "@" marker inside it.
            self.alt_lang = None
        self.schemes = {row[0] for row in self.data[1:]}
        self.number_of_schemes = len(self.schemes)

    @staticmethod
    def _split_label(cell):
        """Split a ``prefLabel|altLabel`` cell into ``(pref, alt)``.

        ``alt`` is ``None`` when the cell contains no ``|`` separator; any
        text after a second ``|`` is ignored.
        """
        parts = cell.split("|")
        return parts[0], (parts[1] if len(parts) > 1 else None)

    def _concept_dict(self, cell):
        """Build the label dictionary for one ``prefLabel|altLabel`` cell."""
        pref_label, alt_label = self._split_label(cell)
        return {
            "pref_label": pref_label,
            "pref_label_lang": "eng",
            "alt_label": alt_label,
            "alt_label_lang": self.alt_lang,
        }

    def get_concepts(self):
        """Return one ``{"scheme": ..., "concept": {...}}`` dict per data row.

        The ``concept`` dict carries ``pref_label``, ``pref_label_lang``
        (always ``"eng"``), ``alt_label`` and ``alt_label_lang``. When the
        row has a non-empty third column, a ``narrower`` entry with the same
        ``scheme``/``concept`` structure is nested inside it.
        """
        concepts = []
        for row in self.data[1:]:
            scheme = row[0]
            # The first-order alt label always comes from the cell itself
            # (not from the language code), with or without a narrower concept.
            concept = self._concept_dict(row[1])
            narrower_cell = row[2] if len(row) > 2 else ""
            if narrower_cell != "":
                concept["narrower"] = {
                    "scheme": scheme,
                    "concept": self._concept_dict(narrower_cell),
                }
            concepts.append({"scheme": scheme, "concept": concept})
        return concepts

61 

62 

class Csv2SkosImporter(Csv2SkosReader):
    """Parse a specially formatted CSV file and import the derived data into vocabs."""

    @staticmethod
    def _clean_scheme_title(raw_title):
        """Return the scheme title: the text before the first ``|``, stripped."""
        return raw_title.split("|")[0].strip()

    def update_schemes(self):
        """Create (or fetch) a SkosConceptScheme for every scheme found in the CSV.

        :return: report dict with the scheme count ``before``/``after`` and
            the raw scheme names that ``failed`` / were a ``success``.
        """
        report = {"before": SkosConceptScheme.objects.count()}
        failed = []
        success = []
        for raw_title in self.schemes:
            try:
                temp_scheme, _ = SkosConceptScheme.objects.get_or_create(
                    dc_title=self._clean_scheme_title(raw_title)
                )
                temp_scheme.save()
                success.append(raw_title)
            except Exception:
                # Best effort: record the failure and continue with the rest.
                failed.append(raw_title)
        report["failed"] = failed
        report["success"] = success
        report["after"] = SkosConceptScheme.objects.count()
        return report

    def importConcepts(self):
        """Create (or fetch) the SkosConcepts, labels and schemes found in the CSV.

        For every row the first-order concept is created; when the row has a
        narrower concept it is created too and linked to the first-order one.

        :return: report dict with concept/scheme counts before and after and
            the pref labels that ``failed`` / were a ``success``.
        """
        report = {
            "before": SkosConcept.objects.count(),
            "schemes_before": SkosConceptScheme.objects.count(),
        }
        failed = []
        success = []
        for entry in self.get_concepts():
            temp_scheme, _ = SkosConceptScheme.objects.get_or_create(
                dc_title=self._clean_scheme_title(entry["scheme"])
            )
            first = entry["concept"]

            # Reset per row: otherwise a failure below would leave the concept
            # of the *previous* row in place and the narrower concept would be
            # attached to the wrong parent.
            temp_first = None
            # create 1st order
            try:
                temp_label, _ = SkosLabel.objects.get_or_create(
                    label=first["alt_label"],
                    label_type="altLabel",
                    isoCode=first["alt_label_lang"],
                )
                temp_first, _ = SkosConcept.objects.get_or_create(
                    pref_label=first["pref_label"],
                    pref_label_lang=first["pref_label_lang"],
                )
                # NOTE(review): these assign a list to what look like relation
                # attributes; if they are many-to-many fields, current Django
                # requires `.set([...])` instead - confirm against the models.
                temp_first.label = [temp_label]
                temp_first.scheme = [temp_scheme]
                success.append(first["pref_label"])
            except Exception:
                failed.append(first["pref_label"])

            narrower = first.get("narrower")
            if narrower is None:
                continue
            second = narrower["concept"]
            if temp_first is None:
                # The parent could not be created, so there is nothing to
                # link the narrower concept to.
                failed.append(second["pref_label"])
                continue
            # create 2nd order
            try:
                temp_label, _ = SkosLabel.objects.get_or_create(
                    label=second["alt_label"],
                    label_type="altLabel",
                    isoCode=second["alt_label_lang"],
                )
                temp_second, _ = SkosConcept.objects.get_or_create(
                    pref_label=second["pref_label"],
                    pref_label_lang=second["pref_label_lang"],
                )
                temp_second.label = [temp_label]
                temp_second.scheme = [temp_scheme]
                temp_first.skos_narrower = [temp_second]
                success.append(second["pref_label"])
            except Exception:
                failed.append(second["pref_label"])
        report["failed"] = failed
        report["success"] = success
        report["after"] = SkosConcept.objects.count()
        report["schemes_after"] = SkosConceptScheme.objects.count()
        return report

    def update_concepts(self):
        """Report the SkosConcept count before and after.

        Placeholder: no concept is created or changed here, so both numbers
        are simply the current count.
        """
        report = {}
        report["before"] = SkosConcept.objects.count()
        report["after"] = SkosConcept.objects.count()
        return report

151 

152 

class SkosReader(object):
    """Read a SKOS file (RDF/XML) and expose its ``rdf:Description`` elements.

    ``returnDescriptions`` returns one dictionary per description with:

    * ``type``: the ``rdf:resource`` of the ``rdf:type`` child, or ``"no type"``
    * ``id``: the ``rdf:about`` URI of the description
    * ``notation``: the ``skos:notation`` child element (``None`` if absent)
    * ``pref_labels`` / ``alt_labels``: lists of ``{"text", "lang"}`` dicts
    * ``definitions``: list of ``skos:definition`` texts
    * ``broader`` / ``narrower`` / ``closeMatch``: lists of
      ``{"uri", "notation"}`` dicts, notation being the last ``/`` segment
      of the URI
    * ``schemes``: list of all ``skos:inScheme`` URIs of the description
    """

    _RDF_RESOURCE = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}resource"
    _RDF_ABOUT = "{http://www.w3.org/1999/02/22-rdf-syntax-ns#}about"
    _XML_LANG = "{http://www.w3.org/XML/1998/namespace}lang"

    def __init__(self, skosfile):
        """Parse ``skosfile`` and collect its top-level ``rdf:Description`` nodes.

        Parsing problems do not raise: ``parsed_file`` and
        ``extractedDescriptions`` are set to explanatory strings instead and
        ``numberOfextractedDescriptions`` is ``0``.

        :param skosfile: file name or file object handed to ``ET.parse``.
        """
        self.ns_skos = "http://www.w3.org/2004/02/skos/core#"
        self.ns_rdf = "http://www.w3.org/1999/02/22-rdf-syntax-ns#"
        self.skosfile = skosfile

        self.tree = None
        try:
            self.tree = ET.parse(skosfile)
            self.parsed_file = ET.tostring(self.tree, encoding="utf-8")
        except Exception:
            self.parsed_file = "parsing didn't work"

        # Failure values first; overwritten below once extraction succeeds.
        self.extractedDescriptions = "rdf:Descriptions could not be extracted."
        self.numberOfextractedDescriptions = 0
        if self.tree is not None:
            try:
                found = self.tree.findall(
                    "rdf:Description", namespaces={"rdf": self.ns_rdf}
                )
            except Exception:
                found = None
            if found is not None:
                self.extractedDescriptions = found
                self.numberOfextractedDescriptions = len(found)

    def _labels(self, node, tag):
        """Return ``{"text", "lang"}`` dicts for all ``skos:<tag>`` children.

        ``lang`` is ``None`` when the label carries no ``xml:lang`` attribute.
        """
        return [
            {"text": label.text, "lang": label.attrib.get(self._XML_LANG)}
            for label in node.findall(
                "skos:" + tag, namespaces={"skos": self.ns_skos}
            )
        ]

    def _resources(self, node, tag):
        """Return the ``rdf:resource`` URIs of all ``skos:<tag>`` children."""
        return [
            ref.attrib[self._RDF_RESOURCE]
            for ref in node.findall(
                "skos:" + tag, namespaces={"skos": self.ns_skos}
            )
        ]

    def _references(self, node, tag):
        """Return ``{"uri", "notation"}`` dicts for all ``skos:<tag>`` children."""
        return [
            {"uri": uri, "notation": uri.split("/")[-1]}
            for uri in self._resources(node, tag)
        ]

    def returnDescriptions(self):
        """Return a list with one dictionary per extracted ``rdf:Description``.

        Returns an empty list when the file could not be parsed (in that case
        ``extractedDescriptions`` holds an error message, not elements).
        """
        if isinstance(self.extractedDescriptions, str):
            return []

        descriptions = []
        for node in self.extractedDescriptions:
            description = {}
            type_node = node.find("rdf:type", namespaces={"rdf": self.ns_rdf})
            if type_node is not None:
                description["type"] = type_node.attrib[self._RDF_RESOURCE]
            else:
                description["type"] = "no type"
            description["id"] = node.attrib[self._RDF_ABOUT]
            # Stored as the element itself (or None), not as its text.
            description["notation"] = node.find(
                "skos:notation", namespaces={"skos": self.ns_skos}
            )
            description["pref_labels"] = self._labels(node, "prefLabel")
            description["definitions"] = [
                definition.text
                for definition in node.findall(
                    "skos:definition", namespaces={"skos": self.ns_skos}
                )
            ]
            description["alt_labels"] = self._labels(node, "altLabel")
            description["broader"] = self._references(node, "broader")
            description["narrower"] = self._references(node, "narrower")
            description["closeMatch"] = self._references(node, "closeMatch")
            description["schemes"] = self._resources(node, "inScheme")
            descriptions.append(description)
        return descriptions

    def countConcepts(self):
        """Return the number of dictionaries produced by ``returnDescriptions``."""
        return len(self.returnDescriptions())

269 

270 

class SkosImporter(SkosReader):
    """Imports concepts and concept schemes to django application/database"""

    _CONCEPT_SCHEME_TYPE = "http://www.w3.org/2004/02/skos/core#ConceptScheme"

    def importConcepts(self):
        """Import every description returned by ``returnDescriptions``.

        Descriptions typed as ``skos:ConceptScheme`` become SkosConceptScheme
        objects; every other description is imported as a SkosConcept with
        its labels, schemes and broader/narrower/closeMatch relations.

        :return: dict with a textual ``summary`` and the SkosConcept count
            ``before`` and ``after`` the import.
        """
        concepts_before = SkosConcept.objects.count()
        num_description_type_concept = 0
        num_description_type_concept_scheme = 0
        for description in self.returnDescriptions():
            if description["type"] == self._CONCEPT_SCHEME_TYPE:
                temp_concept_scheme, _ = SkosConceptScheme.objects.get_or_create(
                    legacy_id=description["id"]
                )
                temp_concept_scheme.save()
                num_description_type_concept_scheme += 1
            else:
                self._import_concept(description)
                num_description_type_concept += 1
        concepts_after = SkosConcept.objects.count()
        summary = (
            "#descr. type 'concept': {} | #descr. type 'conceptSchemes': {}".format(
                num_description_type_concept, num_description_type_concept_scheme
            )
        )

        report = {
            "summary": summary,
            "before": concepts_before,
            "after": concepts_after,
        }
        return report

    @staticmethod
    def _related_concepts(references):
        """Get or create a SkosConcept for every ``{"uri", "notation"}`` dict."""
        related = []
        for reference in references:
            temp_related, _ = SkosConcept.objects.get_or_create(
                legacy_id=reference["uri"], notation=reference["notation"]
            )
            temp_related.save()
            related.append(temp_related)
        return related

    def _import_concept(self, description):
        """Create/update one SkosConcept from a ``returnDescriptions`` dict.

        NOTE(review): the relation attributes below are assigned plain lists,
        as before; if they are many-to-many fields, current Django requires
        ``.set([...])`` instead - confirm against the models.
        """
        temp_uri = description["id"]
        temp_concept, _ = SkosConcept.objects.get_or_create(
            legacy_id=temp_uri, notation=temp_uri.split("/")[-1]
        )

        # The first prefLabel becomes the concept's own label.
        pref_labels = description["pref_labels"]
        if pref_labels:
            temp_concept.pref_label = pref_labels[0]["text"]
            # The language belongs to that first label; only set it when the
            # label actually carries one.
            first_lang = pref_labels[0]["lang"]
            if first_lang:
                temp_concept.pref_label_lang = first_lang
        if description["definitions"]:
            temp_concept.definition = description["definitions"][0]
            temp_concept.definition_lang = "eng"
        temp_concept.save()

        # Remaining prefLabels and *all* altLabels are stored as SkosLabels.
        # They are collected first and assigned once, so that one label does
        # not overwrite the others.
        labels = []
        for extra in pref_labels[1:]:
            temp_label, _ = SkosLabel.objects.get_or_create(
                label=extra["text"], isoCode=extra["lang"], label_type="prefLabel"
            )
            labels.append(temp_label)
        for alt in description["alt_labels"]:
            temp_label, _ = SkosLabel.objects.get_or_create(
                label=alt["text"], isoCode=alt["lang"], label_type="altLabel"
            )
            labels.append(temp_label)
        if labels:
            temp_concept.label = labels
            temp_concept.save()

        schemes = []
        for scheme_uri in description["schemes"]:
            temp_scheme, _ = SkosConceptScheme.objects.get_or_create(
                legacy_id=scheme_uri
            )
            # The scheme title is the last path segment of its URI.
            temp_scheme.dc_title = scheme_uri.split("/")[-1]
            temp_scheme.save()
            schemes.append(temp_scheme)
        if schemes:
            temp_concept.scheme = schemes
            temp_concept.save()

        broader = self._related_concepts(description["broader"])
        if broader:
            temp_concept.skos_broader = broader
            temp_concept.save()

        narrower = self._related_concepts(description["narrower"])
        if narrower:
            temp_concept.skos_narrower = narrower
            temp_concept.save()

        close_matches = self._related_concepts(description["closeMatch"])
        if close_matches:
            temp_concept.skos_closematch = close_matches
            temp_concept.save()

    def test_if_class_works(self):
        """Return the fixed string ``"Works!"`` (smoke-test helper)."""
        check = "Works!"
        return check

369 return check