import os import requests import re import json from typing import List, Dict, Any class TimeoutException(Exception): """自定义异常用于处理超时情况。""" pass def detectGPT(content: str) -> str: """ 检测给定的代码内容中的潜在安全漏洞。 参数: - content: 要检测的代码字符串。 返回: - 分类后的漏洞信息的JSON字符串。 """ api_key = os.getenv("BAIDU_API_KEY") secret_key = os.getenv("BAIDU_SECRET_KEY") #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" if not api_key or not secret_key: raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( api_key, secret_key) payload = json.dumps({ "messages": [ { "role": "user", "content": ( "You are a Python code reviewer. Read the code below and identify any potential " "security vulnerabilities. Classify them by risk level (high, medium, low, none). " 'Only report the line number and the risk level.\nYou should output the result as ' 'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] ' "Each of these three fields is required.\nYou are required to only output the json format. " "Do not output any other information." + content ) } ] }) headers = { 'Content-Type': 'application/json' } try: response = requests.post(url, headers=headers, data=payload) response.raise_for_status() res_json = response.json() message_content = res_json.get('result') if message_content is None: raise ValueError("API response content is None") except requests.RequestException as e: raise ValueError(f"Request failed: {str(e)}") extracted_data = extract_json_from_text(message_content) classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in extracted_data: try: line_number = int(res["Line"]) classified_results[res["Risk"]].append( (line_number, content.split("\n")[line_number - 1].strip()) ) except (ValueError, IndexError, KeyError): continue return json.dumps(classified_results, indent=2, ensure_ascii=False) def get_access_token(api_key: str, secret_key: str) -> str: """ 使用API密钥和秘密生成访问令牌。 返回: - access_token字符串。 """ url = "https://aip.baidubce.com/oauth/2.0/token" params = {"grant_type": "client_credentials", "client_id": api_key, "client_secret": secret_key} response = requests.post(url, params=params) response.raise_for_status() return response.json().get("access_token") def extract_json_from_text(text: str) -> List[Dict[str, Any]]: """ 从文本中提取JSON数据。 参数: - text: 包含JSON数据的字符串文本。 返回: - 包含提取JSON数据的字典列表。 """ json_match = re.search(r'\[\s*{.*?}\s*\]', text, re.DOTALL) if not json_match: print("未找到 JSON 数据") return [] json_string = json_match.group(0) try: data = json.loads(json_string) except json.JSONDecodeError as e: print(f"解码 JSON 时出错: {e}") return [] return data