-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode_fixer.py
More file actions
227 lines (198 loc) · 8.56 KB
/
code_fixer.py
File metadata and controls
227 lines (198 loc) · 8.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
CodeSense - Code Fixer
Generates specific, actionable fix suggestions with before/after code diffs.
"""
import ast
import re
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
from logger import get_logger
logger = get_logger(__name__)
@dataclass
class Fix:
    """A single suggested fix for one source line, as a before/after pair."""

    issue_type: str  # Identifier of the issue being fixed, e.g. "UnusedImport"
    line: int  # 1-based line number of the affected source line
    description: str  # One-line summary of the suggested change
    before: str  # The source line as it currently reads
    after: str  # Suggested replacement text ("" means the line is removed)
    explanation: str  # Longer rationale shown alongside the diff
    confidence: float  # 0–1: how certain the fix is correct
    is_safe: bool  # True = safe auto-apply; False = requires review
class PythonFixer:
    """Generates Python-specific code fixes."""

    def suggest_fixes(self, code: str, issues: List[Dict]) -> List[Fix]:
        """Build a Fix for every issue whose type has a dedicated fixer.

        Args:
            code: Full source text; it is split into lines so each issue's
                line can be looked up.
            issues: Issue dicts. Reads "type" and "line" (1-based), plus
                "symbol" or "message" for the fixers that need them.

        Returns:
            Fixes in the same order as `issues`. Issues with an unknown type,
            or for which the fixer has no suggestion, are skipped.
        """
        # Issue type -> callable(raw_line, line_no, issue). Built once per
        # call so the loop body is a single dictionary lookup.
        handlers: Dict[str, Any] = {
            "UnusedImport": lambda ln, no, iss: self._fix_unused_import(
                ln, no, iss.get("symbol", "")),
            "MutableDefault": lambda ln, no, iss: self._fix_mutable_default(
                ln, no, iss.get("symbol", "")),
            "BareExcept": lambda ln, no, iss: self._fix_bare_except(ln, no),
            "NamingConvention": lambda ln, no, iss: self._fix_naming(
                ln, no, iss.get("message", "")),
            "eval_usage": lambda ln, no, iss: self._fix_eval(ln, no, iss.get("type", "")),
            "exec_usage": lambda ln, no, iss: self._fix_eval(ln, no, iss.get("type", "")),
            "weak_hash": lambda ln, no, iss: self._fix_weak_hash(ln, no),
            "weak_random": lambda ln, no, iss: self._fix_weak_random(ln, no),
            "yaml_unsafe_load": lambda ln, no, iss: self._fix_yaml_load(ln, no),
            "sql_injection": lambda ln, no, iss: self._fix_sql_injection(ln, no),
        }

        lines = (code or "").splitlines()
        fixes: List[Fix] = []
        for issue in issues or []:
            issue_type = issue.get("type", "")
            # Only strings can be handler keys; anything else has no fixer.
            handler = handlers.get(issue_type) if isinstance(issue_type, str) else None
            if handler is None:
                continue

            line_no = issue.get("line", 0)
            # A missing or non-integer line number is treated like an
            # out-of-range one: the fixer receives an empty source line.
            if not isinstance(line_no, int):
                line_no = 0
            raw_line = lines[line_no - 1] if 0 < line_no <= len(lines) else ""

            fix = handler(raw_line, line_no, issue)
            if fix is not None:
                fixes.append(fix)
        return fixes

    # ── Individual fixers ────────────────────────────────────────────────────
    @staticmethod
    def _strip_import_name(line: str, symbol: str) -> str:
        """Return `line` with only `symbol` removed from a multi-name import.

        Returns "" (meaning: delete the whole line) when the line is not a
        single complete import statement, when `symbol` is the only imported
        name, or when `symbol` cannot be found among the imported names.
        Trailing comments on a rewritten line are not preserved.
        """
        try:
            body = ast.parse(line.strip()).body
        except (SyntaxError, ValueError):
            # e.g. the first line of a parenthesised multi-line import
            return ""
        if len(body) != 1 or not isinstance(body[0], (ast.Import, ast.ImportFrom)):
            return ""
        node = body[0]

        def binds_symbol(alias: ast.alias) -> bool:
            # `import a.b` binds the name `a`; `import a.b as c` binds `c`.
            bound = alias.asname or alias.name.split(".")[0]
            return symbol in (bound, alias.name)

        kept = [alias for alias in node.names if not binds_symbol(alias)]
        if not kept or len(kept) == len(node.names):
            return ""

        names = ", ".join(
            alias.name if alias.asname is None else f"{alias.name} as {alias.asname}"
            for alias in kept
        )
        indent = line[: len(line) - len(line.lstrip())]
        if isinstance(node, ast.ImportFrom):
            module = "." * node.level + (node.module or "")
            return f"{indent}from {module} import {names}"
        return f"{indent}import {names}"

    def _fix_unused_import(self, line: str, lineno: int, symbol: str) -> Fix:
        """Suggest removing the unused import `symbol`.

        For a statement importing several names (``import os, sys``) only
        `symbol` is dropped so the other names stay available; otherwise the
        whole line is removed (``after=""``).
        """
        return Fix(
            issue_type="UnusedImport",
            line=lineno,
            description=f"Remove unused import '{symbol}'",
            before=line,
            after=self._strip_import_name(line, symbol),
            explanation=f"'{symbol}' is imported but never used. Removing it reduces clutter and startup time.",
            confidence=0.90,
            is_safe=True,
        )

    def _fix_mutable_default(self, line: str, lineno: int, fn_name: str) -> Fix:
        """Suggest replacing empty list/dict/set defaults on `line` with None."""
        # def fn(items=[]) → def fn(items=None)
        # The whitespace after '=' is kept, so an annotated parameter such as
        # `items: list = []` becomes `items: list = None`.
        fixed = re.sub(
            r"(=\s*)(?:\[\s*\]|\{\s*\}|(?:set|list|dict)\(\s*\))",
            r"\1None",
            line,
        )
        return Fix(
            issue_type="MutableDefault",
            line=lineno,
            description=f"Replace mutable default argument in '{fn_name}'",
            before=line,
            after=fixed,
            explanation=(
                "Mutable defaults (list/dict/set) are shared across all calls.\n"
                "Replace with None and assign inside the function:\n"
                "    if items is None:\n        items = []"
            ),
            confidence=0.95,
            is_safe=False,  # Requires adding guard inside function
        )

    def _fix_bare_except(self, line: str, lineno: int) -> Fix:
        """Suggest turning a bare ``except:`` into ``except Exception as e:``."""
        # \s* also catches the `except :` spelling.
        fixed = re.sub(r"\bexcept\s*:", "except Exception as e:", line)
        return Fix(
            issue_type="BareExcept",
            line=lineno,
            description="Replace bare 'except:' with specific exception",
            before=line,
            after=fixed,
            explanation="Bare 'except:' catches everything including SystemExit. Use 'except Exception as e:' at minimum.",
            confidence=0.85,
            is_safe=False,
        )

    def _fix_naming(self, line: str, lineno: int, message: str) -> Optional[Fix]:
        """Suggest a snake_case name for the function defined on `line`.

        Returns:
            A Fix, or None when `message` does not mention snake_case, the
            line holds no ``def``, or the name is already snake_case.
        """
        if "snake_case" not in message:
            return None
        m = re.search(r"\bdef\s+([A-Za-z_][A-Za-z0-9_]*)", line)
        if m is None:
            return None

        old = m.group(1)
        # Insert '_' at lower→Upper boundaries and before the last capital of
        # an acronym run, so "getHTTPResponse" → "get_http_response" rather
        # than one underscore per capital letter.
        new = re.sub(
            r"(?<=[a-z0-9])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])", "_", old
        ).lower()
        if new == old:
            return None

        # Splice by position so only the definition's name is touched,
        # whatever spacing follows `def`.
        fixed = line[: m.start(1)] + new + line[m.end(1):]
        return Fix(
            issue_type="NamingConvention",
            line=lineno,
            description=f"Rename function '{old}' to snake_case '{new}'",
            before=line,
            after=fixed,
            explanation="PEP 8 requires function names to use snake_case.",
            confidence=0.80,
            is_safe=False,
        )

    def _fix_eval(self, line: str, lineno: int, issue_type: str) -> Fix:
        """Flag an eval()/exec() call; the code is kept and a TODO is prepended."""
        return Fix(
            issue_type=issue_type,
            line=lineno,
            description="Replace eval/exec with a safer alternative",
            before=line,
            after=f"# TODO: Replace eval/exec — consider ast.literal_eval() for data, or refactor logic\n{line}",
            explanation=(
                "eval() and exec() execute arbitrary code and are a major security risk.\n"
                "Use ast.literal_eval() for parsing literals, or refactor to avoid dynamic execution."
            ),
            confidence=0.50,
            is_safe=False,
        )

    def _fix_weak_hash(self, line: str, lineno: int) -> Fix:
        """Suggest hashlib.sha256 in place of hashlib.md5 / hashlib.sha1."""
        fixed = re.sub(r"hashlib\.(md5|sha1)", "hashlib.sha256", line)
        return Fix(
            issue_type="weak_hash",
            line=lineno,
            description="Upgrade MD5/SHA1 to SHA-256",
            before=line,
            after=fixed,
            explanation="MD5 and SHA-1 are cryptographically broken. Use SHA-256 or better.",
            confidence=0.90,
            is_safe=True,
        )

    def _fix_weak_random(self, line: str, lineno: int) -> Fix:
        """Suggest a `secrets`-based generator in place of the `random` module."""
        # `secrets` itself has no randint()/random()/shuffle(), so a plain
        # module-name swap would produce calls that do not exist. An instance
        # of secrets.SystemRandom provides those methods. The lookbehind keeps
        # `np.random.` and names merely ending in "random" untouched.
        fixed = re.sub(r"(?<![\w.])random\.", "secrets.SystemRandom().", line)
        return Fix(
            issue_type="weak_random",
            line=lineno,
            description="Replace random with secrets for security-sensitive use",
            before=line,
            after=fixed,
            explanation=(
                "Use the 'secrets' module for passwords, tokens, and keys.\n"
                "secrets.SystemRandom() provides the generator methods of the "
                "random module (randint, choice, ...) backed by the OS random source."
            ),
            confidence=0.75,
            is_safe=False,
        )

    def _fix_yaml_load(self, line: str, lineno: int) -> Fix:
        """Suggest yaml.safe_load() in place of yaml.load()."""
        fixed, replaced = re.subn(r"\byaml\.load\s*\(", "yaml.safe_load(", line)
        if replaced:
            # safe_load() takes no Loader argument, so drop a keyword or
            # positional one. A Loader built by a call or subscript is left
            # in place for a human to resolve.
            fixed = re.sub(
                r",\s*(?:Loader\s*=\s*[\w.]+(?![\w.(\[])|[\w.]*Loader\b)",
                "",
                fixed,
            )
        return Fix(
            issue_type="yaml_unsafe_load",
            line=lineno,
            description="Replace yaml.load() with yaml.safe_load()",
            before=line,
            after=fixed,
            explanation="yaml.load() can execute arbitrary Python. yaml.safe_load() restricts to basic types.",
            confidence=0.95,
            # If a Loader argument survived the rewrite, the result needs review.
            is_safe="Loader" not in fixed,
        )

    def _fix_sql_injection(self, line: str, lineno: int) -> Fix:
        """Advise a parameterised query; `after` is a hint comment, not a rewrite."""
        return Fix(
            issue_type="sql_injection",
            line=lineno,
            description="Use parameterised SQL queries instead of string concatenation",
            before=line,
            after='# Use: cursor.execute("SELECT * FROM users WHERE id = ?", (user_id,))',
            explanation=(
                "String concatenation in SQL enables injection attacks.\n"
                "Always use parameterised queries:\n"
                " cursor.execute('SELECT * FROM t WHERE x = ?', (value,))"
            ),
            confidence=0.85,
            is_safe=False,
        )
class CodeFixer:
    """Dispatcher: selects the right language-specific fixer."""

    # Fixers keyed by lower-case language name. The instances live on the
    # class, so they are shared by every CodeFixer.
    _fixers = {
        "python": PythonFixer(),
    }

    def suggest_fixes(self, code: str, language: str,
                      semantic_issues: List[Dict],
                      security_issues: List[Dict]) -> List[Dict]:
        """
        Generate fix suggestions for a set of issues.

        Args:
            code: Source text the issues refer to.
            language: Language name, matched case-insensitively against the
                registered fixers. None or an unregistered language yields [].
            semantic_issues: Issue dicts from semantic analysis; None is
                treated as empty.
            security_issues: Issue dicts from security analysis; None is
                treated as empty.

        Returns:
            List of serialisable fix dicts.
        """
        fixer = self._fixers.get((language or "").lower())
        if fixer is None:
            return []

        # Unpack instead of `+` so a missing (None) list on either side does
        # not raise; semantic issues still come first.
        all_issues = [*(semantic_issues or ()), *(security_issues or ())]

        return [
            {
                "issue_type": fix.issue_type,
                "line": fix.line,
                "description": fix.description,
                "before": fix.before,
                "after": fix.after,
                "explanation": fix.explanation,
                "confidence": fix.confidence,
                "is_safe": fix.is_safe,
            }
            for fix in fixer.suggest_fixes(code, all_issues)
            if fix is not None
        ]