Compare commits

..

11 Commits

Author SHA1 Message Date
dqy
c97780cde3 Merge pull request 'feature/cn-gpt' (#21) from feature/cn-gpt into main
Reviewed-on: #21
Reviewed-by: sangge <sangge@noreply.localhost>
Reviewed-by: dqy <dqy@noreply.localhost>
2024-05-26 16:59:22 +08:00
b544007e6b fix:删除无用测试代码——api_key 2024-05-24 20:44:35 +08:00
b1bc566c09 update:修改国内gpt调用 2024-05-24 20:27:18 +08:00
f0e2251dc0 Merge branch 'feature/cn-gpt' of https://git.mamahaha.work/sangge/BackDoorBuster into feature/cn-gpt 2024-05-24 17:29:10 +08:00
faf68760c9 fix:typeerror,修改类型错误 2024-05-24 17:28:34 +08:00
dqy
44c6086b8c Merge branch 'main' into feature/cn-gpt
Some checks failed
Python application test / build (pull_request) Failing after 14m6s
2024-05-18 20:58:38 +08:00
8fed7af432 Merge branch 'main' into feature/cn-gpt
Some checks failed
Python application test / build (pull_request) Failing after 12m46s
2024-05-17 16:06:00 +08:00
9a7c38f1a8 fix:休整代码
Some checks failed
Python application test / build (pull_request) Failing after 12m12s
2024-05-16 21:20:12 +08:00
dd45c467a3 feature/国内GPT-文心一言 2024-05-16 21:15:22 +08:00
9d6f054478 fix:补充了测试代码 2024-05-15 13:38:01 +08:00
2e5460a522 feature/GPT:文心一言api,国内gpt(百度大模型) 2024-05-14 20:24:01 +08:00
3 changed files with 153 additions and 8 deletions

113
detection/cngptdetection.py Normal file
View File

@@ -0,0 +1,113 @@
import os
import requests
import re
import json
from typing import List, Dict, Any
class TimeoutException(Exception):
"""自定义异常用于处理超时情况。"""
pass
def detectGPT(content: str) -> str:
"""
检测给定的代码内容中的潜在安全漏洞。
参数:
- content: 要检测的代码字符串。
返回:
- 分类后的漏洞信息的JSON字符串。
"""
api_key = os.getenv("BAIDU_API_KEY")
secret_key = os.getenv("BAIDU_SECRET_KEY")
#api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa"
#secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7"
if not api_key or not secret_key:
raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token(
api_key, secret_key)
payload = json.dumps({
"messages": [
{
"role": "user",
"content": (
"You are a Python code reviewer. Read the code below and identify any potential "
"security vulnerabilities. Classify them by risk level (high, medium, low, none). "
'Only report the line number and the risk level.\nYou should output the result as '
'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
"Each of these three fields is required.\nYou are required to only output the json format. "
"Do not output any other information." + content
)
}
]
})
headers = {
'Content-Type': 'application/json'
}
try:
response = requests.post(url, headers=headers, data=payload)
response.raise_for_status()
res_json = response.json()
message_content = res_json.get('result')
if message_content is None:
raise ValueError("API response content is None")
except requests.RequestException as e:
raise ValueError(f"Request failed: {str(e)}")
extracted_data = extract_json_from_text(message_content)
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in extracted_data:
try:
line_number = int(res["Line"])
classified_results[res["Risk"]].append(
(line_number, content.split("\n")[line_number - 1].strip())
)
except (ValueError, IndexError, KeyError):
continue
return json.dumps(classified_results, indent=2, ensure_ascii=False)
def get_access_token(api_key: str, secret_key: str) -> str:
"""
使用API密钥和秘密生成访问令牌。
返回:
- access_token字符串。
"""
url = "https://aip.baidubce.com/oauth/2.0/token"
params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key}
response = requests.post(url, params=params)
response.raise_for_status()
return response.json().get("access_token")
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
"""
从文本中提取JSON数据。
参数:
- text: 包含JSON数据的字符串文本。
返回:
- 包含提取JSON数据的字典列表。
"""
json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL)
if not json_match:
print("未找到 JSON 数据")
return []
json_string = json_match.group(0)
try:
data = json.loads(json_string)
except json.JSONDecodeError as e:
print(f"解码 JSON 时出错: {e}")
return []
return data

View File

@@ -0,0 +1,40 @@
import unittest
import warnings
import os
import json
from detection.cngptdetection import detectGPT
class TestBackdoorDetection(unittest.TestCase):
def test_gpt_risk_detection(self):
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
content = """import os
os.system('rm -rf /') # high risk
exec('print("Hello")') # high risk
eval('2 + 2') # high risk
"""
results1 = detectGPT(content)
classified_results = json.loads(results1)
self.assertEqual(len(classified_results["high"]), 3)
def test_gpt_no_risk_detection(self):
if os.getenv("BAIDU_API_KEY") is None or os.getenv("BAIDU_SECRET_KEY") is None:
warnings.warn("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set, test skipped.", UserWarning)
self.skipTest("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set")
content = """a = 10
b = a + 5
print('This should not be detected as risky.')
"""
results2 = detectGPT(content)
classified_results = json.loads(results2)
self.assertEqual(len(classified_results["high"]), 0)
self.assertEqual(len(classified_results["medium"]), 0)
self.assertEqual(len(classified_results["low"]), 0)
if __name__ == "__main__":
unittest.main()

View File

@@ -83,13 +83,5 @@ class TestBackdoorDetection(unittest.TestCase):
self.assertEqual(len(results["medium"]), 0) self.assertEqual(len(results["medium"]), 0)
self.assertEqual(len(results["low"]), 0) self.assertEqual(len(results["low"]), 0)
def test_gpt_env_no_set(self):
if os.getenv("OPENAI_API_KEY") is not None:
self.skipTest("OPENAI_API_KEY is setted")
content = "print('test test')"
with self.assertRaises(ValueError):
detectGPT(content)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()