Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests
This commit is contained in:
commit
7198c8b4da
@ -3,6 +3,8 @@ from typing import Dict, List, Tuple, Optional
|
||||
from reportlab.lib.pagesizes import letter
|
||||
from reportlab.lib.styles import getSampleStyleSheet
|
||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||
|
||||
from detection.pickle_detection import pickleDataDetection
|
||||
from .Regexdetection import find_dangerous_functions
|
||||
from .GPTdetection import detectGPT
|
||||
from .pyc_detection import disassemble_pyc
|
||||
@ -373,6 +375,13 @@ def process_path(
|
||||
# 扫描动画
|
||||
for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
|
||||
file_extension = file_path.suffix
|
||||
if file_extension in [".pkl",".pickle"]:
|
||||
res = pickleDataDetection(str(file_path), output_file)
|
||||
results["pickles"].append({
|
||||
"file": str(file_path),
|
||||
"result": res
|
||||
})
|
||||
continue
|
||||
file_results = checkModeAndDetect(
|
||||
mode, str(file_path), file_extension, pycdc_addr
|
||||
)
|
||||
@ -387,7 +396,13 @@ def process_path(
|
||||
)
|
||||
elif os.path.isfile(path):
|
||||
file_extension = os.path.splitext(path)[1]
|
||||
if file_extension in SUPPORTED_EXTENSIONS:
|
||||
if file_extension in [".pkl", ".pickle"]:
|
||||
res = pickleDataDetection(str(path), output_file)
|
||||
results["pickles"].append({
|
||||
"file": str(path),
|
||||
"result": res
|
||||
})
|
||||
elif file_extension in SUPPORTED_EXTENSIONS:
|
||||
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
|
||||
if file_results is not None:
|
||||
for key in file_results:
|
||||
@ -425,9 +440,18 @@ def main():
|
||||
help="Path to pycdc.exe to decompile",
|
||||
default=os.getenv("PATH"),
|
||||
)
|
||||
parser.add_argument(
|
||||
"-P",
|
||||
"--Pickle",
|
||||
help="Path to pickle file to analyze",
|
||||
default=None,
|
||||
)
|
||||
args = parser.parse_args()
|
||||
output_format = "txt" # Default output format
|
||||
output_file = None
|
||||
if args.Pickle:
|
||||
pickleDataDetection(args.Pickle, args.output)
|
||||
return
|
||||
if args.output:
|
||||
_, ext = os.path.splitext(args.output)
|
||||
ext = ext.lower()
|
||||
|
@ -142,11 +142,7 @@ def pickleDataDetection(filename: str, output_file=None):
|
||||
pickscan = pickleScanner(file)
|
||||
pickscan.load()
|
||||
res = pickscan.output()
|
||||
if output_file:
|
||||
with open(output_file, "w") as file:
|
||||
json.dump(res, file, indent=4)
|
||||
else:
|
||||
print(json.dumps(res))
|
||||
return res
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
@ -3,9 +3,8 @@ from git import Repo # type: ignore
|
||||
import random
|
||||
from pathlib import Path
|
||||
import pickle
|
||||
import marshal
|
||||
import importlib.util
|
||||
import os
|
||||
import py_compile
|
||||
|
||||
|
||||
def clone_repo(repo_url: str, clone_dir: str) -> None:
|
||||
@ -26,7 +25,7 @@ def clone_repo(repo_url: str, clone_dir: str) -> None:
|
||||
|
||||
|
||||
def inject_random_backdoor(
|
||||
path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1
|
||||
path: str, sample_rate: float = 0.1
|
||||
) -> Tuple[Tuple[str, int], ...]:
|
||||
"""
|
||||
Insert random backdoor into the path.
|
||||
@ -36,11 +35,6 @@ def inject_random_backdoor(
|
||||
pickle (bool): Whether to insert a backdoor into a pickle file.
|
||||
pyc (bool): Whether to insert a backdoor into a compiled Python file.
|
||||
"""
|
||||
if pickle:
|
||||
inject_pickle_backdoor(path)
|
||||
if pyc:
|
||||
inject_pyc_backdoor(path)
|
||||
|
||||
project_path = Path(path)
|
||||
all_python_files = list(project_path.rglob("*.py"))
|
||||
injected_python_files = []
|
||||
@ -175,24 +169,18 @@ def inject_pyc_backdoor(root_path: str) -> None:
|
||||
for path in paths:
|
||||
backdoor_id = random.randrange(0, len(backdoors))
|
||||
backdoor = backdoors[backdoor_id]
|
||||
filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
|
||||
py_filename = os.path.join(path, f"backdoor{backdoor_id}.py")
|
||||
pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
|
||||
with open(py_filename, "w") as f:
|
||||
f.write(backdoor)
|
||||
|
||||
# Compile the string to a code object
|
||||
code = compile(backdoor, filename, "exec")
|
||||
|
||||
# Create a code object header
|
||||
header = importlib.util.MAGIC_NUMBER
|
||||
if hasattr(importlib.util, "SOURCE_SUFFIXES"):
|
||||
header += b"\x00" * 4
|
||||
|
||||
# Write the .pyc file
|
||||
with open(filename, "wb") as file:
|
||||
file.write(header)
|
||||
marshal.dump(code, file)
|
||||
py_compile.compile(py_filename, cfile=pyc_filename)
|
||||
os.remove(py_filename)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
repo_url = "https://github.com/TheAlgorithms/Python.git"
|
||||
clone_dir = "/tmp/repo"
|
||||
clone_repo(repo_url, clone_dir)
|
||||
inject_random_backdoor(clone_dir, pickle=True, pyc=True)
|
||||
inject_random_backdoor(clone_dir)
|
||||
inject_pickle_backdoor(clone_dir)
|
||||
|
@ -1,10 +1,19 @@
|
||||
import time
|
||||
import unittest
|
||||
import shutil
|
||||
import os
|
||||
import threading
|
||||
import re
|
||||
|
||||
from detection.utils import read_file_content
|
||||
from .final_tests_util import clone_repo, Path, inject_random_backdoor
|
||||
from .final_tests_util import (
|
||||
clone_repo,
|
||||
Path,
|
||||
inject_pickle_backdoor,
|
||||
inject_random_backdoor,
|
||||
inject_pyc_backdoor,
|
||||
backdoors,
|
||||
)
|
||||
from detection.Regexdetection import find_dangerous_functions
|
||||
from detection.GPTdetection import detectGPT
|
||||
|
||||
@ -22,29 +31,44 @@ def GPTdetectFileList(fileList):
|
||||
thread.join()
|
||||
return results
|
||||
|
||||
|
||||
def GPTThread(content, results):
|
||||
try:
|
||||
results.append(detectGPT(content))
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
|
||||
class TestFinalTests(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.path = "./tmp/repo"
|
||||
self.path = "./tmp/repo/"
|
||||
shutil.rmtree(self.path, ignore_errors=True)
|
||||
clone_repo("https://github.com/injetlee/Python.git", self.path)
|
||||
if not os.path.exists("/tmp/Python/"):
|
||||
clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python")
|
||||
shutil.copytree("/tmp/Python", self.path)
|
||||
sampleRate = 0.1
|
||||
self.inject_reslt = inject_random_backdoor(self.path, sample_rate=sampleRate)
|
||||
self.injectedNum = len(self.inject_reslt)
|
||||
self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate)
|
||||
self.pickle_true_num = inject_pickle_backdoor(self.path)
|
||||
self.pyc_true_num = inject_pyc_backdoor(self.path)
|
||||
self.injectedNum = len(self.inject_result)
|
||||
print(self.injectedNum)
|
||||
project_path = Path(self.path)
|
||||
self.all_python_files = list(project_path.rglob("*.py"))
|
||||
self.py_filesNum = len(self.all_python_files)
|
||||
self.trueRate = self.injectedNum / self.py_filesNum
|
||||
print(self.trueRate)
|
||||
|
||||
# test backdoor code in python files
|
||||
self.all_python_files = list(project_path.rglob("*.py"))
|
||||
self.py_files_num = len(self.all_python_files)
|
||||
|
||||
all_pickle_files = list(project_path.rglob("*.pickle"))
|
||||
self.pickle_files_num = len(all_pickle_files)
|
||||
|
||||
all_pyc_files = list(project_path.rglob("*.pyc"))
|
||||
self.pyc_files_num = len(all_pyc_files)
|
||||
|
||||
os.system(
|
||||
"python -m detection " + self.path + " -o " + self.path + "output.txt"
|
||||
)
|
||||
|
||||
def test_final_tests_pycode(self):
|
||||
# test backdoor code in python files
|
||||
detectedNum = 0
|
||||
possibly_dangerous_file = []
|
||||
for file in self.all_python_files:
|
||||
@ -57,26 +81,83 @@ class TestFinalTests(unittest.TestCase):
|
||||
):
|
||||
detectedNum += 1
|
||||
possibly_dangerous_file.append(file)
|
||||
print(detectedNum / self.py_filesNum)
|
||||
self.assertAlmostEqual(detectedNum, self.py_filesNum, places=1)
|
||||
print(detectedNum / self.py_files_num)
|
||||
GPTdetectedNum = 0
|
||||
GPTresult = GPTdetectFileList(possibly_dangerous_file)
|
||||
for result in GPTresult:
|
||||
if len(result) > 0:
|
||||
GPTdetectedNum += 1
|
||||
print(GPTdetectedNum)
|
||||
self.assertGreaterEqual(GPTdetectedNum, detectedNum)
|
||||
|
||||
for i in possibly_dangerous_file:
|
||||
content = read_file_content(str(i))
|
||||
results = {}
|
||||
try:
|
||||
results = detectGPT(content)
|
||||
if (
|
||||
len(results["high"]) > 0
|
||||
or len(results["medium"]) > 0
|
||||
or len(results["low"]) > 0
|
||||
):
|
||||
GPTdetectedNum += 1
|
||||
print(GPTdetectedNum)
|
||||
|
||||
except Exception as e:
|
||||
# print(e)
|
||||
pass
|
||||
|
||||
# test injected code
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
injected_detected_num = 0
|
||||
injected_correct_num = 0
|
||||
pattern = r"\w+\.py: Line \d+: (.+)"
|
||||
for line in lines:
|
||||
if "py:" in line:
|
||||
injected_detected_num += 1
|
||||
match = re.search(pattern, line)
|
||||
command = ""
|
||||
if match:
|
||||
command = match.group(1)
|
||||
for backdoor in backdoors:
|
||||
if command in backdoor:
|
||||
injected_correct_num += 1
|
||||
break
|
||||
|
||||
injected_accurency = injected_detected_num / self.py_files_num
|
||||
print(f"injected files accurency: {injected_accurency}")
|
||||
try:
|
||||
GPTresult = GPTdetectFileList(possibly_dangerous_file)
|
||||
for result in GPTresult:
|
||||
if len(result) > 0:
|
||||
GPTdetectedNum += 1
|
||||
print(GPTdetectedNum)
|
||||
self.assertGreaterEqual(GPTdetectedNum, detectedNum)
|
||||
except Exception as e:
|
||||
# print(e)
|
||||
pass
|
||||
|
||||
# test pickle files
|
||||
pickle_detectedNum = 0
|
||||
pickle_tureNum = len(list(Path(self.path).glob("*.pickle")))
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
pickle_detected_num = 0
|
||||
pickle_correct_num = 0
|
||||
for line in lines:
|
||||
if "pickle" in line:
|
||||
pickle_detected_num += 1
|
||||
if re.search(r"backdoor\d*\.pickle", line):
|
||||
pickle_correct_num += 1
|
||||
|
||||
self.assertAlmostEqual(pickle_detectedNum, pickle_tureNum, places=1)
|
||||
pickle_accurency = pickle_detected_num / self.pickle_true_num
|
||||
print(f"pickle files accurency: {pickle_accurency}")
|
||||
|
||||
# test pyc files
|
||||
pyc_detectedNum = 0
|
||||
pyc_tureNum = len(list(Path(self.path).glob("*.pyc")))
|
||||
self.assertAlmostEqual(pyc_detectedNum, pyc_tureNum, places=1)
|
||||
with open(self.path + "output.txt", "r") as f:
|
||||
lines = f.readlines()
|
||||
pyc_detected_num = 0
|
||||
pyc_correct_num = 0
|
||||
for line in lines:
|
||||
if "pyc" in line:
|
||||
pyc_detected_num += 1
|
||||
if re.search(r"backdoor\d*\.pyc", line):
|
||||
pyc_correct_num += 1
|
||||
pyc_accurency = pyc_detected_num / self.pyc_true_num
|
||||
print(f"pyc files accurency: {pyc_accurency}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
Loading…
x
Reference in New Issue
Block a user