31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
class BaseRowClient:
    """Small client for the Baserow REST API built on ``requests``.

    Two credentials are used side by side:

    * a JWT fetched with ``br_user``/``br_pw`` (sent as
      ``Authorization: JWT ...``) for the table and field endpoints and for
      ``search_rows``;
    * the token ``br_token`` (sent as ``Authorization: Token ...``) for
      listing, creating and updating rows.
    """

    def __init__(
        self,
        br_user,
        br_pw,
        br_token,
        br_base_url="https://api.baserow.io/api/",
        br_db_id=None,
    ):
        """Store the credentials and authenticate against Baserow.

        Note that constructing a client performs network requests: a JWT is
        fetched immediately and, if ``br_db_id`` is given, the table/field
        metadata of that database is downloaded as well.

        Args:
            br_user: Baserow user name used to obtain the JWT.
            br_pw: Password belonging to ``br_user``.
            br_token: Token sent with the row endpoints.
            br_base_url (str): Base URL of the API; a trailing slash is added
                if missing.
            br_db_id: Optional database ID. When truthy, ``br_table_dict`` is
                populated via ``fetch_table_field_dict``; otherwise both
                ``br_db_id`` and ``br_table_dict`` are set to ``None``.
        """
        self.br_user = br_user
        self.br_pw = br_pw
        self.br_token = br_token
        self.br_base_url = self.url_fixer(br_base_url)
        self.br_jwt_token = self.get_jwt_token()
        self.headers = self._token_headers()
        if br_db_id:
            self.br_db_id = br_db_id
            self.br_table_dict = self.fetch_table_field_dict(self.br_db_id)
        else:
            self.br_db_id = None
            self.br_table_dict = None

    def _jwt_headers(self, *, json_body=True):
        """Build the JWT ``Authorization`` header.

        Args:
            json_body (bool): Also add ``Content-Type: application/json``.

        Returns:
            dict: Header mapping for ``requests``.
        """
        headers = {"Authorization": f"JWT {self.br_jwt_token}"}
        if json_body:
            headers["Content-Type"] = "application/json"
        return headers

    def _token_headers(self):
        """Build the ``Token`` authorization headers from ``br_token``.

        Returns:
            dict: Header mapping for ``requests``.
        """
        return {
            "Authorization": f"Token {self.br_token}",
            "Content-Type": "application/json",
        }

    def get_jwt_token(self) -> str:
        """Fetch a Baserow JWT using the stored user name and password.

        Returns:
            str: The value of the ``token`` key of the JSON response.

        Raises:
            KeyError: If the response JSON contains no ``token`` key.
        """
        url = f"{self.br_base_url}user/token-auth/"
        payload = {"password": self.br_pw, "username": self.br_user}
        r = requests.post(url=url, json=payload)
        return r.json()["token"]

    def url_fixer(self, url: str) -> str:
        """Make sure the given URL ends with a slash.

        Args:
            url (str): URL to check (or any other string).

        Returns:
            str: ``url`` unchanged if it already ends with "/", otherwise
            ``url`` with "/" appended.
        """
        return url if url.endswith("/") else f"{url}/"

    def list_tables(self, br_database_id: Union[int, str]) -> list:
        """Return the tables of the given database.

        Args:
            br_database_id (Union[int, str]): The ID of the database.

        Returns:
            list: The decoded JSON response; on success a list of dicts like
            `{'id': 100947, 'name': 'place', 'order': 2, 'database_id': 41426}`.
        """
        db_url = f"{self.br_base_url}database/tables/database/{br_database_id}/"
        r = requests.get(url=db_url, headers=self._jwt_headers(json_body=False))
        return r.json()

    def get_table_by_name(self, br_database_id, br_table_name):
        """Look up the ID of the table with the given name.

        Args:
            br_database_id: The ID of the Baserow database to search in.
            br_table_name (str): The name of the table to find.

        Returns:
            str or False: The table ID as a string if found, ``False``
            otherwise. If several tables carry the name, the last one listed
            wins.
        """
        table_id = False
        for table in self.list_tables(br_database_id):
            if table["name"] == br_table_name:
                # No early exit on purpose: the last match is returned.
                table_id = str(table["id"])
        return table_id

    def list_fields(self, br_table_id):
        """Return the field definitions of a Baserow table.

        Args:
            br_table_id (int): The ID of the table.

        Returns:
            The decoded JSON response of the fields endpoint (iterated as a
            sequence of field dicts elsewhere in this class).
        """
        url = f"{self.br_base_url}database/fields/table/{br_table_id}/"
        r = requests.get(url, headers=self._jwt_headers(json_body=False))
        return r.json()

    def search_rows(self, br_table_id, q, query_field_id, lookup_type="contains"):
        """Search the rows of a table by filtering on a single field.

        The filter is sent as the query parameter
        ``filter__field_<query_field_id>__<lookup_type>=<q>``. The value is
        URL-encoded by ``requests``, so ``q`` may contain characters such as
        ``&``, ``#`` or ``+``; pass it raw, not pre-encoded.

        Args:
            br_table_id (int): The ID of the table to search in.
            q (str): The value to filter for.
            query_field_id (int): The ID of the field to filter on.
            lookup_type (str, optional): Filter type, defaults to "contains".

        Returns:
            dict: The decoded JSON response of the rows endpoint.
        """
        url = f"{self.br_base_url}database/rows/table/{br_table_id}/"
        params = {
            "user_field_names": "true",
            f"filter__field_{query_field_id}__{lookup_type}": q,
        }
        r = requests.get(
            url, params=params, headers=self._jwt_headers(json_body=False)
        )
        return r.json()

    def yield_rows(self, br_table_id, filters=None):
        """Iterate over all rows of a table, following pagination.

        Args:
            br_table_id: The ID of the table.
            filters (dict, optional): Extra query parameters for the first
                request (name -> value). Values are URL-encoded by
                ``requests``.

        Yields:
            dict: One item of each page's ``results`` list.

        Raises:
            KeyError: If a response lacks the ``next`` or ``results`` key.
        """
        url = f"{self.br_base_url}database/rows/table/{br_table_id}/"
        params = {"user_field_names": "true"}
        if filters:
            params.update(filters)
        while url:
            response = requests.get(url, headers=self.headers, params=params)
            print(response.url)
            result = response.json()
            # The "next" link is used verbatim for the following request, so
            # the query parameters are only supplied for the first page.
            url = result["next"]
            params = None
            yield from result["results"]

    def dump_tables_as_json(self, br_table_id, folder_name=None, indent=0):
        """Write every table of a database to its own ``<table name>.json``.

        Each file holds a JSON object mapping row ID to row.

        Args:
            br_table_id: Despite the name, this value is passed to
                ``list_tables`` and therefore is the ID of the *database*.
            folder_name (str, optional): Target folder; created if missing.
                When ``None`` the files are written relative to the current
                working directory.
            indent (int): JSON indentation; a falsy value writes compact
                output.

        Returns:
            list: The paths of the written files.
        """
        tables = self.list_tables(br_table_id)
        if folder_name:
            os.makedirs(folder_name, exist_ok=True)
        file_names = []
        for table in tables:
            data = {row["id"]: row for row in self.yield_rows(f"{table['id']}")}
            f_name = f"{table['name']}.json"
            if folder_name is not None:
                f_name = os.path.join(folder_name, f_name)
            with open(f_name, "w", encoding="utf-8") as f:
                # ``indent or None`` keeps the compact form for indent=0.
                json.dump(data, f, ensure_ascii=False, indent=indent or None)
            file_names.append(f_name)
        return file_names

    def fetch_table_field_dict(self, br_db_id):
        """Collect table and field metadata of a database in one dict.

        Args:
            br_db_id: The ID of the database.

        Returns:
            dict: Maps table name to the table dict returned by
            ``list_tables``, extended with a ``fields`` key that maps field
            name to the field dict returned by ``list_fields``.
        """
        print(f"fetching table and field info for {br_db_id}")
        table_dict = {}
        for table in self.list_tables(br_db_id):
            table["fields"] = {
                field["name"]: field for field in self.list_fields(table["id"])
            }
            table_dict[table["name"]] = table
        return table_dict

    def get_or_create(self, table_name, field_name, lookup_dict, q):
        """Return the row whose field equals ``q``, creating it if absent.

        The table is searched with the ``equal`` filter. If at least one row
        matches, the first match is returned; a new row is only created when
        nothing matches, so existing duplicates are never multiplied.

        Args:
            table_name (str): Name of the table to search/create in.
            field_name (str): Name of the field to search by and to set on a
                newly created row.
            lookup_dict (dict): Table/field metadata shaped like the result
                of ``fetch_table_field_dict``.
            q (str): Value to search for and to store in a new row.

        Returns:
            tuple: ``(row, created)`` where ``row`` is the found row or the
            JSON response of the create request, and ``created`` is ``True``
            only when a create request was sent.
        """
        br_table_id = lookup_dict[table_name]["id"]
        query_field_id = lookup_dict[table_name]["fields"][field_name]["id"]
        match = self.search_rows(br_table_id, q, query_field_id, lookup_type="equal")
        if match["count"] >= 1:
            return match["results"][0], False
        create_url = (
            f"{self.br_base_url}database/rows/table/{br_table_id}/"
            "?user_field_names=true"
        )
        r = requests.post(
            create_url, headers=self._token_headers(), json={field_name: q}
        )
        return r.json(), True

    def delete_table(self, table_id):
        """Delete a table.

        Args:
            table_id: The ID of the table to delete.

        Returns:
            tuple: ``({"status": ...}, True)`` if the API answered 204,
            otherwise ``({"error": <status code>}, False)``.
        """
        url = f"{self.br_base_url}database/tables/{table_id}/"
        r = requests.delete(url, headers=self._jwt_headers())
        if r.status_code == 204:
            return {"status": f"table {table_id} deleted"}, True
        return {"error": r.status_code}, False

    def create_table(self, table_name, fields=None):
        """Create a table in the database given as ``br_db_id``.

        Args:
            table_name (str): Name of the new table.
            fields (optional): If given, sent as the ``data`` key of the
                request together with ``first_row_header: True``.
                NOTE(review): presumably a list of rows whose first row holds
                the column names -- confirm against the Baserow API docs.

        Returns:
            tuple: ``(<response JSON>, True)`` if the API answered 200,
            otherwise ``({"error": <status code>}, False)``.
        """
        url = f"{self.br_base_url}database/tables/database/{self.br_db_id}/"
        payload = {"name": table_name}
        if fields is not None:
            payload["data"] = fields
            payload["first_row_header"] = True
        r = requests.post(url=url, headers=self._jwt_headers(), json=payload)
        if r.status_code == 200:
            return r.json(), True
        return {"error": r.status_code}, False

    def delete_table_fields(self, br_table_id, field_names):
        """Delete the fields of a table whose names are in ``field_names``.

        Args:
            br_table_id: The ID of the table.
            field_names: Container of field names to delete.

        Returns:
            tuple: ``(result, deleted)`` describing the *last* field that was
            processed, or ``({"status": "no fields to delete"}, True)`` when
            no field matched.
        """
        result, deleted = {"status": "no fields to delete"}, True
        for field in self.list_fields(br_table_id):
            if field["name"] not in field_names:
                continue
            print("Deleting field... ", field["name"], field["id"])
            url = f"{self.br_base_url}database/fields/{field['id']}/"
            r = requests.delete(url, headers=self._jwt_headers())
            if r.status_code == 200:
                print(
                    f"Deleted field {field['name']} with id: {field['id']} in {br_table_id}"
                )
                result, deleted = r.json(), True
            else:
                print(f"Error {r.status_code} with {br_table_id} in delete_fields")
                result, deleted = {"error": r.status_code}, False
        return result, deleted

    def create_table_fields(self, br_table_id, br_table_fields):
        """Create the given fields in a table, one request per field.

        Args:
            br_table_id: The ID of the table.
            br_table_fields (list): Field definitions, checked with
                ``validate_table_fields_type`` before anything is sent.

        Returns:
            tuple: ``(result, created)`` describing the *last* field that was
            processed, or ``({"status": "no fields to create"}, True)`` for an
            empty field list.

        Raises:
            KeyError, ValueError: Propagated from
                ``validate_table_fields_type`` for malformed definitions.
        """
        url = f"{self.br_base_url}database/fields/table/{br_table_id}/"
        payload, valid = self.validate_table_fields_type(br_table_fields)
        if not valid:
            result = {"error": "Field type schema wrong."}
            print(
                result["error"],
                "Visit https://api.baserow.io/api/redoc/ to learn more.",
            )
            return result, valid
        # Defined up front so an empty field list yields a result instead of
        # failing on an unbound local at the return statement.
        result, created = {"status": "no fields to create"}, True
        for field in payload:
            r = requests.post(url=url, headers=self._jwt_headers(), json=field)
            if r.status_code == 200:
                result, created = r.json(), True
            else:
                result, created = {"error": r.status_code}, False
        return result, created

    def validate_table_fields_type(self, br_table_fields):
        """Check field definitions before they are sent to the API.

        All definitions are first checked for the required keys; only then
        are the types and the type-specific keys checked.

        Args:
            br_table_fields (list): Field definitions (dicts).

        Returns:
            tuple: ``(br_table_fields, True)`` when every definition passes.

        Raises:
            KeyError: A definition lacks ``name``/``type``, uses a type that
                is not in the accepted list, or lacks the ``formula`` /
                ``link_row_table_id`` key its type requires.
            ValueError: ``formula`` is not a string or ``link_row_table_id``
                is not an integer.
        """
        required_keys = ("name", "type")
        valid_types = (
            "text",
            "long_text",
            "number",
            "date",
            "boolean",
            "link_row",
            "formula",
        )
        for field in br_table_fields:
            for key in required_keys:
                if key not in field:
                    raise KeyError(f"missing required key: {key}")
        for field in br_table_fields:
            field_type = field["type"]
            if field_type not in valid_types:
                raise KeyError(f"invalid field type: {field_type}")
            if field_type == "formula":
                if "formula" not in field:
                    raise KeyError("formula field missing 'formula' key")
                if not isinstance(field["formula"], str):
                    raise ValueError("formula field must be a string")
            if field_type == "link_row":
                if "link_row_table_id" not in field:
                    raise KeyError("link_row field missing 'link_row_table_id' key")
                if not isinstance(field["link_row_table_id"], int):
                    raise ValueError("link_row_table_id field must be a integer")
        return br_table_fields, True

    def patch_row(self, table_id: str, row_id: str, payload: dict) -> dict:
        """Send a PATCH request for the given row.

        Args:
            table_id (str): The ID of the table.
            row_id (str): The ID of the row.
            payload (dict): The patch data, see
                https://api.baserow.io/api/redoc/#tag/Database-table-rows/operation/update_database_table_row

        Returns:
            dict: The decoded JSON response.
        """  # noqa:
        url = (
            f"{self.br_base_url}database/rows/table/{table_id}/{row_id}/"
            "?user_field_names=true"
        )
        print(url)
        r = requests.patch(url, headers=self._token_headers(), json=payload)
        return r.json()

    def batch_update_rows(
        self, table_id: str, payload: list, batch_size: int = 199
    ) -> dict:
        """Send PATCH requests for the given rows in batches.

        Args:
            table_id (str): The ID of the table.
            payload (list): The patch data for multiple rows.
            batch_size (int): Maximum number of rows per request, 199 by
                default.

        Returns:
            dict: ``{"updated_rows": [...], "errors": [...]}``. A batch whose
            response is not valid JSON, or carries no ``items`` key, adds one
            entry to ``errors`` and the remaining batches are still sent.
        """
        url = (
            f"{self.br_base_url}database/rows/table/{table_id}/batch/"
            "?user_field_names=true"
        )
        updated_rows = []
        errors = []
        print(f"start updating {len(payload)} rows")
        for start in range(0, len(payload), batch_size):
            batch = payload[start:start + batch_size]
            r = requests.patch(
                url, headers=self._token_headers(), json={"items": batch}
            )
            try:
                resp = r.json()
            except ValueError:
                # Every JSON decoding error raised by requests is a ValueError.
                errors.append(
                    {"error": "Invalid JSON response", "status_code": r.status_code}
                )
                continue
            if isinstance(resp, dict) and "items" in resp:
                updated_rows.extend(resp["items"])
            elif isinstance(resp, dict):
                # Fall back to the whole body when there is no "error" key.
                errors.append(resp.get("error", resp))
            else:
                errors.append(resp)
        return {"updated_rows": updated_rows, "errors": errors}
|