@@ -144,10 +144,11 @@ def perform_account_aggr_job(*args, **job_params):
144144
145145 # top N IPs:
146146 for entity_id , entity_ip in entities :
147- v = NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , DIRECTION_EGRESS , entity_id , entity_ip )
148- values .extend (v )
149- v = NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , DIRECTION_INGRESS , entity_id , entity_ip )
150- values .extend (v )
147+ for direction in [DIRECTION_EGRESS , DIRECTION_INGRESS ]:
148+ values .extend (NetFlowBot .get_top_N_IPs_for_entity (interval_label , last_used_seq , max_seq , time_between , direction , entity_id , entity_ip ))
149+ values .extend (NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , direction , entity_id , entity_ip ))
150+ values .extend (NetFlowBot .get_top_N_protocols_for_entity (interval_label , last_used_seq , max_seq , time_between , direction , entity_id , entity_ip ))
151+ values .extend (NetFlowBot .get_top_N_protocols_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , direction , entity_id , entity_ip ))
151152
152153 if not values :
153154 log .warning ("No values found to be sent to Grafolean" )
@@ -298,6 +299,126 @@ def get_top_N_IPs_for_entity_interfaces(interval_label, last_seq, max_seq, time_
298299
299300 return values
300301
302+ @staticmethod
303+ def get_top_N_IPs_for_entity (interval_label , last_seq , max_seq , time_between , direction , entity_id , entity_ip ):
304+ with get_db_cursor () as c :
305+ values = []
306+ c .execute (f"""
307+ SELECT
308+ f.{ 'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr' } ,
309+ sum(f.in_bytes) "traffic"
310+ FROM
311+ netflow_records "r",
312+ netflow_flows "f"
313+ WHERE
314+ r.client_ip = %s AND
315+ r.seq > %s AND
316+ r.seq <= %s AND
317+ r.seq = f.record AND
318+ f.direction = %s
319+ GROUP BY
320+ f.{ 'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr' }
321+ ORDER BY
322+ traffic desc
323+ LIMIT { TOP_N_MAX } ;
324+ """ , (entity_ip , last_seq , max_seq , direction ,))
325+
326+ output_path_entity = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = None )
327+ for top_ip , traffic_bytes in c .fetchall ():
328+ output_path = f"{ output_path_entity } .topip.{ path_part_encode (top_ip )} "
329+ values .append ({
330+ 'p' : output_path ,
331+ 'v' : traffic_bytes / time_between , # Bps
332+ })
333+
334+ return values
335+
336+
337+ @staticmethod
338+ def get_top_N_protocols_for_entity_interfaces (interval_label , last_seq , max_seq , time_between , direction , entity_id , entity_ip ):
339+ with get_db_cursor () as c , get_db_cursor () as c2 :
340+
341+ values = []
342+ c .execute (f"""
343+ SELECT
344+ distinct(f.{ 'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp' } ) "interface_index"
345+ FROM
346+ netflow_records "r",
347+ netflow_flows "f"
348+ WHERE
349+ r.client_ip = %s AND
350+ r.seq > %s AND
351+ r.seq <= %s AND
352+ r.seq = f.record AND
353+ f.direction = %s
354+ """ , (entity_ip , last_seq , max_seq , direction ,))
355+
356+ for interface_index , in c .fetchall ():
357+ c2 .execute (f"""
358+ SELECT
359+ f.protocol,
360+ sum(f.in_bytes) "traffic"
361+ FROM
362+ netflow_records "r",
363+ netflow_flows "f"
364+ WHERE
365+ r.client_ip = %s AND
366+ r.seq > %s AND
367+ r.seq <= %s AND
368+ r.seq = f.record AND
369+ f.direction = %s AND
370+ f.{ 'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp' } = %s
371+ GROUP BY
372+ f.protocol
373+ ORDER BY
374+ traffic desc
375+ LIMIT { TOP_N_MAX } ;
376+ """ , (entity_ip , last_seq , max_seq , direction , interface_index ,))
377+
378+ output_path_interface = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = interface_index )
379+ for protocol , traffic_bytes in c2 .fetchall ():
380+ output_path = f"{ output_path_interface } .topprotocol.{ path_part_encode (PROTOCOLS [protocol ])} "
381+ values .append ({
382+ 'p' : output_path ,
383+ 'v' : traffic_bytes / time_between , # Bps
384+ })
385+
386+ return values
387+
388+ @staticmethod
389+ def get_top_N_protocols_for_entity (interval_label , last_seq , max_seq , time_between , direction , entity_id , entity_ip ):
390+ with get_db_cursor () as c :
391+ values = []
392+ c .execute (f"""
393+ SELECT
394+ f.protocol,
395+ sum(f.in_bytes) "traffic"
396+ FROM
397+ netflow_records "r",
398+ netflow_flows "f"
399+ WHERE
400+ r.client_ip = %s AND
401+ r.seq > %s AND
402+ r.seq <= %s AND
403+ r.seq = f.record AND
404+ f.direction = %s
405+ GROUP BY
406+ f.protocol
407+ ORDER BY
408+ traffic desc
409+ LIMIT { TOP_N_MAX } ;
410+ """ , (entity_ip , last_seq , max_seq , direction ,))
411+
412+ output_path_entity = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = None )
413+ for protocol , traffic_bytes in c .fetchall ():
414+ output_path = f"{ output_path_entity } .topprotocol.{ path_part_encode (PROTOCOLS [protocol ])} "
415+ values .append ({
416+ 'p' : output_path ,
417+ 'v' : traffic_bytes / time_between , # Bps
418+ })
419+
420+ return values
421+
301422 # @staticmethod
302423 # def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
303424 # with get_db_cursor() as c:
0 commit comments