| import json
 | |
| import os
 | |
| from .utils import *
 | |
| import openai
 | |
| import signal
 | |
| 
 | |
| 
 | |
class TimeoutException(Exception):
    """Raised when an operation runs past its allotted time.

    The class adds nothing to ``Exception``; it exists so that callers can
    tell a timeout apart from other failures.
    """
 | |
| 
 | |
| 
 | |
def timeout_handler(signum, frame):
    """Signal handler that turns a delivered signal into a TimeoutException.

    Both arguments are supplied by the ``signal`` module when the handler is
    invoked and are not used here.
    """
    raise TimeoutException()
 | |
| 
 | |
| 
 | |
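def _classify_findings(findings, source_lines):
    """Bucket model-reported findings by risk level, dropping malformed ones.

    The findings are model output produced after the model has read the code
    under review, so they are untrusted input: an entry may have a missing
    key, an unknown risk label or a line number outside the file. Such entries
    are skipped instead of raising KeyError/IndexError or, for zero and
    negative line numbers, silently pointing at the wrong source line.
    """
    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    for finding in findings:
        if not isinstance(finding, dict):
            continue
        risk = finding.get("Risk")
        line_no = finding.get("Line")
        if not isinstance(risk, str):
            continue
        risk = risk.strip().lower()
        if risk not in classified_results:
            continue
        # bool is a subclass of int; True must not be read as line 1.
        if isinstance(line_no, bool) or not isinstance(line_no, int):
            continue
        # Line numbers are 1-based; 0 or a negative value would otherwise
        # index from the end of the file.
        if not 1 <= line_no <= len(source_lines):
            continue
        classified_results[risk].append((line_no, source_lines[line_no - 1].strip()))
    return classified_results


def detectGPT(content: str):
    """Ask a chat model to flag security issues in Python source code.

    Args:
        content: The Python source text to review. It is sent verbatim as the
            user message, so any text inside it can influence the model's
            answer; treat the result as advisory, not as proof that the code
            is clean.

    Returns:
        A dict with the keys "high", "medium", "low" and "none", each mapping
        to a list of ``(line_number, stripped_source_line)`` tuples.

    Raises:
        ValueError: If OPENAI_API_KEY is unset, the response has no content,
            or the response is not a JSON list.
        TimeoutException: If the API call does not finish within 10 seconds.

    The timeout relies on ``signal.alarm``, which is only available on Unix
    and only when called from the main thread.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if api_key is None:
        raise ValueError("env OPENAI_API_KEY no set")

    # NOTE(review): a literal API key used to sit in a comment at this point.
    # Treat that key as exposed and revoke it; credentials belong in the
    # environment only, never in source, not even commented out.
    #
    # NOTE(review): base_url is not the vendor's own endpoint. Both the API
    # key and the full source under review are sent to whoever operates this
    # host; confirm that is intended before scanning non-public code.
    client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key)
    text = content

    # signal.alarm() delivers SIGALRM, so the handler must be registered for
    # SIGALRM. Registering it for SIGTERM left SIGALRM on its default action
    # (terminate the process) and made an ordinary SIGTERM raise
    # TimeoutException instead.
    previous_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(10)
    try:
        # The request is made inside the try block so that a timeout raised
        # during the call is caught below and the alarm is always cancelled.
        response = client.chat.completions.create(
            messages=[
                {
                    "role": "system",
                    "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
                    '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
                    "You are required to only output the json format. Do not output any other information.\n",
                },
                {
                    "role": "user",
                    "content": text,
                },
            ],
            model="gpt-3.5-turbo",
        )
        message_content = response.choices[0].message.content
        if message_content is None:
            raise ValueError("API response content is None")
        res_json = json.loads(message_content)
    except json.JSONDecodeError as err:
        raise ValueError(
            "Error: Could not parse the response. Please try again."
        ) from err
    except TimeoutException:
        raise TimeoutException("The api call timed out") from None
    finally:
        signal.alarm(0)
        # Put back whatever handler was installed before this call so the
        # function does not leave a process-wide side effect behind.
        # signal.signal() returns None for a handler not installed from
        # Python, and None cannot be passed back in.
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)

    # Valid JSON is not necessarily the requested list of findings.
    if not isinstance(res_json, list):
        raise ValueError("Error: Could not parse the response. Please try again.")

    return _classify_findings(res_json, text.split("\n"))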
 |