feature/package-development #22
1
.gitattributes
vendored
Normal file
1
.gitattributes
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
*.webp filter=lfs diff=lfs merge=lfs -text
|
@ -1,5 +1,6 @@
|
|||||||
# BackDoorBuster
|
# BackDoorBuster
|
||||||
|
|
||||||
|

|
||||||
## 项目背景
|
## 项目背景
|
||||||
|
|
||||||
随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。
|
随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。
|
||||||
|
BIN
banner.webp
(Stored with Git LFS)
Normal file
BIN
banner.webp
(Stored with Git LFS)
Normal file
Binary file not shown.
113
detection/cngptdetection.py
Normal file
113
detection/cngptdetection.py
Normal file
@ -0,0 +1,113 @@
|
|||||||
|
import os
|
||||||
|
import requests
|
||||||
|
import re
|
||||||
|
import json
|
||||||
|
from typing import List, Dict, Any
|
||||||
|
|
||||||
|
|
||||||
|
class TimeoutException(Exception):
    """Custom exception used to signal a timeout condition."""
|
||||||
|
|
||||||
|
|
||||||
|
def detectGPT(content: str, timeout: float = 30.0) -> str:
    """Ask the Baidu ERNIE chat endpoint to flag potentially vulnerable lines.

    The credentials are read from the ``BAIDU_API_KEY`` and
    ``BAIDU_SECRET_KEY`` environment variables, exchanged for an access token
    via ``get_access_token`` and used to post a review prompt followed by
    ``content``.

    Args:
        content: Source code to be reviewed.
        timeout: Seconds to wait for the chat request before giving up.

    Returns:
        A JSON string (indent 2, non-ASCII preserved) mapping each risk level
        ("high", "medium", "low", "none") to a list of
        ``[line_number, stripped_source_line]`` pairs.

    Raises:
        ValueError: If either environment variable is missing or empty, no
            access token could be obtained, the HTTP request fails, or the
            response carries no ``result`` field.
    """
    # Credentials must only ever come from the environment. Never paste real
    # keys into this file, not even in a comment: they end up in history.
    api_key = os.getenv("BAIDU_API_KEY")
    secret_key = os.getenv("BAIDU_SECRET_KEY")
    if not api_key or not secret_key:
        raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

    access_token = get_access_token(api_key, secret_key)
    if not access_token:
        # Concatenating None into the URL would otherwise fail with an
        # unrelated TypeError.
        raise ValueError("Failed to obtain an access token")

    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329"

    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": (
                    "You are a Python code reviewer. Read the code below and identify any potential "
                    "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
                    'Only report the line number and the risk level.\nYou should output the result as '
                    'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
                    "Each of these three fields is required.\nYou are required to only output the json format. "
                    "Do not output any other information." + content
                ),
            }
        ]
    })
    headers = {
        'Content-Type': 'application/json'
    }

    try:
        # The token travels as a query parameter, exactly as before, but is
        # now URL-encoded by requests; the timeout keeps a stalled connection
        # from blocking the caller forever.
        response = requests.post(
            url,
            params={"access_token": access_token},
            headers=headers,
            data=payload,
            timeout=timeout,
        )
        response.raise_for_status()
        res_json = response.json()
    except requests.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}") from e

    message_content = res_json.get('result') if isinstance(res_json, dict) else None
    if message_content is None:
        raise ValueError("API response content is None")

    extracted_data = extract_json_from_text(message_content)

    source_lines = content.split("\n")
    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    for res in extracted_data:
        # The model output is untrusted: skip anything that is not a
        # well-formed finding instead of crashing on it.
        if not isinstance(res, dict):
            continue
        try:
            line_number = int(res["Line"])
            risk = str(res["Risk"]).strip().lower()
        except (KeyError, TypeError, ValueError):
            continue
        if risk not in classified_results:
            continue
        # Reject 0 and negatives explicitly; Python would otherwise index
        # from the end of the file and report the wrong line.
        if not 1 <= line_number <= len(source_lines):
            continue
        classified_results[risk].append(
            (line_number, source_lines[line_number - 1].strip())
        )

    return json.dumps(classified_results, indent=2, ensure_ascii=False)
