-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathprocpid.py
More file actions
122 lines (87 loc) · 2.8 KB
/
procpid.py
File metadata and controls
122 lines (87 loc) · 2.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
"""procpid plugin for sysmon"""
import os
import sys
from util.util import (
en_open,
to_bytes,
PROCS,
)
from util.logger import setup_logger
def read_process_status(pid):
"""get pid data, like name, state, vmrss"""
pid_data = {}
try:
with en_open(f"/proc/{pid}/status") as pid_status_file:
pid_file_lines = {}
pid_data["pid"] = pid
for line in pid_status_file:
line = line.split()
key = line[0].rstrip(":").lower()
try:
value = (
" ".join(line[1:][1:]).strip("(").strip(")").title()
if key == "state"
else line[1:][0]
)
except IndexError:
value = "!?!?"
pid_file_lines[key] = value
with open(f"/proc/{pid}/cmdline") as pid_cmdline:
exec_name = (
pid_cmdline.read()
.replace("\x00", " ")
.strip()
.split("/")[-1]
.split(" ")[0]
)
if len(exec_name) > 28:
exec_name = exec_name[:25] + "..."
pid_file_lines["name"] = exec_name
return pid_file_lines
except FileNotFoundError:
pass
class Procpid:
"""
Procpid class - get processes and sort from highest to lowest
based on VmRSS usage
Usage:
call get_data() to get data
returns dict
DO:
NOT CALL print_data(). That function
is intended to be used by sysmon. (might change in the future...?)
CALL close_files() when your program ends
to avoid opened files
"""
def __init__(self):
"""
initializing important stuff
"""
self.logger = setup_logger(__name__)
self.logger.debug("[init] initializing")
def get_data(self):
"""
returns a json dict with data
"""
data = {"processes": []}
process_dirs = [pid for pid in os.listdir("/proc") if pid.isdigit()]
processes = []
for pid in process_dirs:
process_info = read_process_status(pid)
if process_info:
processes.append(process_info)
self.logger.debug("[get_data] sorting")
processes = sorted(
processes, key=lambda x: int(x.get("vmrss", 0)), reverse=True
)
for proc_data in processes[:PROCS]:
data["processes"].append(
{
"name": proc_data["name"],
"pid": int(proc_data["pid"]),
"vmrss": to_bytes(int(proc_data["vmrss"])),
"state": proc_data["state"],
}
)
return data