feature/package-development #22
							
								
								
									
										1
									
								
								.gitattributes
									
									
									
									
										vendored
									
									
										Normal file
									
								
							
							
						
						
									
										1
									
								
								.gitattributes
									
									
									
									
										vendored
									
									
										Normal file
									
								
							| @@ -0,0 +1 @@ | ||||
| *.webp filter=lfs diff=lfs merge=lfs -text | ||||
| @@ -1,5 +1,6 @@ | ||||
| # BackDoorBuster | ||||
|  | ||||
|  | ||||
| ## 项目背景 | ||||
|  | ||||
| 随着网络安全威胁的增加,恶意软件和后门的检测成为了保护个人和组织数据安全的重要任务。后门通常被隐藏在合法软件中,给黑客提供远程控制目标系统的能力。本项目旨在开发一个工具,能够有效识别和评估潜在的后门风险。 | ||||
|   | ||||
							
								
								
									
										
											BIN
										
									
								
								banner.webp
									 (Stored with Git LFS)
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										
											BIN
										
									
								
								banner.webp
									 (Stored with Git LFS)
									
									
									
									
										Normal file
									
								
							
										
											Binary file not shown.
										
									
								
							
							
								
								
									
										113
									
								
								detection/cngptdetection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										113
									
								
								detection/cngptdetection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,113 @@ | ||||
| import os | ||||
| import requests | ||||
| import re | ||||
| import json | ||||
| from typing import List, Dict, Any | ||||
|  | ||||
|  | ||||
class TimeoutException(Exception):
    """Custom exception used to signal that an operation timed out."""
|  | ||||
|  | ||||
def _classify_findings(findings: List[Dict[str, Any]], source_lines: List[str]) -> Dict[str, list]:
    """Group model findings by risk level, pairing each with its source line."""
    classified = {"high": [], "medium": [], "low": [], "none": []}
    for finding in findings:
        try:
            line_number = int(finding["Line"])
            # The model does not always honour the requested casing ("High").
            bucket = classified[str(finding["Risk"]).strip().lower()]
        except (ValueError, TypeError, KeyError):
            # Malformed entry (wrong shape, missing field, unknown risk): skip it.
            continue
        # Reject out-of-range numbers explicitly: 0 or a negative value would
        # otherwise index from the end of the file and report the wrong line.
        if not 1 <= line_number <= len(source_lines):
            continue
        bucket.append((line_number, source_lines[line_number - 1].strip()))
    return classified


def detectGPT(content: str) -> str:
    """
    Ask the Baidu ERNIE chat model to flag potential vulnerabilities in code.

    Args:
        content: Source code to review, as a single string.

    Returns:
        A JSON string mapping each risk level ("high", "medium", "low",
        "none") to a list of [line_number, stripped_source_line] pairs.

    Raises:
        ValueError: If BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, no access
            token could be obtained, the chat request fails or times out, or
            the response carries no "result" field.
    """
    # Credentials come only from the environment; never hard-code them here.
    api_key = os.getenv("BAIDU_API_KEY")
    secret_key = os.getenv("BAIDU_SECRET_KEY")
    if not api_key or not secret_key:
        raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

    access_token = get_access_token(api_key, secret_key)
    if not access_token:
        raise ValueError("Failed to obtain an access token")

    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329"
    payload = json.dumps({
        "messages": [
            {
                "role": "user",
                "content": (
                        "You are a Python code reviewer. Read the code below and identify any potential "
                        "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
                        'Only report the line number and the risk level.\nYou should output the result as '
                        'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
                        "Each of these three fields is required.\nYou are required to only output the json format. "
                        "Do not output any other information." + content
                )
            }
        ]
    })
    headers = {
        'Content-Type': 'application/json'
    }
    # Without a timeout a stalled connection would block the caller forever.
    request_timeout_seconds = 60

    try:
        response = requests.post(
            url,
            params={"access_token": access_token},
            headers=headers,
            data=payload,
            timeout=request_timeout_seconds,
        )
        response.raise_for_status()
        res_json = response.json()
    except requests.RequestException as e:
        raise ValueError(f"Request failed: {str(e)}") from e

    message_content = res_json.get('result')
    if message_content is None:
        raise ValueError("API response content is None")

    extracted_data = extract_json_from_text(message_content)
    # Split once instead of once per finding.
    classified_results = _classify_findings(extracted_data, content.split("\n"))

    return json.dumps(classified_results, indent=2, ensure_ascii=False)
|  | ||||
|  | ||||
def get_access_token(api_key: str, secret_key: str) -> str:
    """
    Exchange an API key and secret key for a Baidu OAuth access token.

    Args:
        api_key: Sent as the OAuth ``client_id``.
        secret_key: Sent as the OAuth ``client_secret``.

    Returns:
        The ``access_token`` string from the token endpoint's JSON response.

    Raises:
        requests.RequestException: If the HTTP request fails, times out, or
            returns an error status.
        ValueError: If the response contains no access token.
    """
    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
    # Bound the wait so a stalled token endpoint cannot hang the caller.
    response = requests.post(url, params=params, timeout=30)
    response.raise_for_status()
    access_token = response.json().get("access_token")
    if not access_token:
        # Fail here with a clear message rather than returning None and
        # letting the caller break later when it uses the token.
        raise ValueError("Access token missing from OAuth response")
    return access_token