|
||||||
|
|
||||||
|
|
||||||
|
def get_access_token(api_key: str, secret_key: str, timeout: float = 10.0) -> str:
    """Exchange the API key and secret for an access token.

    Args:
        api_key: Sent as ``client_id`` in the token request.
        secret_key: Sent as ``client_secret`` in the token request.
        timeout: Seconds to wait for the token endpoint before giving up.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
        ValueError: If the response does not contain an access token.
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.post(url, params=params, timeout=timeout)
    response.raise_for_status()

    body = response.json()
    access_token = body.get("access_token") if isinstance(body, dict) else None
    if not access_token:
        # Fail here with a clear message rather than handing None to callers
        # that expect a string. The error fields are the usual OAuth names;
        # NOTE(review): confirm the service actually populates them.
        detail = None
        if isinstance(body, dict):
            detail = body.get("error_description") or body.get("error")
        raise ValueError(f"Access token missing from token response: {detail}")
    return access_token
|
||||||
|
|
||||||
|
|
||||||
|
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
    """Extract the first JSON array of objects embedded in free-form text.

    Every position where an array of objects appears to start (``[`` followed
    by ``{``) is tried in order with a real JSON decoder. Nested arrays and
    bracket characters inside string values therefore do not truncate the
    match.

    Args:
        text: Text that may contain a JSON array, possibly surrounded by
            other content.

    Returns:
        The dict entries of the first array that decodes successfully, or an
        empty list when no candidate exists or none of them is valid JSON.
        A diagnostic message is printed in both failure cases.
    """
    decoder = json.JSONDecoder()
    last_error = None
    saw_candidate = False

    for candidate in re.finditer(r'\[\s*\{', text):
        saw_candidate = True
        try:
            # raw_decode parses one complete JSON value starting at the given
            # index and ignores whatever trails it.
            data, _ = decoder.raw_decode(text, candidate.start())
        except json.JSONDecodeError as e:
            last_error = e
            continue
        # Parsing started at '[', so data is a list; keep only the objects
        # to honour the declared return type.
        return [item for item in data if isinstance(item, dict)]

    if not saw_candidate:
        print("未找到 JSON 数据")
    else:
        print(f"解码 JSON 时出错: {last_error}")
    return []
|
11
docs/idea.md
11
docs/idea.md
@ -8,6 +8,10 @@
|
|||||||
|
|
||||||
参考项目: [https://github.com/SonarSource/sonarqube]
|
参考项目: [https://github.com/SonarSource/sonarqube]
|
||||||
|
|
||||||
|
检查源代码的语法和关键词。通过这种方式,可以发现是否存在与其他语言的交互,比如调用外部命令、使用其他语言的扩展模块、与其他语言的接口交互等。
|
||||||
|
|
||||||
|
实现方法:可以使用Python代码解析库(如ast模块)来分析语法树,并检查特定的代码模式或结构;开发脚本来搜索Python代码中常用于与其他语言交互的关键词和函数,例如ctypes、subprocess、os.system等。
|
||||||
|
|
||||||
## 控制流分析
|
## 控制流分析
|
||||||
|
|
||||||
通过分析程序的控制流(即程序中各个操作的执行顺序),可以检测到异常的控制流路径,这些路径可能是后门的迹象。
|
通过分析程序的控制流(即程序中各个操作的执行顺序),可以检测到异常的控制流路径,这些路径可能是后门的迹象。
|
||||||
@ -22,6 +26,10 @@
|
|||||||
|
|
||||||
这个网站可以搜索依赖中是否存在漏洞: [https://security.snyk.io/package/pip/]
|
这个网站可以搜索依赖中是否存在漏洞: [https://security.snyk.io/package/pip/]
|
||||||
|
|
||||||
|
分析代码库中的依赖关系,查找是否导入了与其他语言交互相关的模块或库。
|
||||||
|
|
||||||
|
实施策略:开发脚本进行依赖库对比匹配。
|
||||||
|
|
||||||
## 异常行为检测
|
## 异常行为检测
|
||||||
|
|
||||||
通过定义“正常”代码行为的基线,可以标识出异常行为,这些异常行为可能指示着后门的存在。
|
通过定义“正常”代码行为的基线,可以标识出异常行为,这些异常行为可能指示着后门的存在。
|
||||||
@ -33,3 +41,6 @@
|
|||||||
使用NLP技术来训练机器学习模型,以自动从大量代码中学习和识别异常或潜在的后门模式。
|
使用NLP技术来训练机器学习模型,以自动从大量代码中学习和识别异常或潜在的后门模式。
|
||||||
|
|
||||||
开发方法:采用深度学习框架如TensorFlow或PyTorch,结合NLP处理工具,训练模型识别代码中的异常行为。
|
开发方法:采用深度学习框架如TensorFlow或PyTorch,结合NLP处理工具,训练模型识别代码中的异常行为。
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
40
tests/test_CN_GPT_detection.py
Normal file
40
tests/test_CN_GPT_detection.py
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
import unittest
|
||||||
|
import warnings
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
|
||||||
|
from detection.cngptdetection import detectGPT
|
||||||
|
|
||||||
|
class TestBackdoorDetection(unittest.TestCase):
    """Tests for ``detectGPT`` that need Baidu credentials in the environment."""

    def setUp(self):
        """Skip the test when either Baidu credential is unset or empty.

        ``detectGPT`` rejects empty values as well as missing ones, so both
        cases are treated as "not configured" here.
        """
        required = ("BAIDU_API_KEY", "BAIDU_SECRET_KEY")
        if any(not os.getenv(name) for name in required):
            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

    def test_gpt_risk_detection(self):
        """Three dangerous calls are expected to be classified as high risk."""
        content = (
            "import os\n"
            "os.system('rm -rf /') # high risk\n"
            "exec('print(\"Hello\")') # high risk\n"
            "eval('2 + 2') # high risk\n"
        )
        results1 = detectGPT(content)
        classified_results = json.loads(results1)
        self.assertEqual(len(classified_results["high"]), 3)

    def test_gpt_no_risk_detection(self):
        """Harmless code is expected to yield no high, medium or low findings."""
        content = (
            "a = 10\n"
            "b = a + 5\n"
            "print('This should not be detected as risky.')\n"
        )
        results2 = detectGPT(content)
        classified_results = json.loads(results2)
        self.assertEqual(len(classified_results["high"]), 0)
        self.assertEqual(len(classified_results["medium"]), 0)
        self.assertEqual(len(classified_results["low"]), 0)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    # Run this module's test cases when it is executed as a script.
    unittest.main()
|
@ -83,13 +83,5 @@ class TestBackdoorDetection(unittest.TestCase):
|
|||||||
self.assertEqual(len(results["medium"]), 0)
|
self.assertEqual(len(results["medium"]), 0)
|
||||||
self.assertEqual(len(results["low"]), 0)
|
self.assertEqual(len(results["low"]), 0)
|
||||||
|
|
||||||
def test_gpt_env_no_set(self):
|
|
||||||
if os.getenv("OPENAI_API_KEY") is not None:
|
|
||||||
self.skipTest("OPENAI_API_KEY is setted")
|
|
||||||
content = "print('test test')"
|
|
||||||
with self.assertRaises(ValueError):
|
|
||||||
detectGPT(content)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user