feature/GPT #12
@ -2,12 +2,29 @@ import json
|
|||||||
import os
|
import os
|
||||||
from .utils import *
|
from .utils import *
|
||||||
import openai
|
import openai
|
||||||
|
import signal
|
||||||
|
|
||||||
|
|
||||||
|
class TimeoutException(Exception):
|
||||||
|
"""Custom exception to handle timeouts."""
|
||||||
|
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def timeout_handler(signum, frame):
|
||||||
|
"""Handle the SIGALRM signal by raising a TimeoutException."""
|
||||||
|
raise TimeoutException
|
||||||
|
|
||||||
|
|
||||||
def detectGPT(content: str):
|
def detectGPT(content: str):
|
||||||
api_key = os.getenv("OPENAI_API_KEY")
|
api_key = os.getenv("OPENAI_API_KEY")
|
||||||
if api_key is None:
|
if api_key is None:
|
||||||
raise ValueError("env OPENAI_API_KEY no set")
|
raise ValueError("env OPENAI_API_KEY no set")
|
||||||
|
|
||||||
|
# Set alarm timer
|
||||||
|
signal.signal(signal.SIGTERM, timeout_handler)
|
||||||
|
signal.alarm(10)
|
||||||
|
|
||||||
client = openai.OpenAI(api_key=api_key)
|
client = openai.OpenAI(api_key=api_key)
|
||||||
text = content
|
text = content
|
||||||
# client = openai.OpenAI(api_key="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") #测试用key
|
# client = openai.OpenAI(api_key="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") #测试用key
|
||||||
@ -31,9 +48,16 @@ def detectGPT(content: str):
|
|||||||
if message_content is None:
|
if message_content is None:
|
||||||
raise ValueError("API response content is None")
|
raise ValueError("API response content is None")
|
||||||
res_json = json.loads(message_content)
|
res_json = json.loads(message_content)
|
||||||
|
|
||||||
except json.JSONDecodeError:
|
except json.JSONDecodeError:
|
||||||
raise ValueError("Error: Could not parse the response. Please try again.")
|
raise ValueError("Error: Could not parse the response. Please try again.")
|
||||||
|
|
||||||
|
except TimeoutException:
|
||||||
|
raise TimeoutException("The api call timed out")
|
||||||
|
|
||||||
|
finally:
|
||||||
|
signal.alarm(0)
|
||||||
|
|
||||||
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
classified_results = {"high": [], "medium": [], "low": [], "none": []}
|
||||||
for res in res_json:
|
for res in res_json:
|
||||||
classified_results[res["Risk"]].append(
|
classified_results[res["Risk"]].append(
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
import unittest
|
import unittest
|
||||||
import warnings
|
import warnings
|
||||||
|
|
||||||
|
from pydantic import NoneStr
|
||||||
|
|
||||||
from detection.backdoor_detection import find_dangerous_functions
|
from detection.backdoor_detection import find_dangerous_functions
|
||||||
from detection.GPTdetection import detectGPT
|
from detection.GPTdetection import detectGPT
|
||||||
import os
|
import os
|
||||||
@ -84,6 +86,8 @@ class TestBackdoorDetection(unittest.TestCase):
|
|||||||
self.assertEqual(len(results["low"]), 0)
|
self.assertEqual(len(results["low"]), 0)
|
||||||
|
|
||||||
def test_gpt_env_no_set(self):
|
def test_gpt_env_no_set(self):
|
||||||
|
if os.getenv("OPENAI_API_KEY") is not None:
|
||||||
|
self.skipTest("OPENAI_API_KEY is setted")
|
||||||
content = "print('test test')"
|
content = "print('test test')"
|
||||||
with self.assertRaises(ValueError):
|
with self.assertRaises(ValueError):
|
||||||
detectGPT(content)
|
detectGPT(content)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user