@@ -48,7 +48,10 @@ def process_named_pipe(named_pipe_filename):
4848 raise
4949
5050 templates = {}
51+ last_record_seqs = {}
5152 last_day_seq = None
53+ buffer = [] # we merge together writes to DB
54+ MAX_BUFFER_SIZE = 5
5255 while True :
5356 with open (named_pipe_filename , "rb" ) as fp :
5457 log .info (f"Opened named pipe { named_pipe_filename } " )
@@ -60,18 +63,34 @@ def process_named_pipe(named_pipe_filename):
6063
6164 try :
6265 data_b64 , ts , client = json .loads (line )
66+ client_ip , _ = client
6367 data = base64 .b64decode (data_b64 )
6468
6569 # sequence number of the (24h) day from UNIX epoch helps us determine the
6670 # DB partition we are working with:
6771 day_seq = int (ts // (24 * 3600 ))
6872 if day_seq != last_day_seq :
69- create_flow_table_partition (day_seq )
73+ write_buffer (buffer , last_day_seq )
74+ ensure_flow_table_partition_exists (day_seq )
7075 last_day_seq = day_seq
7176
7277 try :
7378 export = parse_packet (data , templates )
74- write_record (ts , client , export , day_seq )
79+ log .debug (f"[{ client_ip } ] Received record [{ export .header .sequence } ]: { datetime .utcfromtimestamp (ts )} " )
80+
81+ # check for missing NetFlow records:
82+ last_record_seq = last_record_seqs .get (client_ip )
83+ if last_record_seq is None :
84+ log .warning (f"[{ client_ip } ] Last record sequence number is not known, starting with { export .header .sequence } " )
85+ elif export .header .sequence != last_record_seq + 1 :
86+ log .error (f"[{ client_ip } ] Sequence number ({ export .header .sequence } ) does not follow ({ last_record_seq } ), some records might have been skipped" )
87+ last_record_seqs [client_ip ] = export .header .sequence
88+
89+ # append the record to a buffer and write to DB when buffer is full enough:
90+ buffer .append ((ts , client_ip , export ,))
91+ if len (buffer ) > MAX_BUFFER_SIZE :
92+ write_buffer (buffer , day_seq )
93+ buffer = []
7594 except UnknownNetFlowVersion :
7695 log .warning ("Unknown NetFlow version" )
7796 continue
@@ -85,7 +104,7 @@ def process_named_pipe(named_pipe_filename):
85104
86105
87106# Based on timestamp, make sure that the partition exists:
88- def create_flow_table_partition (day_seq ):
107+ def ensure_flow_table_partition_exists (day_seq ):
89108 day_start = day_seq * (24 * 3600 )
90109 day_end = day_start + (24 * 3600 )
91110 with get_db_cursor () as c :
@@ -96,10 +115,7 @@ def create_flow_table_partition(day_seq):
96115 return day_seq
97116
98117
99- last_record_seqs = {}
100-
101-
102- def write_record (ts , client , export , day_seq ):
118+ def write_buffer (buffer , day_seq ):
103119 # {
104120 # "DST_AS": 0,
105121 # "SRC_AS": 0,
@@ -125,81 +141,73 @@ def write_record(ts, client, export, day_seq):
125141 # }
126142 # https://www.cisco.com/en/US/technologies/tk648/tk362/technologies_white_paper09186a00800a3db9.html#wp9001622
127143
128- client_ip , _ = client
129-
130- # check for missing NetFlow records:
131- last_record_seq = last_record_seqs .get (client_ip )
132- if last_record_seq is None :
133- log .warning (f"[{ client_ip } ] Last record sequence number is not known, starting with { export .header .sequence } " )
134- elif export .header .sequence != last_record_seq + 1 :
135- log .error (f"[{ client_ip } ] Sequence number ({ export .header .sequence } ) does not follow ({ last_record_seq } ), some records might have been skipped" )
136- last_record_seqs [client_ip ] = export .header .sequence
137144
138- log .debug (f"[ { client_ip } ] Received record [ { export . header . sequence } ]: { datetime . utcfromtimestamp ( ts ) } " )
145+ log .debug (f"Writing { len ( buffer ) } records to DB for day { day_seq } " )
139146 with get_db_cursor () as c :
140147 # save each of the flows within the record, but use execute_values() to perform bulk insert:
141- def _get_data (netflow_version , ts , client_ip , flows ):
142- if netflow_version == 9 :
143- for f in flows :
144- yield (
145- ts ,
146- client_ip ,
147- # "IN_BYTES":
148- f .data ["IN_BYTES" ],
149- # "PROTOCOL":
150- f .data ["PROTOCOL" ],
151- # "DIRECTION":
152- f .data ["DIRECTION" ],
153- # "L4_DST_PORT":
154- f .data ["L4_DST_PORT" ],
155- # "L4_SRC_PORT":
156- f .data ["L4_SRC_PORT" ],
157- # "INPUT_SNMP":
158- f .data ["INPUT_SNMP" ],
159- # "OUTPUT_SNMP":
160- f .data ["OUTPUT_SNMP" ],
161- # "IPV4_DST_ADDR":
162- f .data ["IPV4_DST_ADDR" ],
163- # "IPV4_SRC_ADDR":
164- f .data ["IPV4_SRC_ADDR" ],
165- )
166- elif netflow_version == 5 :
167- for f in flows :
168- yield (
169- ts ,
170- client_ip ,
171- # "IN_BYTES":
172- f .data ["IN_OCTETS" ],
173- # "PROTOCOL":
174- f .data ["PROTO" ],
175- # "DIRECTION":
176- DIRECTION_INGRESS ,
177- # "L4_DST_PORT":
178- f .data ["DST_PORT" ],
179- # "L4_SRC_PORT":
180- f .data ["SRC_PORT" ],
181- # "INPUT_SNMP":
182- f .data ["INPUT" ],
183- # "OUTPUT_SNMP":
184- f .data ["OUTPUT" ],
185- # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
186- # them back to bytes and transform them to strings:
187- # "IPV4_DST_ADDR":
188- socket .inet_ntoa (struct .pack ('!I' , f .data ["IPV4_DST_ADDR" ])),
189- # "IPV4_SRC_ADDR":
190- socket .inet_ntoa (struct .pack ('!I' , f .data ["IPV4_SRC_ADDR" ])),
191- )
192- else :
193- log .error (f"[{ client_ip } ] Only Netflow v5 and v9 currently supported, ignoring record (version: [{ export .header .version } ])" )
194- return
195-
196- data_iterator = _get_data (export .header .version , ts , client_ip , export .flows )
148+ def _get_data (buffer ):
149+ for ts , client_ip , export in buffer :
150+ netflow_version , flows = export .header .version , export .flows
151+ if netflow_version == 9 :
152+ for f in flows :
153+ yield (
154+ ts ,
155+ client_ip ,
156+ # "IN_BYTES":
157+ f .data ["IN_BYTES" ],
158+ # "PROTOCOL":
159+ f .data ["PROTOCOL" ],
160+ # "DIRECTION":
161+ f .data ["DIRECTION" ],
162+ # "L4_DST_PORT":
163+ f .data ["L4_DST_PORT" ],
164+ # "L4_SRC_PORT":
165+ f .data ["L4_SRC_PORT" ],
166+ # "INPUT_SNMP":
167+ f .data ["INPUT_SNMP" ],
168+ # "OUTPUT_SNMP":
169+ f .data ["OUTPUT_SNMP" ],
170+ # "IPV4_DST_ADDR":
171+ f .data ["IPV4_DST_ADDR" ],
172+ # "IPV4_SRC_ADDR":
173+ f .data ["IPV4_SRC_ADDR" ],
174+ )
175+ elif netflow_version == 5 :
176+ for f in flows :
177+ yield (
178+ ts ,
179+ client_ip ,
180+ # "IN_BYTES":
181+ f .data ["IN_OCTETS" ],
182+ # "PROTOCOL":
183+ f .data ["PROTO" ],
184+ # "DIRECTION":
185+ DIRECTION_INGRESS ,
186+ # "L4_DST_PORT":
187+ f .data ["DST_PORT" ],
188+ # "L4_SRC_PORT":
189+ f .data ["SRC_PORT" ],
190+ # "INPUT_SNMP":
191+ f .data ["INPUT" ],
192+ # "OUTPUT_SNMP":
193+ f .data ["OUTPUT" ],
194+ # netflow v5 IP addresses are decoded to integers, which is less suitable for us - pack
195+ # them back to bytes and transform them to strings:
196+ # "IPV4_DST_ADDR":
197+ socket .inet_ntoa (struct .pack ('!I' , f .data ["IPV4_DST_ADDR" ])),
198+ # "IPV4_SRC_ADDR":
199+ socket .inet_ntoa (struct .pack ('!I' , f .data ["IPV4_SRC_ADDR" ])),
200+ )
201+ else :
202+ log .error (f"[{ client_ip } ] Only Netflow v5 and v9 currently supported, ignoring record (version: [{ export .header .version } ])" )
203+
204+ data_iterator = _get_data (buffer )
197205 psycopg2 .extras .execute_values (
198206 c ,
199207 f"INSERT INTO { DB_PREFIX } flows_{ day_seq } (ts, client_ip, IN_BYTES, PROTOCOL, DIRECTION, L4_DST_PORT, L4_SRC_PORT, INPUT_SNMP, OUTPUT_SNMP, IPV4_DST_ADDR, IPV4_SRC_ADDR) VALUES %s" ,
200208 data_iterator ,
201209 "(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)" ,
202- page_size = 100
210+ page_size = 500
203211 )
204212
205213if __name__ == "__main__" :