Skip to content

Commit 18a4427

Browse files
committed
Solve performance problems (disable WAL on DB)
1 parent 1616687 commit 18a4427

File tree

3 files changed

+50
-24
lines changed

3 files changed

+50
-24
lines changed

dbutils.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,13 +135,22 @@ def migration_step_1():
135135

136136
def migration_step_2():
137137
with db.cursor() as c:
138-
c.execute(f'CREATE TABLE {DB_PREFIX}records (seq BIGSERIAL NOT NULL PRIMARY KEY, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, client_ip TEXT, version SMALLSERIAL NOT NULL);')
139-
c.execute(f'CREATE TABLE {DB_PREFIX}flows (record INTEGER NOT NULL REFERENCES {DB_PREFIX}records(seq) ON DELETE CASCADE, data JSONB NOT NULL);')
140-
141-
def migration_step_3():
142-
with db.cursor() as c:
138+
# UNLOGGED: Disabling WAL avoids high I/O load. Queries still work as usual; the trade-off is that all
139+
# records are lost if the database crashes, which is acceptable because NetFlow data is temporary in nature.
140+
c.execute(f'CREATE UNLOGGED TABLE {DB_PREFIX}records (seq BIGSERIAL NOT NULL PRIMARY KEY, ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, client_ip TEXT);')
143141
c.execute(f'CREATE INDEX {DB_PREFIX}records_ts ON {DB_PREFIX}records (ts);')
144142

145-
def migration_step_4():
146-
with db.cursor() as c:
147-
c.execute(f'ALTER TABLE {DB_PREFIX}records DROP COLUMN version;')
143+
c.execute(f"""
144+
CREATE UNLOGGED TABLE {DB_PREFIX}flows (
145+
record INTEGER NOT NULL REFERENCES {DB_PREFIX}records(seq) ON DELETE CASCADE,
146+
IN_BYTES INTEGER,
147+
PROTOCOL SMALLINT,
148+
DIRECTION SMALLINT,
149+
L4_DST_PORT INTEGER,
150+
L4_SRC_PORT INTEGER,
151+
INPUT_SNMP SMALLINT,
152+
OUTPUT_SNMP SMALLINT,
153+
IPV4_DST_ADDR TEXT,
154+
IPV4_SRC_ADDR TEXT
155+
);
156+
""")

netflowbot.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -117,18 +117,18 @@ def get_values_traffic_in(output_path_prefix, from_time, to_time):
117117
# TODO: missing check for IP: r.client_ip = %s AND
118118
c.execute(f"""
119119
SELECT
120-
f.data->'INPUT_SNMP',
121-
sum((f.data->'IN_BYTES')::integer)
120+
f.INPUT_SNMP,
121+
sum(f.IN_BYTES)
122122
FROM
123123
{DB_PREFIX}records "r",
124124
{DB_PREFIX}flows "f"
125125
WHERE
126126
r.ts >= %s AND
127127
r.ts < %s AND
128128
r.seq = f.record AND
129-
((f.data->'DIRECTION')::integer) = 1
129+
f.DIRECTION = 0
130130
GROUP BY
131-
f.data->'INPUT_SNMP'
131+
f.INPUT_SNMP
132132
""", (from_time, to_time,))
133133

134134
values = []
@@ -146,18 +146,18 @@ def get_values_traffic_out(output_path_prefix, from_time, to_time):
146146
# TODO: missing check for IP: r.client_ip = %s AND
147147
c.execute(f"""
148148
SELECT
149-
f.data->'OUTPUT_SNMP',
150-
sum((f.data->'IN_BYTES')::integer)
149+
f.OUTPUT_SNMP,
150+
sum(f.IN_BYTES)
151151
FROM
152152
{DB_PREFIX}records "r",
153153
{DB_PREFIX}flows "f"
154154
WHERE
155155
r.ts >= %s AND
156156
r.ts < %s AND
157157
r.seq = f.record AND
158-
((f.data->'DIRECTION')::integer) = 1
158+
f.DIRECTION = 1
159159
GROUP BY
160-
f.data->'OUTPUT_SNMP'
160+
f.OUTPUT_SNMP
161161
""", (from_time, to_time,))
162162

163163
values = []
@@ -175,19 +175,19 @@ def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_di
175175
# TODO: missing check for IP: r.client_ip = %s AND
176176
c.execute(f"""
177177
SELECT
178-
f.data->'IPV4_DST_ADDR',
179-
sum((f.data->'IN_BYTES')::integer) "traffic"
178+
f.IPV4_DST_ADDR,
179+
sum(f.IN_BYTES) "traffic"
180180
FROM
181181
netflow_records "r",
182182
netflow_flows "f"
183183
WHERE
184184
r.ts >= %s AND
185185
r.ts < %s AND
186186
r.seq = f.record AND
187-
(f.data->'{'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP'}')::integer = %s AND
188-
(f.data->'DIRECTION')::integer = {'0' if is_direction_in else '1'}
187+
f.{'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP'} = %s AND
188+
f.DIRECTION = {'0' if is_direction_in else '1'}
189189
GROUP BY
190-
f.data->'IPV4_DST_ADDR'
190+
f.IPV4_DST_ADDR
191191
ORDER BY
192192
traffic desc
193193
LIMIT 10;

netflowwriter.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,17 +54,34 @@ def write_record(j):
5454
# then save each of the flows within the record, but use execute_values() to perform bulk insert:
5555
def _get_data(record_seq, flows):
5656
for flow in flows:
57-
yield (record_seq, flow,)
57+
yield (
58+
record_seq,
59+
flow.get('IN_BYTES'),
60+
flow.get('PROTOCOL'),
61+
flow.get('DIRECTION'),
62+
flow.get('L4_DST_PORT'),
63+
flow.get('L4_SRC_PORT'),
64+
flow.get('INPUT_SNMP'),
65+
flow.get('OUTPUT_SNMP'),
66+
flow.get('IPV4_DST_ADDR'),
67+
flow.get('IPV4_SRC_ADDR'),
68+
)
5869
data_iterator = _get_data(record_seq, j['flows'])
59-
psycopg2.extras.execute_values(c, f"INSERT INTO {DB_PREFIX}flows (record, data) VALUES %s", data_iterator, "(%s, %s)", page_size=100)
60-
70+
psycopg2.extras.execute_values(
71+
c,
72+
f"INSERT INTO {DB_PREFIX}flows (record, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s",
73+
data_iterator,
74+
"(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)",
75+
page_size=100
76+
)
6177

6278
if __name__ == "__main__":
6379
NAMED_PIPE_FILENAME = os.environ.get('NAMED_PIPE_FILENAME', None)
6480
if not NAMED_PIPE_FILENAME:
6581
raise Exception("Please specify NAMED_PIPE_FILENAME environment var")
6682

6783
migrate_if_needed()
84+
6885
try:
6986
process_named_pipe(NAMED_PIPE_FILENAME)
7087
except KeyboardInterrupt:

0 commit comments

Comments
 (0)