115 lines
3.6 KiB
Python
115 lines
3.6 KiB
Python
import os
|
||
import requests
|
||
import signal
|
||
import re
|
||
import json
|
||
from typing import List, Dict, Any
|
||
|
||
|
||
class TimeoutException(Exception):
    """Exception type used in this module to signal that an operation timed out."""
|
||
|
||
|
||
def timeout_handler(signum, frame):
    """Signal handler that turns a delivered signal into a TimeoutException.

    Both arguments follow the standard signal-handler signature and are
    ignored; the handler unconditionally raises.
    """
    raise TimeoutException()
|
||
|
||
|
||
# Read the Baidu API credentials from the environment; each value is None
# when its variable is unset.
# NOTE: credentials must never be written into this file, not even inside a
# comment. Any key that was ever committed should be treated as compromised
# and rotated.
API_KEY = os.getenv('BAIDU_API_KEY')
SECRET_KEY = os.getenv('BAIDU_SECRET_KEY')
|
||
|
||
|
||
def detectGPT(content, timeout=30):
    """Ask the Baidu wenxinworkshop ``eb-instant`` chat endpoint to review code.

    ``content`` is appended to a fixed review prompt, the prompt is posted to
    the endpoint, and the JSON array found in the reply is grouped by risk
    level.

    :param content: source code to review, as a single string.
    :param timeout: seconds handed to ``requests`` for the chat request, so a
        stalled connection cannot block the caller forever.
    :return: a JSON string (2-space indent, non-ASCII kept as-is) mapping
        "high", "medium", "low" and "none" to lists of
        ``[line_number, stripped_source_line]`` pairs.
    :raises TimeoutException: if the HTTP call times out.
    :raises ValueError: if the response has no "result" field.
    """
    url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant"

    # NOTE(review): content is appended directly after the last sentence with
    # no separator - confirm the model's line numbering is not affected.
    prompt = (
        "You are a Python code reviewer. Read the code below and identify any potential "
        "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
        'Only report the line number and the risk level.\nYou should output the result as '
        'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
        "Each of these three fields is required.\nYou are required to only output the json format. "
        "Do not output any other information." + content
    )
    # Note: the number of messages must be odd.
    payload = json.dumps({"messages": [{"role": "user", "content": prompt}]})
    headers = {'Content-Type': 'application/json'}

    try:
        # Fetching the token is itself a network call, so it sits inside the
        # same timeout handling as the chat request.
        response = requests.post(
            url,
            params={"access_token": get_access_token()},
            headers=headers,
            data=payload,
            timeout=timeout,
        )
    except requests.exceptions.Timeout as exc:
        raise TimeoutException("The api call timed out") from exc

    res_json = response.json()
    # Use .get so a missing "result" is reported as ValueError, not KeyError.
    message_content = res_json.get('result') if isinstance(res_json, dict) else None
    if message_content is None:
        raise ValueError("Error: API response content is None")

    # Pull the JSON array out of the model's free-form reply and group it.
    findings = extract_json_from_text(message_content)
    classified_results = _classify_findings(findings, content)
    return json.dumps(classified_results, indent=2, ensure_ascii=False)


def _classify_findings(findings, content):
    """Group model findings by risk level, skipping entries that are unusable.

    The findings come from a language model, so each entry is validated: it
    must be a dict, its "Risk" must be one of the known levels (compared
    case-insensitively) and its "Line" must be a 1-based index into
    ``content``. A line number of 0 or below is rejected rather than being
    allowed to wrap around to the end of the file.
    """
    source_lines = content.split("\n")  # split once, not once per finding
    classified = {"high": [], "medium": [], "low": [], "none": []}
    for finding in findings:
        if not isinstance(finding, dict):
            continue
        risk = str(finding.get("Risk", "")).strip().lower()
        try:
            line_no = int(finding.get("Line"))
        except (TypeError, ValueError):
            continue
        if risk not in classified or not 1 <= line_no <= len(source_lines):
            continue
        classified[risk].append((line_no, source_lines[line_no - 1].strip()))
    return classified
|
||
|
||
# Obtain an access token.
def get_access_token(timeout=10):
    """
    Exchange the API key / secret key pair for an access token.

    :param timeout: seconds to wait on the token endpoint before
        ``requests`` raises a timeout error.
    :return: the access token as a string.
    :raises ValueError: if either credential is unset, or the response
        contains no "access_token" field.
    """
    if not API_KEY or not SECRET_KEY:
        raise ValueError("Error: BAIDU_API_KEY and BAIDU_SECRET_KEY must be set")

    url = "https://aip.baidubce.com/oauth/2.0/token"
    params = {"grant_type": "client_credentials", "client_id": API_KEY, "client_secret": SECRET_KEY}
    token = requests.post(url, params=params, timeout=timeout).json().get("access_token")
    if not token:
        # Fail here instead of returning the literal string "None", which
        # would otherwise be sent on as if it were a real token.
        raise ValueError("Error: token endpoint returned no access_token")
    return str(token)
|
||
|
||
|
||
def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
    """
    Extract the first JSON array of objects embedded in ``text``.

    Every position where ``[`` is followed (after optional whitespace) by
    ``{`` is tried in order with a real JSON parser. Nested arrays and ``}]``
    sequences inside string values therefore do not truncate the match, and a
    malformed earlier candidate does not hide a valid later one.

    Parameters:
    - text: string that may contain a JSON array somewhere inside it.

    Returns:
    - The decoded list, or an empty list when no candidate is found or none
      of the candidates decodes (a message is printed in either case).
    """
    if not isinstance(text, str):
        print("未找到 JSON 数据")
        return []

    decoder = json.JSONDecoder()
    first_error = None
    # Locate candidate array starts with the regex, then let the JSON decoder
    # decide where each array really ends.
    for candidate in re.finditer(r'\[\s*\{', text):
        try:
            data, _ = decoder.raw_decode(text, candidate.start())
        except json.JSONDecodeError as e:
            if first_error is None:
                first_error = e
            continue
        return data

    if first_error is None:
        print("未找到 JSON 数据")
    else:
        print(f"解码 JSON 时出错: {first_error}")
    return []
|