11import argparse
22import base64
3+ from datetime import datetime , timedelta
34import gzip
45import json
56import logging
@@ -47,6 +48,7 @@ def process_named_pipe(named_pipe_filename):
4748 raise
4849
4950 templates = {}
51+ last_day_seq = None
5052 while True :
5153 with open (named_pipe_filename , "rb" ) as fp :
5254 log .info (f"Opened named pipe { named_pipe_filename } " )
@@ -60,9 +62,16 @@ def process_named_pipe(named_pipe_filename):
6062 data_b64 , ts , client = json .loads (line )
6163 data = base64 .b64decode (data_b64 )
6264
65+ # sequence number of the (24h) day from UNIX epoch helps us determine the
66+ # DB partition we are working with:
67+ day_seq = int (ts // (24 * 3600 ))
68+ if day_seq != last_day_seq :
69+ create_flow_table_partition (day_seq )
70+ last_day_seq = day_seq
71+
6372 try :
6473 export = parse_packet (data , templates )
65- write_record (ts , client , export )
74+ write_record (ts , client , export , day_seq )
6675 except UnknownNetFlowVersion :
6776 log .warning ("Unknown NetFlow version" )
6877 continue
@@ -75,10 +84,22 @@ def process_named_pipe(named_pipe_filename):
7584 log .exception ("Error writing line, skipping..." )
7685
7786
87+ # Based on timestamp, make sure that the partition exists:
88+ def create_flow_table_partition (day_seq ):
89+ day_start = day_seq * (24 * 3600 )
90+ day_end = day_start + (24 * 3600 )
91+ with get_db_cursor () as c :
92+ # "When creating a range partition, the lower bound specified with FROM is an inclusive bound, whereas
93+ # the upper bound specified with TO is an exclusive bound."
94+ # https://www.postgresql.org/docs/12/sql-createtable.html
95+ c .execute (f"CREATE UNLOGGED TABLE IF NOT EXISTS { DB_PREFIX } flows_{ day_seq } PARTITION OF { DB_PREFIX } flows FOR VALUES FROM ({ day_start } ) TO ({ day_end } )" )
96+ return day_seq
97+
98+
7899last_record_seqs = {}
79100
80101
81- def write_record (ts , client , export ):
102+ def write_record (ts , client , export , day_seq ):
82103 # {
83104 # "DST_AS": 0,
84105 # "SRC_AS": 0,
@@ -175,7 +196,7 @@ def _get_data(netflow_version, ts, client_ip, flows):
175196 data_iterator = _get_data (export .header .version , ts , client_ip , export .flows )
176197 psycopg2 .extras .execute_values (
177198 c ,
178- f"INSERT INTO { DB_PREFIX } flows (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s" ,
199+ f"INSERT INTO { DB_PREFIX } flows_ { day_seq } (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s" ,
179200 data_iterator ,
180201 "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" ,
181202 page_size = 100
0 commit comments