-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmain.py
More file actions
196 lines (177 loc) · 7.02 KB
/
main.py
File metadata and controls
196 lines (177 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# -*- coding: utf-8 -*-
"""
semgrep: 多语言静态扫描工具
功能: 代码分析
用法: python3 main.py
"""
import os
import json
import yaml
import shutil
import subprocess
import sys
class Semgrep(object):
    """Run semgrep over a set of files and write the findings as JSON.

    Inputs are read from environment variables (``TASK_REQUEST``,
    ``SOURCE_DIR``, ``RESULT_DIR``, ``DIFF_FILES``, ``SCAN_FILES``,
    ``TCA_QUICK_SCAN``) and from the task request JSON file. The findings are
    written to ``result.json`` inside the result directory.
    """

    # Upper bound on the space-joined length of the explicit file list; longer
    # lists are replaced by the source directory to keep the command line short.
    _MAX_SCAN_ARGS_LENGTH = 100000

    def __get_task_params(self):
        """Load the task parameters from the task request file.

        The file path is taken from the ``TASK_REQUEST`` environment variable.

        :return: the ``task_params`` entry of the request JSON
        """
        task_request_file = os.environ.get("TASK_REQUEST")
        with open(task_request_file, "r", encoding="utf-8") as rf:
            task_request = json.load(rf)
        return task_request["task_params"]

    @staticmethod
    def _load_json(path):
        """Return the parsed content of the JSON file at ``path``."""
        with open(path, "r", encoding="utf-8") as rf:
            return json.load(rf)

    @staticmethod
    def _write_result(result_path, result):
        """Dump the issue list ``result`` to ``result_path`` as indented JSON."""
        with open(result_path, "w") as fp:
            json.dump(result, fp, indent=2)

    @staticmethod
    def _defines_any_rule(data, rules):
        """Return True if parsed YAML ``data`` has a rule whose id is in ``rules``."""
        if not isinstance(data, dict):
            return False
        return any(
            isinstance(rule, dict) and rule.get("id") in rules
            for rule in data.get("rules") or []
        )

    def _collect_scan_files(self, source_dir):
        """Return the list of paths that should be passed to semgrep.

        Preference order: the list stored in the file named by ``DIFF_FILES``
        (incremental scan; per the original author's note this list is already
        filtered by the project's path filters), then the list named by
        ``SCAN_FILES`` when ``TCA_QUICK_SCAN`` is set, otherwise the whole
        source directory.

        :param source_dir: root of the source tree, or None when unset
        :return: list of paths; empty when nothing can be scanned
        """
        # A missing SOURCE_DIR yields an empty list instead of [None], which
        # would otherwise crash when the paths are joined below.
        whole_tree = [source_dir] if source_dir else []

        diff_file_json = os.environ.get("DIFF_FILES")
        scan_file_json = os.environ.get("SCAN_FILES")
        if diff_file_json:
            print("get diff file: %s" % diff_file_json)
            scan_files = list(self._load_json(diff_file_json))
        elif os.environ.get("TCA_QUICK_SCAN") and scan_file_json:
            print("get scan file: %s" % scan_file_json)
            scan_files = list(self._load_json(scan_file_json))
        else:
            scan_files = whole_tree

        # Very long explicit lists are replaced by a scan of the whole tree.
        if len(" ".join(scan_files)) > self._MAX_SCAN_ARGS_LENGTH:
            scan_files = whole_tree
        return scan_files

    def config(self, rules):
        """Collect the rule files that define at least one requested rule.

        Walks ``./rules`` for ``.yaml``/``.yml`` files and copies every file
        that contains a rule whose ``id`` is in ``rules`` into a freshly
        recreated ``./config_rules`` directory, keeping the relative layout.
        Files are copied whole, so a copied file may also carry rules that
        were not requested.

        :param rules: collection of rule ids to enable
        :return: absolute path of the ``config_rules`` directory
        """
        rules_path = os.path.abspath("./rules")
        config_rules_path = os.path.abspath("./config_rules")

        # Start from an empty directory so files from an earlier run are dropped.
        if os.path.exists(config_rules_path):
            shutil.rmtree(config_rules_path)
        os.mkdir(config_rules_path)

        for dirpath, _, files in os.walk(rules_path):
            for filename in files:
                if not filename.lower().endswith((".yaml", ".yml")):
                    continue
                rule_file = os.path.join(dirpath, filename)
                with open(rule_file, "r", encoding="utf-8") as fp:
                    # NOTE(review): FullLoader is kept from the original code;
                    # consider yaml.safe_load if rule files can come from an
                    # untrusted source.
                    data = yaml.load(fp, Loader=yaml.FullLoader)
                if not self._defines_any_rule(data, rules):
                    continue
                target = os.path.join(
                    config_rules_path, os.path.relpath(rule_file, rules_path)
                )
                os.makedirs(os.path.dirname(target), exist_ok=True)
                shutil.copy(rule_file, target)
        return config_rules_path

    def run(self):
        """Run one scan and write ``result.json`` into the result directory.

        Each reported issue is a dict with the keys ``path``, ``line``,
        ``column``, ``msg``, ``rule`` and ``refs``. An empty list is written
        when there is nothing to scan, when semgrep cannot be started, or when
        its report cannot be read.

        :return: None
        """
        # Source directory comes straight from the environment.
        source_dir = os.environ.get("SOURCE_DIR", None)
        print("[debug] source_dir: %s" % source_dir)
        # Result directory comes straight from the environment.
        result_dir = os.environ.get("RESULT_DIR", os.getcwd())
        # Everything else comes from the task request file.
        task_params = self.__get_task_params()
        envs = task_params["envs"]
        print("[debug] envs: %s" % envs)
        # Path filters (glob patterns).
        exclude_path = task_params["path_filters"]["exclusion"]
        include_path = task_params["path_filters"]["inclusion"]
        rules = task_params["rules"]

        scan_files = self._collect_scan_files(source_dir)

        if not self.check_tool_version():
            return

        config_rules = self.config(rules)
        error_output = os.path.join(result_dir, "error_output.json")
        result_path = os.path.join(result_dir, "result.json")
        result = []

        cmd = [
            "semgrep",
            "scan",
            "--config",
            config_rules,
            "--disable-version-check",
            "--metrics",
            "off",
            "--no-git-ignore",
            # Presumably keeps check_id equal to the raw rule id so it can be
            # compared with the configured rule ids below - TODO confirm.
            "--no-rewrite-rule-ids",
            "--json",
            "--output",
            error_output,
        ]
        cmd.extend("--include=%s" % path for path in include_path or [])
        cmd.extend("--exclude=%s" % path for path in exclude_path or [])

        if not scan_files:
            print("[error] File list is empty.")
            self._write_result(result_path, result)
            return
        cmd.extend(scan_files)
        print("[debug] cmd: %s" % " ".join(cmd))

        # The command is executed as an argv list without a shell, so spaces or
        # shell metacharacters in file names and filters cannot split arguments
        # or inject commands.
        try:
            subprocess.run(cmd)
        except OSError as err:
            # e.g. semgrep is not on PATH; the report load below then fails and
            # an empty result is written.
            print("[error] Failed to start semgrep: %s" % err)

        print("start data handle")
        try:
            outputs_data = self._load_json(error_output)
        except (OSError, ValueError):
            print("[error] Resulting file not found or cannot be loaded")
            self._write_result(result_path, result)
            return

        if isinstance(outputs_data, dict):
            errors = outputs_data.get("errors") or []
            if errors:
                # Only the first error is logged, as before.
                print("[error]: %s" % json.dumps(errors[0], indent=2))
            for item in outputs_data.get("results") or []:
                rule_name = item["check_id"]
                # Copied rule files may contain rules that were not requested.
                if rule_name not in rules:
                    continue
                result.append({
                    "path": item["path"],
                    "line": item["start"]["line"],
                    "column": item["start"]["col"],
                    "msg": item["extra"]["message"],
                    "rule": rule_name,
                    "refs": [],
                })

        self._write_result(result_path, result)

    def check_tool_version(self):
        """Report whether semgrep can be used on the current platform.

        Only the platform is checked: Windows (``sys.platform == "win32"``) is
        rejected. The installed semgrep version is not inspected.

        :return: False on Windows, True otherwise
        """
        if sys.platform == "win32":
            print("[error] Semgrep can not be installed in windows")
            return False
        return True
if __name__ == '__main__':
    # Script entry point: run a single scan, bracketed by progress markers.
    print("-- start run tool ...")
    tool = Semgrep()
    tool.run()
    print("-- end ...")