Merge pull request 'feature/GPT' (#12) from feature/GPT into main
Some checks are pending
Python application test / build (push) Waiting to run

Reviewed-on: #12
Reviewed-by: dqy <dqy@noreply.localhost>
Reviewed-by: sangge <sangge@noreply.localhost>
This commit is contained in:
sangge 2024-04-29 18:58:48 +08:00
commit 000146a835
9 changed files with 199 additions and 93 deletions

View File

@ -11,8 +11,11 @@ jobs:
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v2
- uses: https://git.mamahaha.work/actions/checkout@v2
- name: Install dependencies
run: pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
- name: Run tests
run: python -m unittest discover -s tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
http_proxy: http://192.168.1.3:10809

66
detection/GPTdetection.py Normal file
View File

@ -0,0 +1,66 @@
import json
import os
from .utils import *
import openai
import signal
class TimeoutException(Exception):
    """Raised when a watchdog alarm fires before an operation completes."""
def timeout_handler(signum, frame):
    """Signal handler: convert an alarm delivery into a TimeoutException.

    Matches the signature required by signal.signal(); both arguments
    are ignored.
    """
    raise TimeoutException()
def detectGPT(content: str):
    """Classify security risks in *content* using an OpenAI chat model.

    Parameters
    ----------
    content : str
        Source code to review; the model reports 1-based line numbers
        into this text.

    Returns
    -------
    dict
        {"high": [...], "medium": [...], "low": [...], "none": [...]},
        each entry a (line_number, stripped_source_line) tuple.

    Raises
    ------
    ValueError
        If OPENAI_API_KEY is unset, the reply body is empty, or the
        reply is not valid JSON.
    TimeoutException
        If the API call does not finish within the 10-second alarm.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key is None:
        raise ValueError("env OPENAI_API_KEY not set")

    # BUG FIX: signal.alarm() delivers SIGALRM, not SIGTERM.  Registering the
    # handler for SIGTERM meant the alarm killed the whole process instead of
    # raising TimeoutException.  SIGALRM exists only on Unix (and only works
    # in the main thread), so guard the registration for portability.
    use_alarm = hasattr(signal, "SIGALRM")
    if use_alarm:
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(10)  # hard 10-second budget for the API round trip

    client = openai.OpenAI(api_key=api_key)
    text = content
    try:
        # The network call is the operation that can time out, so it must be
        # inside this try block for TimeoutException to be catchable.
        response = client.chat.completions.create(
            messages=[
                {
                    "role": "system",
                    "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
                    '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] Each of these three field is required.\n'
                    "You are required to only output the json format. Do not output any other information.\n",
                },
                {
                    "role": "user",
                    "content": text,
                },
            ],
            model="gpt-3.5-turbo",
        )
        message_content = response.choices[0].message.content
        if message_content is None:
            raise ValueError("API response content is None")
        res_json = json.loads(message_content)
    except json.JSONDecodeError:
        raise ValueError("Error: Could not parse the response. Please try again.")
    except TimeoutException:
        raise TimeoutException("The api call timed out")
    finally:
        if use_alarm:
            signal.alarm(0)  # always cancel the pending alarm

    # Bucket the model's findings by risk level, pairing each reported line
    # number with the corresponding (stripped) source line.
    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    source_lines = text.split("\n")
    for res in res_json:
        classified_results[res["Risk"]].append(
            (res["Line"], source_lines[res["Line"] - 1].strip())
        )
    return classified_results

View File

@ -0,0 +1,39 @@
import re
from typing import Dict, List, Tuple
from .utils import remove_comments
def find_dangerous_functions(
    file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
    """Regex-scan source text for calls that are commonly abused in backdoors.

    Parameters
    ----------
    file_content : str
        Full text of the file to scan.
    file_extension : str
        Extension (".py", ".js", ".cpp") selecting the pattern set; any
        other extension yields no findings.

    Returns
    -------
    Dict[str, List[Tuple[int, str]]]
        Findings bucketed by severity ("high"/"medium"/"low"/"none");
        each entry is (1-based line number, comment-stripped line).
    """
    patterns = {
        ".py": {
            r"\bsystem\(": "high",
            r"\bexec\(": "high",
            r"\bpopen\(": "medium",
            r"\beval\(": "high",
            r"\bsubprocess\.run\(": "medium",
            r"\b__getattribute__\(": "high",
            r"\bgetattr\(": "medium",
            r"\b__import__\(": "high",
        },
        ".js": {
            r"\beval\(": "high",
            r"\bexec\(": "high",
            r"\bchild_process\.exec\(": "high",
        },
        ".cpp": {
            r"\bsystem\(": "high",
        },
    }
    rules = patterns.get(file_extension, {})
    findings: Dict[str, List[Tuple[int, str]]] = {
        "high": [],
        "medium": [],
        "low": [],
        "none": [],
    }
    for lineno, raw_line in enumerate(file_content.split("\n"), start=1):
        # Comments cannot execute; strip them so they never trigger a match.
        stripped = remove_comments(raw_line, file_extension)
        if not stripped:
            continue
        for regex, severity in rules.items():
            if re.search(regex, stripped):
                findings[severity].append((lineno, stripped))
    return findings

View File

@ -1,74 +1,17 @@
import os
import re
import sys
from typing import Dict, List, Tuple
from reportlab.lib.pagesizes import letter
from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from reportlab.lib import colors
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT
from .utils import *
import sys
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"}
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
def read_file_content(file_path: str) -> str:
    """Return the UTF-8 decoded contents of *file_path*.

    Prints a short diagnostic and terminates the process with status 1
    when the file is missing or unreadable.
    """
    try:
        with open(file_path, "r", encoding="utf-8") as src:
            text = src.read()
    except FileNotFoundError:
        print("Error: File not found.")
        sys.exit(1)
    except IOError:
        print("Error: Could not read file.")
        sys.exit(1)
    else:
        return text
def remove_comments(code: str, extension: str) -> str:
    """Strip comments from a snippet of code, dispatching on file extension.

    ".py": everything from the first '#' onward is dropped (naive — a '#'
    inside a string literal also truncates the line).
    ".js"/".cpp": '//' line comments and '/* ... */' blocks are removed.
    Anything else is returned with surrounding whitespace trimmed.
    """
    if extension == ".py":
        return code.split("#")[0].strip()
    if extension in (".js", ".cpp"):
        without_line = re.sub(r"//.*", "", code)
        without_block = re.sub(r"/\*.*?\*/", "", without_line, flags=re.DOTALL)
        return without_block.strip()
    return code.strip()
def find_dangerous_functions(
    file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
    """Scan *file_content* line by line for known-risky call patterns.

    The extension picks which regex table applies (".py", ".js", ".cpp");
    unknown extensions produce empty results.  Returns a dict keyed by
    severity ("high"/"medium"/"low"/"none") whose values are lists of
    (1-based line number, comment-stripped line) tuples.
    """
    risk_tables = {
        ".py": {
            r"\bsystem\(": "high",
            r"\bexec\(": "high",
            r"\bpopen\(": "medium",
            r"\beval\(": "high",
            r"\bsubprocess\.run\(": "medium",
            r"\b__getattribute__\(": "high",
            r"\bgetattr\(": "medium",
            r"\b__import__\(": "high",
        },
        ".js": {
            r"\beval\(": "high",
            r"\bexec\(": "high",
            r"\bchild_process\.exec\(": "high",
        },
        ".cpp": {
            r"\bsystem\(": "high",
        },
    }
    active_rules = risk_tables.get(file_extension, {})
    results = {"high": [], "medium": [], "low": [], "none": []}
    for number, source_line in enumerate(file_content.split("\n"), start=1):
        # Drop comment text first so commented-out calls are not reported.
        code_only = remove_comments(source_line, file_extension)
        if not code_only:
            continue
        for expression, level in active_rules.items():
            if re.search(expression, code_only):
                results[level].append((number, code_only))
    return results
def generate_text_content(results):
text_output = "Security Analysis Report\n"
for risk_level, entries in results.items():
@ -81,7 +24,7 @@ def generate_text_content(results):
def output_results(results, output_format, output_file=None):
if output_file:
file_name, file_extension = os.path.splitext(output_file)
file_name = os.path.splitext(output_file)
if output_format not in OUTPUT_FORMATS:
output_format = "txt"
output_file = f"{file_name}.txt"
@ -174,7 +117,17 @@ def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
return text_output
def process_path(path: str, output_format: str, output_file=None):
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str):
    """Dispatch a file to the detector selected by *mode*.

    "llm" routes to the GPT-based detector; "regex" — and any
    unrecognised mode — falls back to the regex scanner.
    """
    # TODO: add more modes here; a dispatch table would keep this extensible.
    if mode == "llm":
        return detectGPT(read_file_content(filePath))
    # "regex" and every unknown mode share the same default path.
    return find_dangerous_functions(read_file_content(filePath), fileExtension)
def process_path(path: str, output_format: str, mode: str, output_file=None):
results = {"high": [], "medium": [], "low": [], "none": []}
if os.path.isdir(path):
for root, dirs, files in os.walk(path):
@ -182,9 +135,8 @@ def process_path(path: str, output_format: str, output_file=None):
file_extension = os.path.splitext(file)[1]
if file_extension in SUPPORTED_EXTENSIONS:
file_path = os.path.join(root, file)
file_results = find_dangerous_functions(
read_file_content(file_path), file_extension
)
file_results = checkModeAndDetect(mode, file_path, file_extension)
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
@ -196,9 +148,7 @@ def process_path(path: str, output_format: str, output_file=None):
elif os.path.isfile(path):
file_extension = os.path.splitext(path)[1]
if file_extension in SUPPORTED_EXTENSIONS:
file_results = find_dangerous_functions(
read_file_content(path), file_extension
)
file_results = checkModeAndDetect(mode, path, file_extension)
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
@ -223,6 +173,9 @@ def main():
parser = argparse.ArgumentParser(description="Backdoor detection tool.")
parser.add_argument("path", help="Path to the code to analyze")
parser.add_argument("-o", "--output", help="Output file path", default=None)
parser.add_argument(
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
)
args = parser.parse_args()
output_format = "txt" # Default output format
output_file = None
@ -237,7 +190,8 @@ def main():
"Your input file format was incorrect, the output has been saved as a TXT file."
)
output_file = args.output.rsplit(".", 1)[0] + ".txt"
process_path(args.path, output_format, output_file)
# 如果未指定输出文件,则输出到 stdout否则写入文件
process_path(args.path, output_format, args.mode, output_file)
if __name__ == "__main__":

24
detection/utils.py Normal file
View File

@ -0,0 +1,24 @@
import re
import sys
def read_file_content(file_path: str) -> str:
    """Read *file_path* as UTF-8 text and return its full contents.

    On a missing or unreadable file, prints an error message and exits
    the interpreter with status 1 instead of raising.
    """
    try:
        with open(file_path, mode="r", encoding="utf-8") as handle:
            return handle.read()
    except FileNotFoundError:
        print("Error: File not found.")
        sys.exit(1)
    except IOError:
        print("Error: Could not read file.")
        sys.exit(1)
def remove_comments(code: str, extension: str) -> str:
    """Return *code* with language-appropriate comments removed.

    Python lines are cut at the first '#' (naive: ignores '#' inside
    strings); JS/C++ lose '//' and '/* ... */' comments; any other
    extension is only whitespace-trimmed.
    """
    if extension == ".py":
        before_hash = code.split("#")[0]
        return before_hash.strip()
    if extension in (".js", ".cpp"):
        cleaned = re.sub(r"//.*", "", code)
        cleaned = re.sub(r"/\*.*?\*/", "", cleaned, flags=re.DOTALL)
        return cleaned.strip()
    return code.strip()

View File

@ -26,7 +26,7 @@ pip install packaging reportlab
**命令格式**
```bash
python requirements_detection.py <vulnerabilities_file> <requirements_file> -o <output_file>
python requirements_detection.py <vulnerabilities_file> <requirements_file> -o <output_file>
```
**参数说明**
@ -35,6 +35,7 @@ python requirements_detection.py <vulnerabilities_file> <requirements_file> -o <
- `<requirements_file>`: 项目的 `requirements.txt` 文件路径。
- `<output_file>`: 指定输出结果的文件路径和格式,支持的格式有 `.txt`, `.md`, `.html`, `.pdf`
**示例**
```bash
@ -46,18 +47,19 @@ python requirements_detection.py vulnerabilities_data.txt requirements.txt -o ou
**命令格式**
```bash
python backdoor_detection.py <code_path> -o <output_file>
python backdoor_detection.py <code_path> -o <output_file> -m <mode>
```
**参数说明**
- `<code_path>`: 代码文件或目录的路径。
- `<output_file>`: 指定输出结果的文件路径和格式,支持的格式有 `.txt`, `.md`, `.html`, `.pdf`
- `<mode>`: 指定检测模式,目前支持的模式有 `regex``llm`
**示例**
```bash
python backdoor_detection.py ./src -o output/report.pdf
python backdoor_detection.py ./src -o output/report.pdf -m regex
```
## 结果解读

View File

@ -1,2 +1,3 @@
reportlab
packaging
packaging
openai

View File

@ -1,18 +0,0 @@
name: Python application test
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
build:
runs-on: "ubuntu-latest"
steps:
- uses: actions/checkout@v2
- name: Install dependencies
run: pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
- name: Run tests
run: python -m unittest discover -s tests

View File

@ -1,6 +1,9 @@
import unittest
import warnings
from detection.backdoor_detection import find_dangerous_functions
from detection.GPTdetection import detectGPT
import os
class TestBackdoorDetection(unittest.TestCase):
@ -55,6 +58,38 @@ class TestBackdoorDetection(unittest.TestCase):
results["medium"],
)
def test_gpt_risk_detection(self):
# Live-API test: requires a real OpenAI key; warn and skip when absent.
if os.getenv("OPENAI_API_KEY") is None:
warnings.warn("OPENAI_API_KEY is not set, test skipped.", UserWarning)
self.skipTest("OPENAI_API_KEY is not set")
content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
# Expect the model to flag each of the three dangerous calls as high risk.
# NOTE(review): this asserts on non-deterministic model output — may flake.
results = detectGPT(content)
self.assertEqual(len(results["high"]), 3)
def test_gpt_no_risk_detection(self):
# Live-API test: requires a real OpenAI key; warn and skip when absent.
if os.getenv("OPENAI_API_KEY") is None:
warnings.warn("OPENAI_API_KEY is not set, test skipped.", UserWarning)
self.skipTest("OPENAI_API_KEY is not set")
content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
# Benign code: no findings expected at any severity level.
results = detectGPT(content)
self.assertEqual(len(results["high"]), 0)
self.assertEqual(len(results["medium"]), 0)
self.assertEqual(len(results["low"]), 0)
def test_gpt_env_no_set(self):
# Inverse guard: this test only makes sense when the key is NOT configured.
if os.getenv("OPENAI_API_KEY") is not None:
self.skipTest("OPENAI_API_KEY is setted")
content = "print('test test')"
# Missing OPENAI_API_KEY must surface as a ValueError, not a silent failure.
with self.assertRaises(ValueError):
detectGPT(content)
if __name__ == "__main__":
unittest.main()