tests/final-tests 完成最终代码 #34
| @@ -1,7 +1,11 @@ | |||||||
| from typing import Tuple | from typing import Tuple, List | ||||||
| from git import Repo  # type: ignore | from git import Repo  # type: ignore | ||||||
| import random | import random | ||||||
| from pathlib import Path | from pathlib import Path | ||||||
|  | import pickle | ||||||
|  | import marshal | ||||||
|  | import importlib.util | ||||||
|  | import os | ||||||
|  |  | ||||||
|  |  | ||||||
| def clone_repo(repo_url: str, clone_dir: str) -> None: | def clone_repo(repo_url: str, clone_dir: str) -> None: | ||||||
| @@ -18,9 +22,12 @@ def clone_repo(repo_url: str, clone_dir: str) -> None: | |||||||
|         print(f"Error cloning repository: {e}") |         print(f"Error cloning repository: {e}") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # a return type of backdoor. Include injected file name and number. | ||||||
|  |  | ||||||
|  |  | ||||||
| def inject_random_backdoor( | def inject_random_backdoor( | ||||||
|     path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 |     path: str, pickle: bool = False, pyc: bool = False, sample_rate: float = 0.1 | ||||||
| ) -> int: | ) -> Tuple[Tuple[str, int], ...]: | ||||||
|     """ |     """ | ||||||
|     Insert random backdoor into the path. |     Insert random backdoor into the path. | ||||||
|  |  | ||||||
| @@ -43,21 +50,22 @@ def inject_random_backdoor( | |||||||
|         if random.random() < sample_rate: |         if random.random() < sample_rate: | ||||||
|             injected_python_files.append(python_file) |             injected_python_files.append(python_file) | ||||||
|     injectedNum = len(injected_python_files) |     injectedNum = len(injected_python_files) | ||||||
|  |  | ||||||
|  |     results: List[Tuple[str, int]] = [] | ||||||
|  |     # inject backdoor | ||||||
|     for python_file in injected_python_files: |     for python_file in injected_python_files: | ||||||
|         with open(python_file, "r+", errors="ignore") as f: |         with open(python_file, "r+", errors="ignore") as f: | ||||||
|             lines = f.readlines() |             lines = f.readlines() | ||||||
|             total_lines = len(lines) |             total_lines = len(lines) | ||||||
|             inject_line_number = random.randint(0, total_lines) |             inject_line_number = random.randint(0, total_lines) | ||||||
|  |             # choose random backdoor | ||||||
|             inject_code = random.choice(backdoors) |             inject_code = random.choice(backdoors) | ||||||
|  |  | ||||||
|             lines.insert(inject_line_number, inject_code + "\n") |             lines.insert(inject_line_number, inject_code + "\n") | ||||||
|  |  | ||||||
|             # Move the file pointer to the beginning of the file |  | ||||||
|             f.seek(0) |             f.seek(0) | ||||||
|  |  | ||||||
|             # Write the modified content back to the file |  | ||||||
|             f.writelines(lines) |             f.writelines(lines) | ||||||
|     return injectedNum |             results.append((str(python_file), inject_line_number)) | ||||||
|  |  | ||||||
|  |     return tuple(results) | ||||||
|  |  | ||||||
|  |  | ||||||
| # 示例1: 通过 os.system 执行命令 | # 示例1: 通过 os.system 执行命令 | ||||||
| @@ -144,7 +152,10 @@ def inject_pickle_backdoor(path: str) -> None: | |||||||
|     Args: |     Args: | ||||||
|         path (str): The path to the repository to insert the backdoor into. |         path (str): The path to the repository to insert the backdoor into. | ||||||
|     """ |     """ | ||||||
|     pass |     for i, backdoor in enumerate(backdoors): | ||||||
|  |         filename = os.path.join(path, f"backdoor{i}.pickle") | ||||||
|  |         with open(filename, "wb") as f: | ||||||
|  |             pickle.dump(backdoor, f) | ||||||
|  |  | ||||||
|  |  | ||||||
| def inject_pyc_backdoor(path: str) -> None: | def inject_pyc_backdoor(path: str) -> None: | ||||||
| @@ -154,25 +165,25 @@ def inject_pyc_backdoor(path: str) -> None: | |||||||
|     Args: |     Args: | ||||||
|         path (str): The path to the repository to insert the backdoor into. |         path (str): The path to the repository to insert the backdoor into. | ||||||
|     """ |     """ | ||||||
|  |     for i, backdoor in enumerate(backdoors): | ||||||
|  |         filename = os.path.join(path, f"backdoor{i}.pyc") | ||||||
|  |  | ||||||
|  |         # Compile the string to a code object | ||||||
|  |         code = compile(backdoor, filename, "exec") | ||||||
|  |  | ||||||
|  |         # Create a code object header | ||||||
|  |         header = importlib.util.MAGIC_NUMBER | ||||||
|  |         if hasattr(importlib.util, "SOURCE_SUFFIXES"): | ||||||
|  |             header += b"\x00" * 4 | ||||||
|  |  | ||||||
|  |         # Write the .pyc file | ||||||
|  |         with open(filename, "wb") as file: | ||||||
|  |             file.write(header) | ||||||
|  |             marshal.dump(code, file) | ||||||
|  |  | ||||||
|     pass |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
| def check_accuracy(report_file: str, backdoor_location: Tuple[str, int]) -> float: |  | ||||||
|     """ |  | ||||||
|     Check the accuracy of the backdoor insertion. |  | ||||||
|  |  | ||||||
|     Args: |  | ||||||
|         report_file (str): The path to the report file. |  | ||||||
|         backdoor_location (Tuple[str, int]): The location of the backdoor in the repository. |  | ||||||
|  |  | ||||||
|     Returns: |  | ||||||
|         float: The accuracy rate of the backdoor insertion. |  | ||||||
|     """ |  | ||||||
|     accuracy_rate = 0.0 |  | ||||||
|  |  | ||||||
|     return accuracy_rate |  | ||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|     repo_url = "https://github.com/TheAlgorithms/Python.git" |     repo_url = "https://github.com/TheAlgorithms/Python.git" | ||||||
|     clone_dir = "/tmp/repo" |     clone_dir = "/tmp/repo" | ||||||
|   | |||||||
| @@ -12,7 +12,8 @@ class TestFinalTests(unittest.TestCase): | |||||||
|         shutil.rmtree("./tmp/repo", ignore_errors=True) |         shutil.rmtree("./tmp/repo", ignore_errors=True) | ||||||
|         clone_repo("https://github.com/injetlee/Python.git", "./tmp/repo") |         clone_repo("https://github.com/injetlee/Python.git", "./tmp/repo") | ||||||
|         sampleRate = 0.1 |         sampleRate = 0.1 | ||||||
|         self.injectedNum = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) |         self.inject_reslt = inject_random_backdoor("./tmp/repo", sample_rate=sampleRate) | ||||||
|  |         self.injectedNum = len(self.inject_reslt) | ||||||
|         print(self.injectedNum) |         print(self.injectedNum) | ||||||
|         project_path = Path("./tmp/repo") |         project_path = Path("./tmp/repo") | ||||||
|         self.all_python_files = list(project_path.rglob("*.py")) |         self.all_python_files = list(project_path.rglob("*.py")) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user