3232 ('4h' , 4 * 3600 ),
3333 ('24h' , 24 * 3600 ),
3434]
35+ TOP_N_MAX = 10
36+
37+
38+ def path_part_encode (s ):
39+ return s .replace ("." , '%2e' )
3540
3641
3742def _get_last_used_seq (job_id ):
@@ -99,7 +104,8 @@ def perform_account_aggr_job(*args, **job_params):
99104 job_id = job_params ["job_id" ]
100105 interval_label = job_params ["interval_label" ]
101106 account_id = job_params ["account_id" ]
102- entities_infos = job_params ["entities_infos" ]
107+ entities = [(entity_info ["entity_id" ], entity_info ["details" ]["ipv4" ],) for entity_info in job_params ["entities_infos" ]]
108+
103109
104110 last_used_seq , last_used_ts = _get_last_used_seq (job_id )
105111 max_seq , max_ts = _get_current_max_seq ()
@@ -112,20 +118,19 @@ def perform_account_aggr_job(*args, **job_params):
112118 return
113119 time_between = float (max_ts - last_used_ts )
114120
121+ # traffic:
115122 values = []
116123 sum_traffic_egress = 0
117124 sum_traffic_ingress = 0
118- for entity_info in entities_infos :
119- entity_id = entity_info ["entity_id" ]
120- entity_ip = entity_info ["details" ]["ipv4" ]
125+ for entity_id , entity_ip in entities :
121126 v , s = NetFlowBot .get_traffic_for_entity (interval_label , last_used_seq , max_seq , time_between , DIRECTION_EGRESS , entity_id , entity_ip )
122127 values .extend (v )
123128 sum_traffic_egress += s
124129 v , s = NetFlowBot .get_traffic_for_entity (interval_label , last_used_seq , max_seq , time_between , DIRECTION_INGRESS , entity_id , entity_ip )
125130 values .extend (v )
126131 sum_traffic_ingress += s
127132
128- # add two values for cumulative sum for the whole account:
133+ # cumulative sum for the whole account:
129134 output_path = NetFlowBot .construct_output_path_prefix (interval_label , DIRECTION_EGRESS , entity_id = None , interface = None )
130135 values .append ({
131136 'p' : output_path ,
@@ -137,6 +142,12 @@ def perform_account_aggr_job(*args, **job_params):
137142 'v' : sum_traffic_ingress / time_between ,
138143 })
139144
145+ # top N IPs:
146+ for entity_id , entity_ip in entities :
147+ v = NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , DIRECTION_EGRESS , entity_id , entity_ip )
148+ values .extend (v )
149+ v = NetFlowBot .get_top_N_IPs_for_entity_interfaces (interval_label , last_used_seq , max_seq , time_between , DIRECTION_INGRESS , entity_id , entity_ip )
150+ values .extend (v )
140151
141152 if not values :
142153 log .warning ("No values found to be sent to Grafolean" )
@@ -236,38 +247,56 @@ def get_traffic_for_entity(interval_label, last_seq, max_seq, time_between, dire
236247 return values , sum_traffic
237248
238249
239- # @staticmethod
240- # def get_top_N_IPs(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
241- # with get_db_cursor() as c:
242- # # TODO: missing check for IP: r.client_ip = %s AND
243- # c.execute(f"""
244- # SELECT
245- # f.IPV4_{'SRC' if is_direction_in else 'DST'}_ADDR,
246- # sum(f.IN_BYTES) "traffic"
247- # FROM
248- # netflow_records "r",
249- # netflow_flows "f"
250- # WHERE
251- # r.ts >= %s AND
252- # r.ts < %s AND
253- # r.seq = f.record AND
254- # f.{'INPUT_SNMP' if is_direction_in else 'OUTPUT_SNMP'} = %s AND
255- # f.DIRECTION = {'0' if is_direction_in else '1'}
256- # GROUP BY
257- # f.IPV4_{'SRC' if is_direction_in else 'DST'}_ADDR
258- # ORDER BY
259- # traffic desc
260- # LIMIT 10;
261- # """, (from_time, to_time, interface_index,))
250+ @staticmethod
251+ def get_top_N_IPs_for_entity_interfaces (interval_label , last_seq , max_seq , time_between , direction , entity_id , entity_ip ):
252+ with get_db_cursor () as c , get_db_cursor () as c2 :
262253
263- # values = []
264- # for top_ip, traffic_bytes in c.fetchall():
265- # output_path = f"{output_path_prefix}.topip.{'in' if is_direction_in else 'out'}.{interface_index}.if{interface_index}.{top_ip}"
266- # values.append({
267- # 'p': output_path,
268- # 'v': traffic_bytes / 60., # Bps
269- # })
270- # return values
254+ values = []
255+ c .execute (f"""
256+ SELECT
257+ distinct(f.{ 'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp' } ) "interface_index"
258+ FROM
259+ netflow_records "r",
260+ netflow_flows "f"
261+ WHERE
262+ r.client_ip = %s AND
263+ r.seq > %s AND
264+ r.seq <= %s AND
265+ r.seq = f.record AND
266+ f.direction = %s
267+ """ , (entity_ip , last_seq , max_seq , direction ,))
268+
269+ for interface_index , in c .fetchall ():
270+ c2 .execute (f"""
271+ SELECT
272+ f.{ 'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr' } ,
273+ sum(f.in_bytes) "traffic"
274+ FROM
275+ netflow_records "r",
276+ netflow_flows "f"
277+ WHERE
278+ r.client_ip = %s AND
279+ r.seq > %s AND
280+ r.seq <= %s AND
281+ r.seq = f.record AND
282+ f.direction = %s AND
283+ f.{ 'input_snmp' if direction == DIRECTION_INGRESS else 'output_snmp' } = %s
284+ GROUP BY
285+ f.{ 'ipv4_src_addr' if direction == DIRECTION_INGRESS else 'ipv4_dst_addr' }
286+ ORDER BY
287+ traffic desc
288+ LIMIT { TOP_N_MAX } ;
289+ """ , (entity_ip , last_seq , max_seq , direction , interface_index ,))
290+
291+ output_path_interface = NetFlowBot .construct_output_path_prefix (interval_label , direction , entity_id , interface = interface_index )
292+ for top_ip , traffic_bytes in c2 .fetchall ():
293+ output_path = f"{ output_path_interface } .topip.{ path_part_encode (top_ip )} "
294+ values .append ({
295+ 'p' : output_path ,
296+ 'v' : traffic_bytes / time_between , # Bps
297+ })
298+
299+ return values
271300
272301 # @staticmethod
273302 # def get_top_N_protocols(output_path_prefix, from_time, to_time, interface_index, is_direction_in=True):
0 commit comments