-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdatabase_control.py
More file actions
128 lines (109 loc) · 4.43 KB
/
database_control.py
File metadata and controls
128 lines (109 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import datetime
import os
from typing import Any
from dateutil.relativedelta import relativedelta
import bson
import pymongo.collection
from pymongo import MongoClient as MongoClient
from pymongo.command_cursor import CommandCursor
import credentials
import processing
class DatabaseControl:
    """MongoDB access layer: restore collections from a BSON dump and run the
    statistics aggregation used to build chart-ready time series.

    Connection parameters come from the project-level ``credentials`` module;
    every method accepts an ``_credentials`` override for testing.
    """

    # Keyword arguments handed straight to MongoClient(**...).
    CONNECT_WITH_CREDENTIALS: dict = {
        "host": credentials.DATABASE_HOST_URI,
        "port": credentials.DATABASE_PORT,
        "username": credentials.DATABASE_USER,
        "password": credentials.DATABASE_USER_PASSWORD,
        "authSource": credentials.DATABASE_AUTH_DB_NAME,
        "authMechanism": "SCRAM-SHA-256",
    }

    def restore_from_dump(self,
                          database_name: str,
                          path_to_dump: str,
                          db_flag: bool = True,
                          _credentials: dict | None = None) -> None:
        """Load every ``*.bson`` file in *path_to_dump* into *database_name*.

        Each file ``name.bson`` becomes collection ``name`` (text before the
        first dot). Non-``.bson`` entries are ignored.

        :param database_name: target database.
        :param path_to_dump: directory containing ``mongodump``-style .bson files.
        :param db_flag: when False, do nothing (dry-run style guard).
        :param _credentials: optional MongoClient kwargs; defaults to
            ``CONNECT_WITH_CREDENTIALS``.
        """
        if not db_flag:
            return
        _credentials = _credentials or self.CONNECT_WITH_CREDENTIALS
        # The client is only needed for the duration of the restore, so use it
        # as a context manager to guarantee the connection is closed (the
        # original leaked it).
        with MongoClient(**_credentials) as client:
            db = client[database_name]
            for file_name in os.listdir(path_to_dump):
                if not file_name.endswith('.bson'):
                    continue
                collection_name = file_name.split('.')[0]
                # 'rb' (not 'rb+'): the dump file is only read, so no write
                # permission should be required on backups.
                with open(os.path.join(path_to_dump, file_name), 'rb') as dump_file:
                    documents = bson.decode_all(dump_file.read())
                # insert_many raises InvalidOperation on an empty list; skip
                # empty dump files instead of aborting the whole restore.
                if documents:
                    db[collection_name].insert_many(documents)

    def get_collection(self,
                       database_name: str,
                       collection_name: str,
                       db_flag: bool = True,
                       _credentials: dict | None = None) -> pymongo.collection.Collection | None:
        """Return a handle to *collection_name* inside *database_name*.

        Returns None when ``db_flag`` is False.

        NOTE: the MongoClient is intentionally NOT closed here — the returned
        Collection is bound to it and would become unusable. Callers own the
        connection's lifetime.
        """
        if not db_flag:
            return None
        _credentials = _credentials or self.CONNECT_WITH_CREDENTIALS
        client = MongoClient(**_credentials)
        return client[database_name][collection_name]

    def get_statistics_collection(self) -> pymongo.collection.Collection | None:
        """Shortcut to the statistics collection named in ``credentials``."""
        return self.get_collection(database_name=credentials.DATABASE_NAME,
                                   collection_name=credentials.DATABASE_STATISTICS_COLLECTION_NAME)

    def get_result(self, dt_from: datetime.datetime,
                   dt_upto: datetime.datetime,
                   group_type: str) -> dict:
        """Aggregate the statistics collection into a gap-free time series.

        Sums the ``value`` field of documents whose ``dt`` lies in
        [*dt_from*, *dt_upto*], grouped by *group_type* ("hour", "day",
        "month", ... — any ``$dateTrunc`` unit whose plural is a valid
        relativedelta keyword). Missing buckets are densified and filled
        with 0 so the series has no gaps.

        Requires MongoDB >= 5.1 ($densify).

        :returns: ``{"dataset": (sums...), "labels": (formatted dates...)}``
            suitable for charting via ``processing.DateTimeConverter``.
        """
        sample_collection: pymongo.collection.Collection = self.get_statistics_collection()

        # $match: keep only documents inside the requested window (inclusive).
        key_value: dict = {
            "dt": {
                "$gte": dt_from,
                "$lte": dt_upto
            }
        }

        # $group: bucket by the truncated date and sum the values per bucket.
        grouping_mapping: dict = {
            "_id": {
                "date": {
                    "$dateTrunc": {
                        "date": "$dt",
                        "unit": group_type
                    }
                }
            },
            "dataset": {
                "$sum": "$value"
            }
        }

        # Extend the densify range by one unit ("hour" -> hours=1, etc.) so the
        # upper bound itself gets a bucket ($densify bounds are half-open).
        _arguments: dict = {f"{group_type}s": +1}
        extra_step_to_include_upper_bound: relativedelta = relativedelta(**_arguments)
        dt_upto_including_upper_bound = dt_upto + extra_step_to_include_upper_bound

        # NOTE(review): the minute==0 condition below looks like it special-cases
        # "round" timestamps so a partial last bucket is not fabricated — the
        # intent cannot be confirmed from this file, so it is preserved as-is.
        density: dict = {
            "field": "_id.date",
            "range": {
                "bounds": [dt_from, dt_upto_including_upper_bound if dt_upto.minute == 0 else dt_upto],
                "step": 1,
                "unit": group_type
            },
        }

        # $fill: densified buckets have no "dataset" — fill them with zero.
        filling: dict = {"output": {"dataset": {"value": 0}}}

        result: CommandCursor[Any] = sample_collection.aggregate([
            # retrieve matching dates
            {"$match": key_value},
            # group by 'group_type'
            {"$group": grouping_mapping},
            # densify (create missing dates)
            # '$densify' only available since MongoDB 5.1
            {"$densify": density},
            # fill missing data with zeroes
            {"$fill": filling},
            # sort by ascending date
            {"$sort": {"_id": 1}}
        ])

        # Transpose [(value, label), ...] into {"dataset": values, "labels": labels}.
        _out: tuple[str, str] = ("dataset", "labels")
        _out_values: list[tuple[Any, str]] = [(record["dataset"],
                                               processing.DateTimeConverter.dt_to_str(
                                                   record["_id"]["date"], group_type)
                                               )
                                              for record in result]
        return dict(zip(_out, list(zip(*_out_values))))