"""Ask a Baidu wenxinworkshop chat endpoint to flag security risks in code."""

import json
import os
import re
import signal
from typing import Any, Dict, List

import requests


class TimeoutException(Exception):
    """Custom exception to handle timeouts."""
    pass


def timeout_handler(signum, frame):
    """Handle the SIGALRM signal by raising a TimeoutException."""
    raise TimeoutException


# Credentials are read from the environment; never hard-code keys in source.
API_KEY = os.getenv('BAIDU_API_KEY')
SECRET_KEY = os.getenv('BAIDU_SECRET_KEY')

_TOKEN_URL = "https://aip.baidubce.com/oauth/2.0/token"
_CHAT_URL = (
    "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/eb-instant"
)

# Seconds to wait on each HTTP request before giving up.
_DEFAULT_TIMEOUT = 30

# Buckets of the returned classification, in the order they are created.
_RISK_LEVELS = ("high", "medium", "low", "none")

# Instruction sent ahead of the code under review (the code is appended as-is).
_REVIEW_PROMPT = (
    "You are a Python code reviewer. Read the code below and identify any potential "
    "security vulnerabilities. Classify them by risk level (high, medium, low, none). "
    'Only report the line number and the risk level.\nYou should output the result as '
    'json format in one line. For example: [{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnerable}"}] '
    "Each of these three fields is required.\nYou are required to only output the json format. "
    "Do not output any other information."
)


def detectGPT(content, timeout=_DEFAULT_TIMEOUT):
    """Send ``content`` to the chat endpoint and group its findings by risk.

    Args:
        content: Source code to review, as a single string.
        timeout: Seconds to wait for each HTTP request (token and chat).

    Returns:
        A dict with the keys "high", "medium", "low" and "none". Each value is
        a list of ``[line_number, stripped_source_line]`` pairs. The source
        line is "" when the reported line number lies outside ``content``.

    Raises:
        TimeoutException: If either HTTP request times out.
        ValueError: If the response carries no "result" field.
    """
    # Note: the number of messages must be odd.
    payload = json.dumps({
        "messages": [
            {"role": "user", "content": _REVIEW_PROMPT + content}
        ]
    })
    headers = {'Content-Type': 'application/json'}

    try:
        token = get_access_token(timeout)
        response = requests.post(
            _CHAT_URL,
            params={"access_token": token},
            headers=headers,
            data=payload,
            timeout=timeout,
        )
    except requests.exceptions.Timeout as exc:
        raise TimeoutException("The api call timed out") from exc

    res_json = response.json()
    # Use .get so a missing "result" is reported as ValueError, not KeyError.
    message_content = res_json.get('result') if isinstance(res_json, dict) else None
    if message_content is None:
        raise ValueError("Error: API response content is None")

    source_lines = content.split("\n")
    classified_results = {level: [] for level in _RISK_LEVELS}

    # The findings come from a language model, so every field is untrusted.
    for finding in extract_json_from_text(message_content):
        if not isinstance(finding, dict):
            continue
        risk = str(finding.get("Risk", "")).strip().lower()
        if risk not in classified_results:
            continue
        try:
            line_no = int(finding.get("Line"))
        except (TypeError, ValueError, OverflowError):
            continue
        # Guard the lookup: 0 or negative numbers would otherwise wrap around
        # to the end of the file, and large ones would raise IndexError.
        if 1 <= line_no <= len(source_lines):
            code = source_lines[line_no - 1].strip()
        else:
            code = ""
        classified_results[risk].append([line_no, code])

    return classified_results


def get_access_token(timeout=_DEFAULT_TIMEOUT):
    """Exchange the API key and secret key for an access token.

    Args:
        timeout: Seconds to wait for the token request.

    Returns:
        The "access_token" field of the response converted with ``str``; when
        the field is absent this is the literal string "None".
    """
    params = {
        "grant_type": "client_credentials",
        "client_id": API_KEY,
        "client_secret": SECRET_KEY,
    }
    response = requests.post(_TOKEN_URL, params=params, timeout=timeout)
    return str(response.json().get("access_token"))


def extract_json_from_text(text: str) -> List[Dict[str, Any]]:
    """Extract the first JSON array of objects embedded in ``text``.

    Args:
        text: Text that may contain a JSON array surrounded by other output.

    Returns:
        The dict elements of the first array that starts with an object and
        decodes successfully, or an empty list when there is none.
    """
    decoder = json.JSONDecoder()
    last_error = None

    # Decode from each "[ {" candidate instead of regex-matching the closing
    # bracket, so "}]" inside a string value cannot truncate the array.
    for candidate in re.finditer(r'\[\s*\{', text):
        try:
            data, _ = decoder.raw_decode(text, candidate.start())
        except json.JSONDecodeError as exc:
            last_error = exc
            continue
        return [item for item in data if isinstance(item, dict)]

    if last_error is None:
        print("未找到 JSON 数据")
    else:
        print(f"解码 JSON 时出错: {last_error}")
    return []