@@ -18,7 +18,7 @@
 from colors import color
 
 from lookup import PROTOCOLS
-from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX
+from dbutils import migrate_if_needed, get_db_cursor, DB_PREFIX, S_PER_PARTITION
 from lookup import DIRECTION_INGRESS
 
 
@@ -86,7 +86,7 @@ def process_named_pipe(named_pipe_filename):
 
     templates = {}
     last_record_seqs = {}
-    last_day_seq = None
+    last_partition_no = None
     buffer = []  # we merge together writes to DB
     MAX_BUFFER_SIZE = 5
     while True:
@@ -105,11 +105,11 @@ def process_named_pipe(named_pipe_filename):
 
         # sequence number of the (24h) day from UNIX epoch helps us determine the
         # DB partition we are working with:
-        day_seq = int(ts // (24 * 3600))
-        if day_seq != last_day_seq:
-            write_buffer(buffer, last_day_seq)
-            ensure_flow_table_partition_exists(day_seq)
-            last_day_seq = day_seq
+        partition_no = int(ts // S_PER_PARTITION)
+        if partition_no != last_partition_no:
+            write_buffer(buffer, last_partition_no)
+            ensure_flow_table_partition_exists(partition_no)
+            last_partition_no = partition_no
 
         try:
             export = parse_packet(data, templates)
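For reference, a minimal sketch of the timestamp-to-partition mapping introduced above. S_PER_PARTITION is imported from dbutils and its value is not visible in this diff, so an hourly width of 3600 seconds is assumed here purely for illustration:

# illustration only -- S_PER_PARTITION really comes from dbutils;
# 3600 (hourly partitions) is an assumed value for this sketch
S_PER_PARTITION = 3600

def partition_bounds(ts):
    # each partition covers the half-open interval
    # [partition_no * S_PER_PARTITION, (partition_no + 1) * S_PER_PARTITION)
    partition_no = int(ts // S_PER_PARTITION)
    return partition_no, partition_no * S_PER_PARTITION, (partition_no + 1) * S_PER_PARTITION

print(partition_bounds(1600000000))  # -> (444444, 1599998400, 1600002000)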
@@ -126,7 +126,7 @@ def process_named_pipe(named_pipe_filename):
             # append the record to a buffer and write to DB when buffer is full enough:
             buffer.append((ts, client_ip, export,))
             if len(buffer) > MAX_BUFFER_SIZE:
-                write_buffer(buffer, day_seq)
+                write_buffer(buffer, partition_no)
                 buffer = []
         except UnknownNetFlowVersion:
             log.warning("Unknown NetFlow version")
@@ -141,18 +141,18 @@ def process_named_pipe(named_pipe_filename):
 
 
 # Based on timestamp, make sure that the partition exists:
-def ensure_flow_table_partition_exists(day_seq):
-    day_start = day_seq * (24 * 3600)
-    day_end = day_start + (24 * 3600)
+def ensure_flow_table_partition_exists(partition_no):
+    ts_start = partition_no * S_PER_PARTITION
+    ts_end = ts_start + S_PER_PARTITION
     with get_db_cursor() as c:
         # "When creating a range partition, the lower bound specified with FROM is an inclusive bound, whereas
         # the upper bound specified with TO is an exclusive bound."
         # https://www.postgresql.org/docs/12/sql-createtable.html
-        c.execute(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{day_seq} PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({day_start}) TO ({day_end})")
-        return day_seq
+        c.execute(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{partition_no} PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({ts_start}) TO ({ts_end})")
+        return partition_no
 
 
-def write_buffer(buffer, day_seq):
+def write_buffer(buffer, partition_no):
     # {
     #     "DST_AS": 0,
     #     "SRC_AS": 0,
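To make the FROM/TO bounds concrete: they are exactly the half-open interval covered by one partition. A sketch of the statement the f-string above builds, with DB_PREFIX and the partition width assumed here (both actually come from dbutils):

# illustration only -- DB_PREFIX and S_PER_PARTITION values are assumptions here
DB_PREFIX = "netflow_"
S_PER_PARTITION = 3600
partition_no = 444444
ts_start = partition_no * S_PER_PARTITION   # 1599998400, inclusive lower bound
ts_end = ts_start + S_PER_PARTITION         # 1600002000, exclusive upper bound
print(f"CREATE UNLOGGED TABLE IF NOT EXISTS {DB_PREFIX}flows_{partition_no} "
      f"PARTITION OF {DB_PREFIX}flows FOR VALUES FROM ({ts_start}) TO ({ts_end})")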
@@ -179,7 +179,7 @@ def write_buffer(buffer, day_seq):
     # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
 
 
-    log.debug(f"Writing {len(buffer)} records to DB for day {day_seq}")
+    log.debug(f"Writing {len(buffer)} records to DB, partition {partition_no}")
     # save each of the flows within the record, but use execute_values() to perform bulk insert:
     def _get_data(buffer):
         for ts, client_ip, export in buffer:
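The bulk insert mentioned in the comment is presumably psycopg2's execute_values(), which expands a single VALUES %s placeholder with many rows in one round trip. A minimal, self-contained sketch of that pattern; the connection string, table and column names below are placeholders, not the project's actual schema:

# sketch of the execute_values() bulk-insert pattern (psycopg2);
# table/column names and connection string are placeholders
import psycopg2
from psycopg2.extras import execute_values

rows = [(1600000000, "10.0.0.1", 1234), (1600000001, "10.0.0.2", 5678)]
with psycopg2.connect("dbname=example") as conn:
    with conn.cursor() as cur:
        execute_values(cur, "INSERT INTO flows_example (ts, client_ip, in_bytes) VALUES %s", rows)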
@@ -228,7 +228,7 @@ def _get_data(buffer):
                 # "OUTPUT_SNMP":
                 f.data["OUTPUT"],
                 # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
-                # them back to bytes and transform them to strings:
+                # them back to bytes:
                 # "IPV4_DST_ADDR":
                 struct.pack('!I', f.data["IPV4_DST_ADDR"]),
                 # "IPV4_SRC_ADDR":
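A quick illustration of the conversion this line performs: NetFlow v5 parsing yields the address as a plain integer, and struct.pack('!I', ...) turns it back into the 4 network-order bytes; socket.inet_ntoa() is used here only to show the readable form of the result:

import socket
import struct

addr_int = 0xC0A80001                      # 3232235521, an address decoded as an integer
addr_bytes = struct.pack('!I', addr_int)   # b'\xc0\xa8\x00\x01' (network byte order)
print(socket.inet_ntoa(addr_bytes))        # 192.168.0.1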