Commit 65ff85d

Make collector more resilient; validate records sequence nr.; enforce v9 version
1 parent 2ac23b8 commit 65ff85d

File tree: 5 files changed, +75 -22 lines


dbutils.py

Lines changed: 4 additions & 0 deletions
@@ -141,3 +141,7 @@ def migration_step_2():
 def migration_step_3():
     with db.cursor() as c:
         c.execute(f'CREATE INDEX {DB_PREFIX}records_ts ON {DB_PREFIX}records (ts);')
+
+def migration_step_4():
+    with db.cursor() as c:
+        c.execute(f'ALTER TABLE {DB_PREFIX}records DROP COLUMN version;')
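
Migration steps in dbutils.py follow a numbered-function convention. Below is a hypothetical, self-contained illustration (not part of this commit) of how a dispatcher like migrate_if_needed() could pick up migration_step_4; the version bookkeeping and the print placeholder are stand-ins, only the migration_step_N naming is taken from the diff above:

# hypothetical sketch; only the migration_step_N naming convention comes from the diff
_schema_version = 3  # assume steps 1-3 have already been applied

def migration_step_4():
    print("ALTER TABLE ...records DROP COLUMN version;")  # placeholder for the real DB call

def migrate_if_needed():
    global _schema_version
    while True:
        step = globals().get(f'migration_step_{_schema_version + 1}')
        if step is None:
            break
        step()
        _schema_version += 1

migrate_if_needed()  # applies migration_step_4 exactly once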

docker-compose.yml

Lines changed: 8 additions & 0 deletions
@@ -109,6 +109,14 @@ services:
       - netflowwriter
     volumes:
       - shared-grafolean:/shared-grafolean
+      # To use py-spy:
+      #  - $ docker exec -ti grafolean-newflow-writer bash
+      #  - # pip install py-spy
+      #  - # py-spy -n -o /tmp/prof/out.svg --pid 1
+      # But first, these 3 lines below must be enabled, to add a volume and capabilities: (careful not to add spaces!)
+      # - /tmp/prof/:/tmp/prof/
+    #cap_add:
+    #  - SYS_PTRACE
     networks:
       - grafolean
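
For reference, enabling the three commented-out lines mentioned in the diff would leave the service with roughly this volumes/cap_add configuration (a sketch only; exact indentation and the surrounding service definition are assumed from the diff context):

    volumes:
      - shared-grafolean:/shared-grafolean
      - /tmp/prof/:/tmp/prof/
    cap_add:
      - SYS_PTRACE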

netflowbot.py

Lines changed: 11 additions & 7 deletions
@@ -16,7 +16,7 @@
 from dbutils import db, DB_PREFIX
 from lookup import PROTOCOLS
 
-logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
+logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                     datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
 logging.addLevelName(logging.DEBUG, color("DBG", 7))
 logging.addLevelName(logging.INFO, "INF")
@@ -90,10 +90,12 @@ def perform_job(*args, **job_params):
         output_path_prefix = f'entity.{entity_info["entity_id"]}.netflow'
 
         minute_ago = datetime.now() - timedelta(minutes=1)
+        two_minutes_ago = minute_ago - timedelta(minutes=1)
+
         values = []
         # Traffic in and out: (per interface)
-        values.extend(NetFlowBot.get_values_traffic_in(output_path_prefix, minute_ago))
-        values.extend(NetFlowBot.get_values_traffic_out(output_path_prefix, minute_ago))
+        values.extend(NetFlowBot.get_values_traffic_in(output_path_prefix, two_minutes_ago, minute_ago))
+        values.extend(NetFlowBot.get_values_traffic_out(output_path_prefix, two_minutes_ago, minute_ago))
 
         if not values:
             log.warning("No values found to be sent to Grafolean")
@@ -108,7 +110,7 @@ def perform_job(*args, **job_params):
         )
 
     @staticmethod
-    def get_values_traffic_in(output_path_prefix, from_time):
+    def get_values_traffic_in(output_path_prefix, from_time, to_time):
         with db.cursor() as c:
             # TODO: missing check for IP: r.client_ip = %s AND
             c.execute(f"""
@@ -120,11 +122,12 @@ def get_values_traffic_in(output_path_prefix, from_time):
                     {DB_PREFIX}flows "f"
                 WHERE
                     r.ts >= %s AND
+                    r.ts < %s AND
                     r.seq = f.record AND
                     ((f.data->'DIRECTION')::integer) = 1
                 GROUP BY
                     f.data->'INPUT_SNMP'
-            """, (from_time,))
+            """, (from_time, to_time,))
 
             values = []
             for interface_index, traffic_bytes in c.fetchall():
@@ -136,7 +139,7 @@ def get_values_traffic_in(output_path_prefix, from_time):
         return values
 
     @staticmethod
-    def get_values_traffic_out(output_path_prefix, from_time):
+    def get_values_traffic_out(output_path_prefix, from_time, to_time):
         with db.cursor() as c:
             # TODO: missing check for IP: r.client_ip = %s AND
             c.execute(f"""
@@ -148,11 +151,12 @@ def get_values_traffic_out(output_path_prefix, from_time):
                     {DB_PREFIX}flows "f"
                 WHERE
                     r.ts >= %s AND
+                    r.ts < %s AND
                     r.seq = f.record AND
                     ((f.data->'DIRECTION')::integer) = 1
                 GROUP BY
                     f.data->'OUTPUT_SNMP'
-            """, (from_time,))
+            """, (from_time, to_time,))
 
             values = []
             for interface_index, traffic_bytes in c.fetchall():
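
The two-parameter change above makes each aggregation query cover a half-open interval [from_time, to_time). A minimal, self-contained sketch (not part of the commit; aggregation_window is a hypothetical helper) of how such a window is built and why consecutive runs cannot double-count records:

from datetime import datetime, timedelta

def aggregation_window(now=None):  # hypothetical helper, for illustration only
    minute_ago = (now or datetime.now()) - timedelta(minutes=1)
    two_minutes_ago = minute_ago - timedelta(minutes=1)
    return two_minutes_ago, minute_ago  # passed as (from_time, to_time)

# consecutive runs get adjacent, non-overlapping windows: [11:58, 11:59), then [11:59, 12:00), ...
from_time, to_time = aggregation_window(datetime(2020, 1, 1, 12, 0))
assert (from_time, to_time) == (datetime(2020, 1, 1, 11, 58), datetime(2020, 1, 1, 11, 59))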

netflowcollector.py

Lines changed: 48 additions & 11 deletions
@@ -15,7 +15,7 @@
 from pynetflow.main import get_export_packets
 
 
-logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
+logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                     datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
 logging.addLevelName(logging.DEBUG, color("DBG", 7))
 logging.addLevelName(logging.INFO, "INF")
@@ -26,16 +26,53 @@
 
 def process_netflow(netflow_port, named_pipe_filename):
     # endless loop - read netflow packets, encode them to JSON and write them to named pipe:
-    with open(named_pipe_filename, "wb", 0) as fp:
-        for ts, client, export in get_export_packets('0.0.0.0', NETFLOW_PORT):
-            entry = {
-                "ts": ts,
-                "client": client,
-                "version": export.header.version,
-                "flows": [flow.data for flow in export.flows],
-            }
-            line = json.dumps(entry).encode() + b'\n'
-            fp.write(line)
+    line = None
+    last_record_seqs = {}
+    while True:
+        try:
+            with open(named_pipe_filename, "wb", 0) as fp:
+                # if the named pipe threw an error for some reason (BrokenPipe), write the line we
+                # have in buffer before listening to new packets:
+                if line is not None:
+                    fp.write(line)
+                    line = None
+                for ts, client, export in get_export_packets('0.0.0.0', NETFLOW_PORT):
+                    if export.header.version != 9:
+                        log.error(f"Only Netflow v9 currently supported, ignoring record (version: [{export.header.version}])")
+                        continue
+
+                    client_ip, _ = client
+
+                    # check for missing records:
+                    last_record_seq = last_record_seqs.get(client_ip)
+                    if last_record_seq is None:
+                        log.warning(f"Last record sequence number is not known, starting with {export.header.sequence}")
+                    elif export.header.sequence != last_record_seq + 1:
+                        log.error(f"Sequence number ({export.header.sequence}) does not follow ({last_record_seq}), some records might have been skipped")
+                    last_record_seqs[client_ip] = export.header.sequence
+
+                    flows_data = [flow.data for flow in export.flows]
+                    entry = {
+                        "ts": ts,
+                        "client": client_ip,
+                        "flows": [{
+                            "IN_BYTES": data["IN_BYTES"],
+                            "PROTOCOL": data["PROTOCOL"],
+                            "DIRECTION": data["DIRECTION"],
+                            "INPUT_SNMP": data["INPUT_SNMP"],
+                            "L4_DST_PORT": data["L4_DST_PORT"],
+                            "L4_SRC_PORT": data["L4_SRC_PORT"],
+                            "OUTPUT_SNMP": data["OUTPUT_SNMP"],
+                            "IPV4_DST_ADDR": data["IPV4_DST_ADDR"],
+                            "IPV4_SRC_ADDR": data["IPV4_SRC_ADDR"],
+                        } for data in flows_data],
+                    }
+                    line = json.dumps(entry).encode() + b'\n'
+                    fp.write(line)
+                    line = None
+        except Exception as ex:
+            log.exception(f"Exception: {str(ex)}")
+
 
 
 if __name__ == "__main__":
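
The sequence validation added above keeps one counter per exporting client. A stand-alone sketch of just that logic (check_sequence is a hypothetical name; the checks mirror the diff):

import logging

log = logging.getLogger(__name__)
last_record_seqs = {}  # client_ip -> last seen export sequence number

def check_sequence(client_ip, sequence):
    last_record_seq = last_record_seqs.get(client_ip)
    if last_record_seq is None:
        log.warning(f"Last record sequence number is not known, starting with {sequence}")
    elif sequence != last_record_seq + 1:
        log.error(f"Sequence number ({sequence}) does not follow ({last_record_seq}), some records might have been skipped")
    last_record_seqs[client_ip] = sequence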

netflowwriter.py

Lines changed: 4 additions & 4 deletions
@@ -16,7 +16,7 @@
 from dbutils import migrate_if_needed, db, DB_PREFIX
 
 
-logging.basicConfig(format='%(asctime)s | %(levelname)s | %(message)s',
+logging.basicConfig(format='%(asctime)s.%(msecs)03d | %(levelname)s | %(message)s',
                     datefmt='%Y-%m-%d %H:%M:%S', level=logging.DEBUG)
 logging.addLevelName(logging.DEBUG, color("DBG", 7))
 logging.addLevelName(logging.INFO, "INF")
@@ -46,9 +46,9 @@ def process_named_pipe(named_pipe_filename):
 def write_record(j):
     with db.cursor() as c:
         # first save the flow record:
-        ts = datetime.utcfromtimestamp(j["ts"])
-        log.info(f"Received record: {ts} from {j['client'][0]}")
-        c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip, version) VALUES (%s, %s, %s) RETURNING seq;", (ts, j['client'][0], j['version'],))
+        ts = datetime.utcfromtimestamp(j['ts'])
+        log.info(f"Received record: {ts} from {j['client']}")
+        c.execute(f"INSERT INTO {DB_PREFIX}records (ts, client_ip) VALUES (%s, %s) RETURNING seq;", (ts, j['client'],))
         record_seq = c.fetchone()[0]
 
         # then save each of the flows within the record, but use execute_values() to perform bulk insert:
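
The trailing comment refers to psycopg2's execute_values() for the bulk insert of flows. A hedged sketch of what that insert could look like (write_flows is a hypothetical helper; the flows table's record/data columns are inferred from the queries in netflowbot.py, not shown in this hunk):

from psycopg2.extras import execute_values, Json
from dbutils import db, DB_PREFIX

def write_flows(c, record_seq, flows):
    # one INSERT statement for all flows that belong to this record
    rows = [(record_seq, Json(flow)) for flow in flows]
    execute_values(c, f"INSERT INTO {DB_PREFIX}flows (record, data) VALUES %s", rows)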
