-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlite_manager.py
More file actions
156 lines (126 loc) · 4.94 KB
/
sqlite_manager.py
File metadata and controls
156 lines (126 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import contextlib
import functools
import sqlite3

from utilities import report_problem_to_admin
class Singleton(type):
    """Metaclass that gives every class using it exactly one instance.

    The first call ``Cls(*args, **kwargs)`` constructs the instance and
    stores it on the class as ``_isinstance``; every later call returns
    that stored object and ignores its arguments.
    """

    # Fallback seen by classes that have not been instantiated yet.
    _isinstance = None

    def __call__(cls, *args, **kwargs):
        """Return the single instance of *cls*, creating it on first use."""
        # Look only at the class's own namespace: ordinary attribute lookup
        # would find a parent's cached instance through the MRO and hand a
        # subclass an object of the wrong type.  Compare against None rather
        # than truthiness so that falsy instances (e.g. ones defining
        # __len__ or __bool__) are cached too.
        if cls.__dict__.get("_isinstance") is None:
            cls._isinstance = super().__call__(*args, **kwargs)
        return cls._isinstance
class ManageDb(metaclass=Singleton):
    """Small helper around one SQLite database file.

    Every method opens its own connection to ``self.db_name``, runs its
    statements inside a transaction (committed on success, rolled back on
    error) and closes the connection before returning.

    Table names, column names, the raw ``where`` / ``order_by`` fragments and
    the statements passed to ``custom`` / ``custom_multi`` are interpolated
    into the SQL text (identifiers cannot be bound as parameters), so they
    must come from trusted code.  Row *values* are always bound as
    parameters and are therefore stored and compared verbatim.

    The class uses the ``Singleton`` metaclass: constructor arguments are
    only honoured by the first instantiation.
    """

    def __init__(self, db_name: str = "test"):
        """Remember the database file name (``db_name`` plus ``.db``)."""
        self.db_name = db_name + ".db"

    # Defined as a plain function first so it can be applied as a decorator
    # inside this class body on every Python 3 version (staticmethod objects
    # are only callable from 3.10 on).  It is wrapped in staticmethod() at
    # the end of the class so ``ManageDb.handle_exceptions`` keeps working.
    def handle_exceptions(func):
        """Decorate *func* so that exceptions are reported and returned.

        Any ``Exception`` raised by *func* is formatted, passed to
        ``report_problem_to_admin`` and then *returned* (not re-raised), so
        callers must check the result with ``isinstance(result, Exception)``.
        """
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                err = f"[{type(e)}] An error occurred in {func.__name__}:\n{str(e)}"
                report_problem_to_admin(err)
                return e

        return wrapper

    @staticmethod
    def init_name(name):
        """Return *name* with single and double quotes removed.

        Non-string values are returned unchanged.  Kept for callers that
        build SQL text themselves; the methods of this class bind values as
        parameters and do not need it.
        """
        if isinstance(name, str):
            return name.replace("'", "").replace('"', "")
        return name

    @staticmethod
    def _to_param(value):
        """Convert *value* into something sqlite3 can bind.

        ``None``, numbers, text and binary data are passed through; any
        other object is bound as its ``str()`` form.
        """
        if value is None or isinstance(
            value, (int, float, str, bytes, bytearray, memoryview)
        ):
            return value
        return str(value)

    def _connect(self):
        """Open a connection that is closed when its ``with`` block exits.

        ``with sqlite3.connect(...)`` alone only commits or rolls back; it
        never closes the connection, hence the ``closing`` wrapper.
        """
        return contextlib.closing(sqlite3.connect(self.db_name))

    def create_table(self, table: dict):
        """Create every table described in *table* if it does not exist.

        Args:
            table: ``{table_name: {column_name: column_declaration}}``.

        Returns:
            A list with one entry per table: the cursor returned by the
            ``execute`` call.  The connection is closed by the time this
            method returns, so the entries cannot run further statements.
        """
        list_of_status = []
        with self._connect() as db, db:
            cursor = db.cursor()
            for name, columns in table.items():
                column_defs = ", ".join(
                    f"{column} {declaration}"
                    for column, declaration in columns.items()
                )
                list_of_status.append(
                    cursor.execute(
                        f"CREATE TABLE IF NOT EXISTS {name} ({column_defs})"
                    )
                )
        return list_of_status

    @handle_exceptions
    def insert(self, table: str, rows: dict):
        """Insert a single row and return its rowid.

        Args:
            table: Name of the target table.
            rows: ``{column_name: value}`` for the new row.

        Returns:
            ``cursor.lastrowid`` of the insert, or the exception object if
            the insert failed (see ``handle_exceptions``).
        """
        columns = ", ".join(rows)
        placeholders = ", ".join("?" for _ in rows)
        params = [self._to_param(value) for value in rows.values()]
        with self._connect() as db, db:
            cursor = db.cursor()
            cursor.execute(
                f"INSERT INTO {table} ({columns}) VALUES ({placeholders})",
                params,
            )
            return cursor.lastrowid

    def select(self, column: str = "*", table: str = "sqlite_master",
               where: str = None, distinct: bool = False, order_by: str = None,
               limit: int = None):
        """Run a ``SELECT`` and return all matching rows.

        Args:
            column: Column list to select.
            table: Table to read from.
            where: Raw SQL condition, without the ``WHERE`` keyword.
            distinct: Add ``DISTINCT`` when true.
            order_by: Raw SQL ordering, without ``ORDER BY``.
            limit: Maximum number of rows; a falsy value (``None`` or ``0``)
                means no ``LIMIT`` clause.

        Returns:
            The list of row tuples from ``fetchall()``.
        """
        distinct_ = "DISTINCT " if distinct else ''
        order_by_ = f'ORDER BY {order_by}' if order_by else ''
        limit_ = f'LIMIT {limit}' if limit else ''
        where_ = f'WHERE {where}' if where else ''
        sql = f"SELECT {distinct_}{column} FROM {table} {where_} {order_by_} {limit_}"
        with self._connect() as db, db:
            cursor = db.cursor()
            cursor.execute(sql)
            return cursor.fetchall()

    def delete(self, table: dict):
        """Delete rows matching one ``column = value`` test per table.

        Args:
            table: ``{table_name: (column_name, value)}``.  The value is
                bound as a parameter; ``None`` is bound as NULL, which ``=``
                never matches.
        """
        with self._connect() as db, db:
            cursor = db.cursor()
            for name, condition in table.items():
                cursor.execute(
                    f"DELETE FROM {name} WHERE {condition[0]} = ?",
                    (self._to_param(condition[1]),),
                )

    def advanced_delete(self, table):
        """Delete rows matching several ``column = value`` tests joined by AND.

        Args:
            table: ``{table_name: [(column_name, value), ...]}``.  An empty
                condition list produces invalid SQL and sqlite3 raises.
        """
        with self._connect() as db, db:
            cursor = db.cursor()
            for name, conditions in table.items():
                where = " AND ".join(f"{arg[0]} = ?" for arg in conditions)
                params = [self._to_param(arg[1]) for arg in conditions]
                cursor.execute(f"DELETE FROM {name} WHERE {where}", params)

    def drop_table(self, table: str):
        """Drop *table* if it exists."""
        with self._connect() as db, db:
            cursor = db.cursor()
            cursor.execute(f"DROP TABLE IF EXISTS {table}")

    def update(self, table, where):
        """Update columns of the rows selected by *where*.

        Args:
            table: ``{table_name: {column_name: new_value}}``.  All columns
                of one table are set in a single ``UPDATE`` so that *where*
                is evaluated against the row as it was before the change.
            where: Raw SQL condition, without the ``where`` keyword.  A
                falsy value (``None`` or ``""``) updates every row.

        Returns:
            ``cursor.lastrowid`` after the last statement.
        """
        where_clause = f" where {where}" if where else ""
        with self._connect() as db, db:
            cursor = db.cursor()
            for name, changes in table.items():
                if not changes:
                    # Nothing to set: "UPDATE t SET" would be invalid SQL.
                    continue
                assignments = ", ".join(f"{column} = ?" for column in changes)
                params = [self._to_param(value) for value in changes.values()]
                cursor.execute(
                    f"UPDATE {name} SET {assignments}{where_clause}", params
                )
            return cursor.lastrowid

    def custom(self, order: str, return_fetcall=True):
        """Execute one raw SQL statement.

        Args:
            order: The SQL text to execute as-is.
            return_fetcall: When true, return ``fetchall()`` of the statement.

        Returns:
            The fetched rows, or ``None`` when *return_fetcall* is false.
        """
        with self._connect() as db, db:
            cursor = db.cursor()
            cursor.execute(order)
            if return_fetcall:
                return cursor.fetchall()

    def custom_multi(self, *order):
        """Execute several raw SQL statements in one transaction."""
        with self._connect() as db, db:
            cursor = db.cursor()
            for order_ in order:
                cursor.execute(order_)

    # Expose the decorator under its original static-method form.
    handle_exceptions = staticmethod(handle_exceptions)
# t = {
# "student": {
# "id": "integer primary key",
# "name": "TEXT",
# "family": "TEXT",
# "age": "INTEGER"
# },
# "teacher": {
# "name": "TEXT",
# "family": "TEXT",
# "age": "INTEGER"
# }
# }
# a = ManageDb()
# a.create_table(t)
# print(a.custom("SELECT name from sqlite_master where type='table'"))
# a.insert(table='student', rows={'name': 'amir', 'family': 'najafi', 'age': 21})
# a.insert(table='student', rows={'name': 'fsd', 'family': 'sfd', 'age': 34})
# a.delete({'student': ['name', 'amir']})
# a.drop_table('teacher')
# print(a.select(table='student', order_by='name'))
# print(a.select(table='student'))