From 2e5460a5225735e006a80e493186f024e4a9211e Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Tue, 14 May 2024 20:24:01 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feature/GPT:=E6=96=87=E5=BF=83=E4=B8=80?= =?UTF-8?q?=E8=A8=80api=EF=BC=8C=E5=9B=BD=E5=86=85gpt=EF=BC=88=E7=99=BE?= =?UTF-8?q?=E5=BA=A6=E5=A4=A7=E6=A8=A1=E5=9E=8B=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cn-gptdetection.py | 97 ++++++++++++++++++++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 detection/cn-gptdetection.py diff --git a/detection/cn-gptdetection.py b/detection/cn-gptdetection.py new file mode 100644 index 0000000..2c79304 --- /dev/null +++ b/detection/cn-gptdetection.py @@ -0,0 +1,97 @@ +import json +import requests +import signal +from typing import Dict, List, Tuple # 用于类型提示的模块,使用了 Dict, List, Tuple 进行类型注解。 + +# 参考文档:https://blog.csdn.net/weixin_73654895/article/details/133799269 + +class TimeoutException(Exception): + """Custom exception to handle timeouts.""" + pass + + +def timeout_handler(signum, frame): + """Handle the SIGALRM signal by raising a TimeoutException.""" + raise TimeoutException + + +def get_baidu_access_token(api_key: str, secret_key: str) -> str: + """ + Retrieve the access token from Baidu API using API key and Secret key. + + Args: + api_key (str): The API key for Baidu API. + secret_key (str): The Secret key for Baidu API. + + Returns: + str: The access token. + """ + url = "https://aip.baidubce.com/oauth/2.0/token" + params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} + response = requests.post(url, params=params) + response_data = response.json() + if 'access_token' not in response_data: + raise ValueError("Error: Could not retrieve access token.") + return str(response_data["access_token"]) + + +def detectGPT(content: str) -> Dict[str, List[Tuple[int, str]]]: + """ + Detect potential security vulnerabilities in the provided code content using Baidu's AI model. + + Args: + content (str): The code content to be analyzed. + + Returns: + Dict[str, List[Tuple[int, str]]]: Classified results of detected vulnerabilities. + """ + API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" + SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + + # Set alarm timer + signal.signal(signal.SIGTERM, timeout_handler) + signal.alarm(10) + + try: + access_token = get_baidu_access_token(API_KEY, SECRET_KEY) + url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token={access_token}" + + payload = json.dumps({ + "messages": [ + { + "role": "system", + "content": "You are a Python code reviewer. Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " + '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] Each of these three fields is required.\n' + "You are required to only output the json format. Do not output any other information.\n" + }, + { + "role": "user", + "content": content + } + ] + }) + + headers = { + 'Content-Type': 'application/json' + } + + response = requests.post(url, headers=headers, data=payload) + response_data = response.json() + message_content = response_data.get('result', None) + if message_content is None: + raise ValueError("API response content is None") + res_json = json.loads(message_content) + + except json.JSONDecodeError: + raise ValueError("Error: Could not parse the response. Please try again.") + except TimeoutException: + raise TimeoutException("The API call timed out") + finally: + signal.alarm(0) + + classified_results = {"high": [], "medium": [], "low": [], "none": []} + for res in res_json: + classified_results[res["Risk"]].append( + (res["Line"], content.split("\n")[res["Line"] - 1].strip()) + ) + return classified_results From 9d6f0544788818005d1ad7d8553df6315b1974d4 Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Wed, 15 May 2024 13:38:01 +0800 Subject: [PATCH 2/7] =?UTF-8?q?fix=EF=BC=9A=E8=A1=A5=E5=85=85=E4=BA=86?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cngptdetection.py | 97 ++++++++++++++++++++++++++++++++++ tests/test_CN_GPT_detection.py | 35 ++++++++++++ 2 files changed, 132 insertions(+) create mode 100644 detection/cngptdetection.py create mode 100644 tests/test_CN_GPT_detection.py diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py new file mode 100644 index 0000000..f0ab311 --- /dev/null +++ b/detection/cngptdetection.py @@ -0,0 +1,97 @@ +import json +import requests +import signal +from typing import Dict, List, Tuple # 用于类型提示的模块,使用了 Dict, List, Tuple 进行类型注解。 + +# 参考文档:https://blog.csdn.net/weixin_73654895/article/details/133799269 + +class TimeoutException(Exception): + """Custom exception to handle timeouts.""" + pass + + +def timeout_handler(signum, frame): + """Handle the SIGALRM signal by raising a TimeoutException.""" + raise TimeoutException + + +def get_baidu_access_token(api_key: str, secret_key: str) -> str: + """ + Retrieve the access token from Baidu API using API key and Secret key. + + Args: + api_key (str): The API key for Baidu API. + secret_key (str): The Secret key for Baidu API. + + Returns: + str: The access token. + """ + url = "https://aip.baidubce.com/oauth/2.0/token" + params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} + response = requests.post(url, params=params) + response_data = response.json() + if 'access_token' not in response_data: + raise ValueError("Error: Could not retrieve access token.") + return str(response_data["access_token"]) + + +def cndetectGPT(content: str) -> Dict[str, List[Tuple[int, str]]]: + """ + Detect potential security vulnerabilities in the provided code content using Baidu's AI model. + + Args: + content (str): The code content to be analyzed. + + Returns: + Dict[str, List[Tuple[int, str]]]: Classified results of detected vulnerabilities. + """ + API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" + SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + + # Set alarm timer + signal.signal(signal.SIGTERM, timeout_handler) + signal.alarm(10) + + try: + access_token = get_baidu_access_token(API_KEY, SECRET_KEY) + url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token={access_token}" + + payload = json.dumps({ + "messages": [ + { + "role": "system", + "content": "You are a Python code reviewer. Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " + '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] Each of these three fields is required.\n' + "You are required to only output the json format. Do not output any other information.\n" + }, + { + "role": "user", + "content": content + } + ] + }) + + headers = { + 'Content-Type': 'application/json' + } + + response = requests.post(url, headers=headers, data=payload) + response_data = response.json() + message_content = response_data.get('result', None) + if message_content is None: + raise ValueError("API response content is None") + res_json = json.loads(message_content) + + except json.JSONDecodeError: + raise ValueError("Error: Could not parse the response. Please try again.") + except TimeoutException: + raise TimeoutException("The API call timed out") + finally: + signal.alarm(0) + + classified_results = {"high": [], "medium": [], "low": [], "none": []} + for res in res_json: + classified_results[res["Risk"]].append( + (res["Line"], content.split("\n")[res["Line"] - 1].strip()) + ) + return classified_results diff --git a/tests/test_CN_GPT_detection.py b/tests/test_CN_GPT_detection.py new file mode 100644 index 0000000..dc6c5ae --- /dev/null +++ b/tests/test_CN_GPT_detection.py @@ -0,0 +1,35 @@ +import unittest +import warnings +import os +from detection.cngptdetection import cndetectGPT + +class TestBackdoorDetection(unittest.TestCase): + def test_gpt_risk_detection(self): + content = """import os + os.system('rm -rf /') # high risk + exec('print("Hello")') # high risk + eval('2 + 2') # high risk + """ + results = cndetectGPT(content) + self.assertEqual(len(results["high"]), 3) + + def test_gpt_no_risk_detection(self): + content = """a = 10 + b = a + 5 + print('This should not be detected as risky.') + """ + results = cndetectGPT(content) + self.assertEqual(len(results["high"]), 0) + self.assertEqual(len(results["medium"]), 0) + self.assertEqual(len(results["low"]), 0) + + def test_gpt_env_no_set(self): + if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: + self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + content = "print('test test')" + with self.assertRaises(ValueError): + cndetectGPT(content) + + +if __name__ == "__main__": + unittest.main() From dd45c467a3f27d3e880d36c5b03d6dd68fbf7a13 Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Thu, 16 May 2024 21:15:22 +0800 Subject: [PATCH 3/7] =?UTF-8?q?feature/=E5=9B=BD=E5=86=85GPT-=E6=96=87?= =?UTF-8?q?=E5=BF=83=E4=B8=80=E8=A8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cngptdetection.py | 155 ++++++++++++++++++--------------- tests/test_CN_GPT_detection.py | 34 +++++--- 2 files changed, 110 insertions(+), 79 deletions(-) diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py index f0ab311..8cd802e 100644 --- a/detection/cngptdetection.py +++ b/detection/cngptdetection.py @@ -1,9 +1,10 @@ -import json +import os import requests import signal -from typing import Dict, List, Tuple # 用于类型提示的模块,使用了 Dict, List, Tuple 进行类型注解。 +import re +import json +from typing import List, Dict, Any -# 参考文档:https://blog.csdn.net/weixin_73654895/article/details/133799269 class TimeoutException(Exception): """Custom exception to handle timeouts.""" @@ -15,83 +16,99 @@ def timeout_handler(signum, frame): raise TimeoutException -def get_baidu_access_token(api_key: str, secret_key: str) -> str: - """ - Retrieve the access token from Baidu API using API key and Secret key. +# 从环境变量中获取API密钥 +API_KEY = os.getenv('BAIDU_API_KEY') +SECRET_KEY = os.getenv('BAIDU_SECRET_KEY') - Args: - api_key (str): The API key for Baidu API. - secret_key (str): The Secret key for Baidu API. - - Returns: - str: The access token. - """ - url = "https://aip.baidubce.com/oauth/2.0/token" - params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} - response = requests.post(url, params=params) - response_data = response.json() - if 'access_token' not in response_data: - raise ValueError("Error: Could not retrieve access token.") - return str(response_data["access_token"]) +#API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" +#SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" -def cndetectGPT(content: str) -> Dict[str, List[Tuple[int, str]]]: - """ - Detect potential security vulnerabilities in the provided code content using Baidu's AI model. +def detectGPT(content): + # signal.signal(signal.SIGTERM, timeout_handler) + # signal.alarm(10) - Args: - content (str): The code content to be analyzed. + url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token=" + get_access_token() - Returns: - Dict[str, List[Tuple[int, str]]]: Classified results of detected vulnerabilities. - """ - API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" - SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" - - # Set alarm timer - signal.signal(signal.SIGTERM, timeout_handler) - signal.alarm(10) + # 注意message必须是奇数条 + payload = json.dumps({ + "messages": [ + { + "role": "user", + "content": ( + "You are a Python code reviewer. Read the code below and identify any potential " + "security vulnerabilities. Classify them by risk level (high, medium, low, none). " + 'Only report the line number and the risk level.\nYou should output the result as ' + 'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] ' + "Each of these three fields is required.\nYou are required to only output the json format. " + "Do not output any other information." + content + ) + } + ] + }) + headers = { + 'Content-Type': 'application/json' + } + res_json = requests.request("POST", url, headers=headers, data=payload).json() try: - access_token = get_baidu_access_token(API_KEY, SECRET_KEY) - url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token={access_token}" - - payload = json.dumps({ - "messages": [ - { - "role": "system", - "content": "You are a Python code reviewer. Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " - '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] Each of these three fields is required.\n' - "You are required to only output the json format. Do not output any other information.\n" - }, - { - "role": "user", - "content": content - } - ] - }) - - headers = { - 'Content-Type': 'application/json' - } - - response = requests.post(url, headers=headers, data=payload) - response_data = response.json() - message_content = response_data.get('result', None) + message_content = res_json.get('result') # 使用get方法获取result,避免KeyError异常 if message_content is None: raise ValueError("API response content is None") - res_json = json.loads(message_content) - - except json.JSONDecodeError: - raise ValueError("Error: Could not parse the response. Please try again.") - except TimeoutException: - raise TimeoutException("The API call timed out") - finally: - signal.alarm(0) + except TimeoutException: + raise TimeoutException("The api call timed out") + + except Exception as e: + raise ValueError(f"Error: {str(e)}") + # finally: + # signal.alarm(0) + + # 提取数据 + extracted_data = extract_json_from_text(message_content) + + # 输出提取的 JSON 数据 classified_results = {"high": [], "medium": [], "low": [], "none": []} - for res in res_json: + for res in extracted_data: classified_results[res["Risk"]].append( (res["Line"], content.split("\n")[res["Line"] - 1].strip()) ) - return classified_results + #return classified_results + result = json.dumps(classified_results, indent=2, ensure_ascii=False) + return result + +# 获得访问令牌 +def get_access_token(): + """ + 使用 AK,SK 生成鉴权签名(Access Token) + :return: access_token,或是None(如果错误) + """ + url = "https://aip.baidubce.com/oauth/2.0/token" + params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY} + return str(requests.post(url, params=params).json().get("access_token")) + + +def extract_json_from_text(text: str) -> List[Dict[str, Any]]: + """ + 从文本中提取 JSON 数据。 + + 参数: + - text: 包含 JSON 数据的字符串文本。 + + 返回: + - 包含提取 JSON 数据的字典列表。 + """ + # 使用正则表达式找到 JSON 部分 + json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL) + if not json_match: + print("未找到 JSON 数据") + return [] + + json_string = json_match.group(0) + try: + data = json.loads(json_string) + except json.JSONDecodeError as e: + print(f"解码 JSON 时出错: {e}") + return [] + + return data diff --git a/tests/test_CN_GPT_detection.py b/tests/test_CN_GPT_detection.py index dc6c5ae..39e08ac 100644 --- a/tests/test_CN_GPT_detection.py +++ b/tests/test_CN_GPT_detection.py @@ -1,34 +1,48 @@ import unittest import warnings import os -from detection.cngptdetection import cndetectGPT + +from detection.cngptdetection import detectGPT # 导入调用百度 ai 模型的函数 + class TestBackdoorDetection(unittest.TestCase): def test_gpt_risk_detection(self): + """ + if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: + warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning) + self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + """ content = """import os os.system('rm -rf /') # high risk exec('print("Hello")') # high risk eval('2 + 2') # high risk """ - results = cndetectGPT(content) - self.assertEqual(len(results["high"]), 3) + results1 = detectGPT(content) + self.assertEqual(len(results1["high"]), 3) def test_gpt_no_risk_detection(self): + """ + if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: + warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning) + self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + """ content = """a = 10 b = a + 5 print('This should not be detected as risky.') """ - results = cndetectGPT(content) - self.assertEqual(len(results["high"]), 0) - self.assertEqual(len(results["medium"]), 0) - self.assertEqual(len(results["low"]), 0) + results2 = detectGPT(content) + self.assertEqual(len(results2["high"]), 0) + self.assertEqual(len(results2["medium"]), 0) + self.assertEqual(len(results2["low"]), 0) def test_gpt_env_no_set(self): - if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: - self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + """ + if os.getenv("BAIDU_API_KEY") is not None or os.getenv("BAIDU_SECRET_KEY") is not None: + self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is set") + """ content = "print('test test')" with self.assertRaises(ValueError): - cndetectGPT(content) + detectGPT(content) if __name__ == "__main__": From 9a7c38f1a839639c4770dc4464871a76618292a9 Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Thu, 16 May 2024 21:20:12 +0800 Subject: [PATCH 4/7] =?UTF-8?q?fix=EF=BC=9A=E4=BC=91=E6=95=B4=E4=BB=A3?= =?UTF-8?q?=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cn-gptdetection.py | 97 ------------------------------------ 1 file changed, 97 deletions(-) delete mode 100644 detection/cn-gptdetection.py diff --git a/detection/cn-gptdetection.py b/detection/cn-gptdetection.py deleted file mode 100644 index 2c79304..0000000 --- a/detection/cn-gptdetection.py +++ /dev/null @@ -1,97 +0,0 @@ -import json -import requests -import signal -from typing import Dict, List, Tuple # 用于类型提示的模块,使用了 Dict, List, Tuple 进行类型注解。 - -# 参考文档:https://blog.csdn.net/weixin_73654895/article/details/133799269 - -class TimeoutException(Exception): - """Custom exception to handle timeouts.""" - pass - - -def timeout_handler(signum, frame): - """Handle the SIGALRM signal by raising a TimeoutException.""" - raise TimeoutException - - -def get_baidu_access_token(api_key: str, secret_key: str) -> str: - """ - Retrieve the access token from Baidu API using API key and Secret key. - - Args: - api_key (str): The API key for Baidu API. - secret_key (str): The Secret key for Baidu API. - - Returns: - str: The access token. - """ - url = "https://aip.baidubce.com/oauth/2.0/token" - params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} - response = requests.post(url, params=params) - response_data = response.json() - if 'access_token' not in response_data: - raise ValueError("Error: Could not retrieve access token.") - return str(response_data["access_token"]) - - -def detectGPT(content: str) -> Dict[str, List[Tuple[int, str]]]: - """ - Detect potential security vulnerabilities in the provided code content using Baidu's AI model. - - Args: - content (str): The code content to be analyzed. - - Returns: - Dict[str, List[Tuple[int, str]]]: Classified results of detected vulnerabilities. - """ - API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" - SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" - - # Set alarm timer - signal.signal(signal.SIGTERM, timeout_handler) - signal.alarm(10) - - try: - access_token = get_baidu_access_token(API_KEY, SECRET_KEY) - url = f"https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token={access_token}" - - payload = json.dumps({ - "messages": [ - { - "role": "system", - "content": "You are a Python code reviewer. Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " - '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] Each of these three fields is required.\n' - "You are required to only output the json format. Do not output any other information.\n" - }, - { - "role": "user", - "content": content - } - ] - }) - - headers = { - 'Content-Type': 'application/json' - } - - response = requests.post(url, headers=headers, data=payload) - response_data = response.json() - message_content = response_data.get('result', None) - if message_content is None: - raise ValueError("API response content is None") - res_json = json.loads(message_content) - - except json.JSONDecodeError: - raise ValueError("Error: Could not parse the response. Please try again.") - except TimeoutException: - raise TimeoutException("The API call timed out") - finally: - signal.alarm(0) - - classified_results = {"high": [], "medium": [], "low": [], "none": []} - for res in res_json: - classified_results[res["Risk"]].append( - (res["Line"], content.split("\n")[res["Line"] - 1].strip()) - ) - return classified_results From faf68760c929d8af1767148833c41be6302b25e8 Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Fri, 24 May 2024 17:28:34 +0800 Subject: [PATCH 5/7] =?UTF-8?q?fix=EF=BC=9Atypeerror=EF=BC=8C=E4=BF=AE?= =?UTF-8?q?=E6=94=B9=E7=B1=BB=E5=9E=8B=E9=94=99=E8=AF=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cngptdetection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py index 8cd802e..4e9e891 100644 --- a/detection/cngptdetection.py +++ b/detection/cngptdetection.py @@ -75,7 +75,8 @@ def detectGPT(content): ) #return classified_results result = json.dumps(classified_results, indent=2, ensure_ascii=False) - return result + classified_results = json.loads(result) + return classified_results # 获得访问令牌 def get_access_token(): From b1bc566c09b710c5ab6644043397bf36eb7a550c Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Fri, 24 May 2024 20:27:18 +0800 Subject: [PATCH 6/7] =?UTF-8?q?update=EF=BC=9A=E4=BF=AE=E6=94=B9=E5=9B=BD?= =?UTF-8?q?=E5=86=85gpt=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/cngptdetection.py | 94 +++++++++++++++++----------------- tests/test_CN_GPT_detection.py | 26 +++++----- 2 files changed, 58 insertions(+), 62 deletions(-) diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py index 4e9e891..20a8a79 100644 --- a/detection/cngptdetection.py +++ b/detection/cngptdetection.py @@ -1,36 +1,35 @@ import os import requests -import signal import re import json from typing import List, Dict, Any class TimeoutException(Exception): - """Custom exception to handle timeouts.""" + """自定义异常用于处理超时情况。""" pass -def timeout_handler(signum, frame): - """Handle the SIGALRM signal by raising a TimeoutException.""" - raise TimeoutException +def detectGPT(content: str) -> str: + """ + 检测给定的代码内容中的潜在安全漏洞。 + 参数: + - content: 要检测的代码字符串。 -# 从环境变量中获取API密钥 -API_KEY = os.getenv('BAIDU_API_KEY') -SECRET_KEY = os.getenv('BAIDU_SECRET_KEY') + 返回: + - 分类后的漏洞信息的JSON字符串。 + """ + api_key = os.getenv("BAIDU_API_KEY") + secret_key = os.getenv("BAIDU_SECRET_KEY") + #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" + #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + if not api_key or not secret_key: + raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") -#API_KEY = "DUBWNIrB6QJLOsLkpnEz2ZZa" -#SECRET_KEY = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( + api_key, secret_key) - -def detectGPT(content): - # signal.signal(signal.SIGTERM, timeout_handler) - # signal.alarm(10) - - url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant?access_token=" + get_access_token() - - # 注意message必须是奇数条 payload = json.dumps({ "messages": [ { @@ -50,56 +49,55 @@ def detectGPT(content): 'Content-Type': 'application/json' } - res_json = requests.request("POST", url, headers=headers, data=payload).json() try: - message_content = res_json.get('result') # 使用get方法获取result,避免KeyError异常 + response = requests.post(url, headers=headers, data=payload) + response.raise_for_status() + res_json = response.json() + message_content = res_json.get('result') if message_content is None: raise ValueError("API response content is None") + except requests.RequestException as e: + raise ValueError(f"Request failed: {str(e)}") - except TimeoutException: - raise TimeoutException("The api call timed out") - - except Exception as e: - raise ValueError(f"Error: {str(e)}") - # finally: - # signal.alarm(0) - - # 提取数据 extracted_data = extract_json_from_text(message_content) - # 输出提取的 JSON 数据 classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in extracted_data: - classified_results[res["Risk"]].append( - (res["Line"], content.split("\n")[res["Line"] - 1].strip()) - ) - #return classified_results - result = json.dumps(classified_results, indent=2, ensure_ascii=False) - classified_results = json.loads(result) - return classified_results + try: + line_number = int(res["Line"]) + classified_results[res["Risk"]].append( + (line_number, content.split("\n")[line_number - 1].strip()) + ) + except (ValueError, IndexError, KeyError): + continue -# 获得访问令牌 -def get_access_token(): + return json.dumps(classified_results, indent=2, ensure_ascii=False) + + +def get_access_token(api_key: str, secret_key: str) -> str: """ - 使用 AK,SK 生成鉴权签名(Access Token) - :return: access_token,或是None(如果错误) + 使用API密钥和秘密生成访问令牌。 + + 返回: + - access_token字符串。 """ url = "https://aip.baidubce.com/oauth/2.0/token" - params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY} - return str(requests.post(url, params=params).json().get("access_token")) + params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} + response = requests.post(url, params=params) + response.raise_for_status() + return response.json().get("access_token") def extract_json_from_text(text: str) -> List[Dict[str, Any]]: """ - 从文本中提取 JSON 数据。 + 从文本中提取JSON数据。 参数: - - text: 包含 JSON 数据的字符串文本。 + - text: 包含JSON数据的字符串文本。 返回: - - 包含提取 JSON 数据的字典列表。 + - 包含提取JSON数据的字典列表。 """ - # 使用正则表达式找到 JSON 部分 json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL) if not json_match: print("未找到 JSON 数据") @@ -112,4 +110,4 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]: print(f"解码 JSON 时出错: {e}") return [] - return data + return data \ No newline at end of file diff --git a/tests/test_CN_GPT_detection.py b/tests/test_CN_GPT_detection.py index 39e08ac..6f0cdd2 100644 --- a/tests/test_CN_GPT_detection.py +++ b/tests/test_CN_GPT_detection.py @@ -1,49 +1,47 @@ import unittest import warnings import os +import json -from detection.cngptdetection import detectGPT # 导入调用百度 ai 模型的函数 - +from detection.cngptdetection import detectGPT class TestBackdoorDetection(unittest.TestCase): def test_gpt_risk_detection(self): - """ - if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: + if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning) self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") - """ + content = """import os os.system('rm -rf /') # high risk exec('print("Hello")') # high risk eval('2 + 2') # high risk """ results1 = detectGPT(content) - self.assertEqual(len(results1["high"]), 3) + classified_results = json.loads(results1) + self.assertEqual(len(classified_results["high"]), 3) def test_gpt_no_risk_detection(self): - """ if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None: warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning) self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") - """ + content = """a = 10 b = a + 5 print('This should not be detected as risky.') """ results2 = detectGPT(content) - self.assertEqual(len(results2["high"]), 0) - self.assertEqual(len(results2["medium"]), 0) - self.assertEqual(len(results2["low"]), 0) + classified_results = json.loads(results2) + self.assertEqual(len(classified_results["high"]), 0) + self.assertEqual(len(classified_results["medium"]), 0) + self.assertEqual(len(classified_results["low"]), 0) def test_gpt_env_no_set(self): - """ if os.getenv("BAIDU_API_KEY") is not None or os.getenv("BAIDU_SECRET_KEY") is not None: self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is set") - """ + content = "print('test test')" with self.assertRaises(ValueError): detectGPT(content) - if __name__ == "__main__": unittest.main() From b544007e6b1aa5ef9d641c0eaa8b2dbad8a718fa Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Fri, 24 May 2024 20:44:35 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix=EF=BC=9A=E5=88=A0=E9=99=A4=E6=97=A0?= =?UTF-8?q?=E7=94=A8=E6=B5=8B=E8=AF=95=E4=BB=A3=E7=A0=81=E2=80=94=E2=80=94?= =?UTF-8?q?api=5Fkey?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_CN_GPT_detection.py | 7 ------- tests/test_backdoor_detection.py | 8 -------- 2 files changed, 15 deletions(-) diff --git a/tests/test_CN_GPT_detection.py b/tests/test_CN_GPT_detection.py index 6f0cdd2..f902f50 100644 --- a/tests/test_CN_GPT_detection.py +++ b/tests/test_CN_GPT_detection.py @@ -35,13 +35,6 @@ class TestBackdoorDetection(unittest.TestCase): self.assertEqual(len(classified_results["medium"]), 0) self.assertEqual(len(classified_results["low"]), 0) - def test_gpt_env_no_set(self): - if os.getenv("BAIDU_API_KEY") is not None or os.getenv("BAIDU_SECRET_KEY") is not None: - self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is set") - - content = "print('test test')" - with self.assertRaises(ValueError): - detectGPT(content) if __name__ == "__main__": unittest.main() diff --git a/tests/test_backdoor_detection.py b/tests/test_backdoor_detection.py index ebbcd58..c0d2a05 100644 --- a/tests/test_backdoor_detection.py +++ b/tests/test_backdoor_detection.py @@ -83,13 +83,5 @@ class TestBackdoorDetection(unittest.TestCase): self.assertEqual(len(results["medium"]), 0) self.assertEqual(len(results["low"]), 0) - def test_gpt_env_no_set(self): - if os.getenv("OPENAI_API_KEY") is not None: - self.skipTest("OPENAI_API_KEY is setted") - content = "print('test test')" - with self.assertRaises(ValueError): - detectGPT(content) - - if __name__ == "__main__": unittest.main()