|  | ||||
|  | ||||
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
    """
    Extract the first JSON array of objects embedded in free-form text.

    Every position where an array of objects appears to start (``[`` followed
    by ``{``) is tried in order with a real JSON decoder, so brackets or
    braces inside string values do not truncate the match, and an invalid
    early candidate does not hide a valid later one.

    Args:
        text: Text that may contain a JSON array, possibly surrounded by
            other content.

    Returns:
        The dict elements of the first array that decodes successfully, or an
        empty list when none is found. Non-dict elements are dropped.
    """
    decoder = json.JSONDecoder()
    last_error = None

    for candidate in re.finditer(r'\[\s*{', text):
        try:
            # raw_decode parses exactly one JSON value starting at the given
            # index and ignores whatever follows it.
            data, _ = decoder.raw_decode(text, candidate.start())
        except json.JSONDecodeError as e:
            last_error = e
            continue
        return [item for item in data if isinstance(item, dict)]

    if last_error is None:
        print("未找到 JSON 数据")
    else:
        print(f"解码 JSON 时出错: {last_error}")
    return []
							
								
								
									
										11
									
								
								docs/idea.md
									
									
									
									
									
								
							
							
						
						
									
										11
									
								
								docs/idea.md
									
									
									
									
									
								
							| @@ -8,6 +8,10 @@ | ||||
|  | ||||
| 参考项目: [https://github.com/SonarSource/sonarqube] | ||||
|  | ||||
| 检查源代码的语法和关键词。通过这种方式,可以发现是否存在与其他语言的交互,比如调用外部命令、使用其他语言的扩展模块、与其他语言的接口交互等。 | ||||
|  | ||||
| 实现方法:可以使用Python代码解析库(如ast模块)来分析语法树,并检查特定的代码模式或结构;也可以开发脚本来搜索Python代码中常用于与其他语言交互的关键词和函数,例如ctypes、subprocess、os.system等。 | ||||
|  | ||||
| ## 控制流分析 | ||||
|  | ||||
| 通过分析程序的控制流(即程序中各个操作的执行顺序),可以检测到异常的控制流路径,这些路径可能是后门的迹象。 | ||||
| @@ -22,6 +26,10 @@ | ||||
|  | ||||
| 这个网站可以搜索依赖中是否存在漏洞: [https://security.snyk.io/package/pip/] | ||||
|  | ||||
| 分析代码库中的依赖关系,查找是否导入了与其他语言交互相关的模块或库 | ||||
|  | ||||
| 实施策略:开发脚本,将项目的依赖库与跨语言交互相关的模块或库进行对比匹配。 | ||||
|  | ||||
| ## 异常行为检测 | ||||
|  | ||||
| 通过定义“正常”代码行为的基线,可以标识出异常行为,这些异常行为可能指示着后门的存在。 | ||||
| @@ -33,3 +41,6 @@ | ||||
| 使用NLP技术来训练机器学习模型,以自动从大量代码中学习和识别异常或潜在的后门模式。 | ||||
|  | ||||
| 开发方法:采用深度学习框架如TensorFlow或PyTorch,结合NLP处理工具,训练模型识别代码中的异常行为。 | ||||
|  | ||||
|  | ||||
|  | ||||
|   | ||||
							
								
								
									
										40
									
								
								tests/test_CN_GPT_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										40
									
								
								tests/test_CN_GPT_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,40 @@ | ||||
| import unittest | ||||
| import warnings | ||||
| import os | ||||
| import json | ||||
|  | ||||
| from detection.cngptdetection import detectGPT | ||||
|  | ||||
class TestBackdoorDetection(unittest.TestCase):
    """Tests for detectGPT that are skipped unless Baidu credentials are set."""

    def setUp(self):
        # detectGPT rejects empty credentials as well as missing ones, so skip
        # on any falsy value rather than only when the variable is absent.
        if not os.getenv("BAIDU_API_KEY") or not os.getenv("BAIDU_SECRET_KEY"):
            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")

    def test_gpt_risk_detection(self):
        """Three obviously dangerous calls should all be classified as high risk."""
        # Built line by line so the snippet carries no accidental indentation.
        content = "\n".join([
            "import os",
            "os.system('rm -rf /')   # high risk",
            "exec('print(\"Hello\")')  # high risk",
            "eval('2 + 2')   # high risk",
            "",
        ])
        results1 = detectGPT(content)
        classified_results = json.loads(results1)
        self.assertEqual(len(classified_results["high"]), 3)

    def test_gpt_no_risk_detection(self):
        """Harmless code should yield no high, medium or low findings."""
        content = "\n".join([
            "a = 10",
            "b = a + 5",
            "print('This should not be detected as risky.')",
            "",
        ])
        results2 = detectGPT(content)
        classified_results = json.loads(results2)
        self.assertEqual(len(classified_results["high"]), 0)
        self.assertEqual(len(classified_results["medium"]), 0)
        self.assertEqual(len(classified_results["low"]), 0)
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     unittest.main() | ||||
| @@ -83,13 +83,5 @@ class TestBackdoorDetection(unittest.TestCase): | ||||
|         self.assertEqual(len(results["medium"]), 0) | ||||
|         self.assertEqual(len(results["low"]), 0) | ||||
|  | ||||
|     def test_gpt_env_no_set(self): | ||||
|         if os.getenv("OPENAI_API_KEY") is not None: | ||||
|             self.skipTest("OPENAI_API_KEY is setted") | ||||
|         content = "print('test test')" | ||||
|         with self.assertRaises(ValueError): | ||||
|             detectGPT(content) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     unittest.main() | ||||
|   | ||||
		Reference in New Issue
	
	Block a user