Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests

This commit is contained in:
sangge-redmi 2024-06-04 18:12:08 +08:00
commit 610e35f868
6 changed files with 44 additions and 18 deletions

View File

@ -25,7 +25,7 @@ def detectGPT(content: str):
signal.signal(signal.SIGTERM, timeout_handler) signal.signal(signal.SIGTERM, timeout_handler)
signal.alarm(10) signal.alarm(10)
client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1",api_key=api_key) client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key)
text = content text = content
# client = openai.OpenAI(api_key="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") #测试用key # client = openai.OpenAI(api_key="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") #测试用key
response = client.chat.completions.create( response = client.chat.completions.create(
@ -33,8 +33,8 @@ def detectGPT(content: str):
{ {
"role": "system", "role": "system",
"content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: "
'[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n'
"You are required to only output the json format. Do not output any other information.\n", "You are required to only output the json format. Do not output any other information.\n",
}, },
{ {
"role": "user", "role": "user",
@ -60,7 +60,10 @@ def detectGPT(content: str):
classified_results = {"high": [], "medium": [], "low": [], "none": []} classified_results = {"high": [], "medium": [], "low": [], "none": []}
for res in res_json: for res in res_json:
classified_results[res["Risk"]].append( try:
(res["Line"], text.split("\n")[res["Line"] - 1].strip()) classified_results[res["Risk"]].append(
) (res["Line"], text.split("\n")[res["Line"] - 1].strip())
)
except IndexError:
pass
return classified_results return classified_results

View File

@ -420,7 +420,10 @@ def main():
"-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
) )
parser.add_argument( parser.add_argument(
"-p", "--pycdc", help="Path to pycdc.exe to decompile", default=None "-p",
"--pycdc",
help="Path to pycdc.exe to decompile",
default=os.getenv("pycdc"),
) )
args = parser.parse_args() args = parser.parse_args()
output_format = "txt" # Default output format output_format = "txt" # Default output format

View File

@ -1,5 +1,4 @@
from typing import List, Tuple from typing import List, Tuple
import uncompyle6
import io import io
import os import os
import subprocess import subprocess
@ -39,11 +38,7 @@ def disassemble_pyc(file_path: str, pycdc_addr=None) -> str:
str: The disassembled code as a string. str: The disassembled code as a string.
""" """
output = io.StringIO() output = io.StringIO()
try: if pycdc_addr is None:
uncompyle6.main.decompile_file(file_path, output) return "none"
return output.getvalue() else:
except Exception as e: return run_pycdc(pycdc_addr, file_path)
if pycdc_addr is None:
return "none"
else:
return run_pycdc(pycdc_addr, file_path)

View File

@ -3,6 +3,5 @@ requests
packaging packaging
openai openai
bs4 bs4
uncompyle6
colorama colorama
tqdm tqdm

View File

@ -38,7 +38,6 @@ setup(
"packaging", "packaging",
"openai", "openai",
"bs4", "bs4",
"uncompyle6",
"tqdm", "tqdm",
"colorama", "colorama",
], ],

View File

@ -1,6 +1,8 @@
import time
import unittest import unittest
import shutil import shutil
import os import os
import threading
from detection.utils import read_file_content from detection.utils import read_file_content
from .final_tests_util import clone_repo, Path, inject_random_backdoor from .final_tests_util import clone_repo, Path, inject_random_backdoor
@ -8,6 +10,25 @@ from detection.Regexdetection import find_dangerous_functions
from detection.GPTdetection import detectGPT from detection.GPTdetection import detectGPT
def GPTdetectFileList(fileList):
results = []
threads = []
for file in fileList:
content = read_file_content(str(file))
threads.append(threading.Thread(target=GPTThread(), args=(content, results)))
for thread in threads:
thread.start()
time.sleep(0.5)
for thread in threads:
thread.join()
return results
def GPTThread(content, results):
try:
results.append(detectGPT(content))
except Exception as e:
print(e)
class TestFinalTests(unittest.TestCase): class TestFinalTests(unittest.TestCase):
def setUp(self) -> None: def setUp(self) -> None:
self.path = "./tmp/repo/" self.path = "./tmp/repo/"
@ -79,6 +100,12 @@ class TestFinalTests(unittest.TestCase):
injected_detectedNum += 1 injected_detectedNum += 1
injected_accurency = injected_detectedNum / self.injectedNum injected_accurency = injected_detectedNum / self.injectedNum
print(f"injected files accurency: {injected_accurency}") print(f"injected files accurency: {injected_accurency}")
GPTresult = GPTdetectFileList(possibly_dangerous_file)
for result in GPTresult:
if len(result) > 0:
GPTdetectedNum += 1
print(GPTdetectedNum)
self.assertGreaterEqual(GPTdetectedNum, detectedNum)
# test pickle files # test pickle files
with open(self.path + "output.txt", "r") as f: with open(self.path + "output.txt", "r") as f: