diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py
new file mode 100644
index 0000000..20a8a79
--- /dev/null
+++ b/detection/cngptdetection.py
@@ -0,0 +1,152 @@
+import os
+import requests
+import re
+import json
+from typing import List, Dict, Any
+
+# Seconds to wait for a Baidu endpoint before a request is abandoned.
+REQUEST_TIMEOUT = 60
+
+TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
+CHAT_URL = (
+    "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/"
+    "ernie-4.0-8k-0329"
+)
+
+
+class TimeoutException(Exception):
+    """Custom exception for handling timeouts."""
+    pass
+
+
+def detectGPT(content: str) -> str:
+    """
+    Detect potential security vulnerabilities in the given code.
+
+    The code is sent to the ERNIE chat endpoint and the findings it reports
+    are grouped by risk level.
+
+    Args:
+        content: Source code to inspect.
+
+    Returns:
+        A JSON string mapping each risk level ("high", "medium", "low",
+        "none") to a list of [line number, stripped source line] pairs.
+
+    Raises:
+        ValueError: If the credentials are not set, a request fails, or the
+            API response carries no result.
+    """
+    api_key = os.getenv("BAIDU_API_KEY")
+    secret_key = os.getenv("BAIDU_SECRET_KEY")
+    if not api_key or not secret_key:
+        raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
+
+    payload = json.dumps({
+        "messages": [
+            {
+                "role": "user",
+                "content": (
+                    "You are a Python code reviewer. Read the code below and identify any potential "
+                    "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
+                    'Only report the line number and the risk level.\nYou should output the result as '
+                    'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
+                    "Each of these three fields is required.\nYou are required to only output the json format. "
+                    "Do not output any other information." + content
+                )
+            }
+        ]
+    })
+    headers = {
+        'Content-Type': 'application/json'
+    }
+
+    try:
+        # The token is fetched inside the try block so that its network
+        # errors surface as ValueError, like those of the chat request.
+        access_token = get_access_token(api_key, secret_key)
+        response = requests.post(
+            CHAT_URL,
+            params={"access_token": access_token},
+            headers=headers,
+            data=payload,
+            timeout=REQUEST_TIMEOUT,
+        )
+        response.raise_for_status()
+        res_json = response.json()
+        message_content = res_json.get('result')
+        if message_content is None:
+            raise ValueError("API response content is None")
+    except requests.RequestException as e:
+        raise ValueError(f"Request failed: {str(e)}") from e
+
+    extracted_data = extract_json_from_text(message_content)
+
+    source_lines = content.split("\n")
+    classified_results = {"high": [], "medium": [], "low": [], "none": []}
+    for res in extracted_data:
+        try:
+            line_number = int(res["Line"])
+            risk = str(res["Risk"]).strip().lower()
+        except (ValueError, KeyError, TypeError):
+            continue
+        # Line numbers are 1-based. Zero and negative values are rejected
+        # explicitly because Python would index them from the end.
+        if risk not in classified_results or not 1 <= line_number <= len(source_lines):
+            continue
+        classified_results[risk].append(
+            (line_number, source_lines[line_number - 1].strip())
+        )
+
+    return json.dumps(classified_results, indent=2, ensure_ascii=False)
+
+
+def get_access_token(api_key: str, secret_key: str) -> str:
+    """
+    Exchange the API key and secret key for an access token.
+
+    Args:
+        api_key: Sent to the token endpoint as ``client_id``.
+        secret_key: Sent to the token endpoint as ``client_secret``.
+
+    Returns:
+        The access token string.
+
+    Raises:
+        requests.RequestException: If the request fails or times out.
+        ValueError: If the response contains no access token.
+    """
+    params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
+    response = requests.post(TOKEN_URL, params=params, timeout=REQUEST_TIMEOUT)
+    response.raise_for_status()
+    access_token = response.json().get("access_token")
+    if not access_token:
+        raise ValueError("Token response does not contain an access token")
+    return access_token
+
+
+def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
+    """
+    Extract the first JSON array of objects embedded in a text.
+
+    Args:
+        text: Text that contains JSON data, possibly among other output.
+
+    Returns:
+        The decoded objects, or an empty list if no array is found or it
+        cannot be decoded.
+    """
+    json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL)
+    if not json_match:
+        print("未找到 JSON 数据")
+        return []
+
+    json_string = json_match.group(0)
+    try:
+        data = json.loads(json_string)
+    except json.JSONDecodeError as e:
+        print(f"解码 JSON 时出错: {e}")
+        return []
+
+    # Drop non-object entries so the result matches the declared type.
+    return [item for item in data if isinstance(item, dict)]
diff --git a/tests/test_CN_GPT_detection.py b/tests/test_CN_GPT_detection.py
new file mode 100644
index 0000000..f902f50
--- /dev/null
+++ b/tests/test_CN_GPT_detection.py
@@ -0,0 +1,40 @@
+import unittest
+import warnings
+import os
+import json
+
+from detection.cngptdetection import detectGPT
+
+class TestBackdoorDetection(unittest.TestCase):
+    def test_gpt_risk_detection(self):
+        if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
+            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
+            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
+
+        content = """import os
+        os.system('rm -rf /') # high risk
+        exec('print("Hello")') # high risk
+        eval('2 + 2') # high risk
+        """
+        results1 = detectGPT(content)
+        classified_results = json.loads(results1)
+        self.assertEqual(len(classified_results["high"]), 3)
+
+    def test_gpt_no_risk_detection(self):
+        if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
+            warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
+            self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
+
+        content = """a = 10
+        b = a + 5
+        print('This should not be detected as risky.')
+        """
+        results2 = detectGPT(content)
+        classified_results = json.loads(results2)
+        self.assertEqual(len(classified_results["high"]), 0)
+        self.assertEqual(len(classified_results["medium"]), 0)
+        self.assertEqual(len(classified_results["low"]), 0)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/test_backdoor_detection.py b/tests/test_backdoor_detection.py
index ebbcd58..c0d2a05 100644
--- a/tests/test_backdoor_detection.py
+++ b/tests/test_backdoor_detection.py
@@ -83,13 +83,5 @@ class TestBackdoorDetection(unittest.TestCase):
         self.assertEqual(len(results["medium"]), 0)
         self.assertEqual(len(results["low"]), 0)
 
-    def test_gpt_env_no_set(self):
-        if os.getenv("OPENAI_API_KEY") is not None:
-            self.skipTest("OPENAI_API_KEY is setted")
-        content = "print('test test')"
-        with self.assertRaises(ValueError):
-            detectGPT(content)
-
-
 if __name__ == "__main__":
     unittest.main()