tests/final-tests 完成最终代码 #34
| @@ -25,15 +25,25 @@ def find_dangerous_functions( | ||||
|         ".cpp": { | ||||
|             r"\bsystem\(": "high", | ||||
|         }, | ||||
|         ".pyc": { | ||||
|             r"\bexec\b": "high", | ||||
|             r"\beval\b": "high", | ||||
|             r"\bos\.system\b": "high", | ||||
|             r"\bos\.exec\b": "high", | ||||
|             r"\bos\.fork\b": "high", | ||||
|             r"\bos\.kill\b": "high", | ||||
|             r"\bos\.popen\b": "medium", | ||||
|             r"\bos\.spawn\b": "medium", | ||||
|         }, | ||||
|     } | ||||
|     risk_patterns = patterns.get(file_extension, {}) | ||||
|     classified_results = {"high": [], "medium": [], "low": [], "none": []} | ||||
|     for line_number, line in enumerate(file_content.split("\n"), start=1): | ||||
|         clean_line = remove_comments(line, file_extension) | ||||
|         if not clean_line: | ||||
|             continue | ||||
|         for pattern, risk_level in risk_patterns.items(): | ||||
|             if re.search(pattern, clean_line): | ||||
|                 classified_results[risk_level].append((line_number, clean_line)) | ||||
|     if file_content is not None: | ||||
|         for line_number, line in enumerate(file_content.split("\n"), start=1): | ||||
|             clean_line = remove_comments(line, file_extension) | ||||
|             if not clean_line: | ||||
|                 continue | ||||
|             for pattern, risk_level in risk_patterns.items(): | ||||
|                 if re.search(pattern, clean_line): | ||||
|                     classified_results[risk_level].append((line_number, clean_line)) | ||||
|     return classified_results | ||||
|  | ||||
|   | ||||
| @@ -5,11 +5,16 @@ from reportlab.lib.styles import getSampleStyleSheet | ||||
| from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | ||||
| from .Regexdetection import find_dangerous_functions | ||||
| from .GPTdetection import detectGPT | ||||
| from .pyc_detection import disassemble_pyc | ||||
| from .utils import * | ||||
| import sys | ||||
| from colorama import init, Fore, Style | ||||
| from tqdm import tqdm | ||||
| from pathlib import Path | ||||
|  | ||||
| SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"} | ||||
| PYCDC_FLAG = True | ||||
| PYCDC_ADDR_FLAG = True | ||||
| SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} | ||||
| OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | ||||
| ORDERS = [ | ||||
|     "__import__", | ||||
| @@ -325,46 +330,74 @@ def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None): | ||||
|         return text_output | ||||
|  | ||||
|  | ||||
| def checkModeAndDetect(mode: str, filePath: str, fileExtension: str): | ||||
| def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: str): | ||||
|     # TODO:添加更多方式,这里提高代码的复用性和扩展性 | ||||
|     if mode == "regex": | ||||
|         return find_dangerous_functions(read_file_content(filePath), fileExtension) | ||||
|     elif mode == "llm": | ||||
|         return detectGPT(read_file_content(filePath)) | ||||
|     if fileExtension == ".pyc": | ||||
|         # 反汇编pyc文件 | ||||
|         file_content = disassemble_pyc(filePath, pycdc_addr) | ||||
|         if file_content == "none": | ||||
|             global PYCDC_FLAG | ||||
|             PYCDC_FLAG = False | ||||
|             return "" | ||||
|         elif file_content == "invalid": | ||||
|             global PYCDC_ADDR_FLAG | ||||
|             PYCDC_ADDR_FLAG = False | ||||
|         if mode == "regex": | ||||
|             return find_dangerous_functions(file_content, fileExtension) | ||||
|         elif mode == "llm": | ||||
|             return detectGPT(file_content) | ||||
|         else: | ||||
|             return find_dangerous_functions(file_content, fileExtension) | ||||
|     else: | ||||
|         return find_dangerous_functions(read_file_content(filePath), fileExtension) | ||||
|         file_content = read_file_content(filePath) | ||||
|         if mode == "regex": | ||||
|             return find_dangerous_functions(file_content, fileExtension) | ||||
|         elif mode == "llm": | ||||
|             return detectGPT(file_content) | ||||
|         else: | ||||
|             return find_dangerous_functions(file_content, fileExtension) | ||||
|  | ||||
|  | ||||
| def process_path(path: str, output_format: str, mode: str, output_file=None): | ||||
| def process_path( | ||||
|     path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None | ||||
| ): | ||||
|     results = {"high": [], "medium": [], "low": [], "none": []} | ||||
|     if os.path.isdir(path): | ||||
|         for root, dirs, files in os.walk(path): | ||||
|             for file in files: | ||||
|                 file_extension = os.path.splitext(file)[1] | ||||
|                 if file_extension in SUPPORTED_EXTENSIONS: | ||||
|                     file_path = os.path.join(root, file) | ||||
|         # 使用rglob获取所有文件 | ||||
|         all_files = [ | ||||
|             file_path | ||||
|             for file_path in Path(path).rglob("*") | ||||
|             if file_path.suffix in SUPPORTED_EXTENSIONS | ||||
|         ] | ||||
|  | ||||
|                     file_results = checkModeAndDetect(mode, file_path, file_extension) | ||||
|                     for key in file_results: | ||||
|                         if key != "none":  # Exclude 'none' risk level | ||||
|                             results[key].extend( | ||||
|                                 [ | ||||
|                                     (f"{file_path}: Line {line_num}", line) | ||||
|                                     for line_num, line in file_results[key] | ||||
|                                 ] | ||||
|                             ) | ||||
|         # 扫描动画 | ||||
|         for file_path in tqdm(all_files, desc="Scanning files", unit="file"): | ||||
|             file_extension = file_path.suffix | ||||
|             file_results = checkModeAndDetect( | ||||
|                 mode, str(file_path), file_extension, pycdc_addr | ||||
|             ) | ||||
|             if file_results is not None: | ||||
|                 for key in file_results: | ||||
|                     if key != "none":  # Exclude 'none' risk level | ||||
|                         results[key].extend( | ||||
|                             [ | ||||
|                                 (f"{file_path}: Line {line_num}", line) | ||||
|                                 for line_num, line in file_results[key] | ||||
|                             ] | ||||
|                         ) | ||||
|     elif os.path.isfile(path): | ||||
|         file_extension = os.path.splitext(path)[1] | ||||
|         if file_extension in SUPPORTED_EXTENSIONS: | ||||
|             file_results = checkModeAndDetect(mode, path, file_extension) | ||||
|             for key in file_results: | ||||
|                 if key != "none":  # Exclude 'none' risk level | ||||
|                     results[key].extend( | ||||
|                         [ | ||||
|                             (f"{path}: Line {line_num}", line) | ||||
|                             for line_num, line in file_results[key] | ||||
|                         ] | ||||
|                     ) | ||||
|             file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) | ||||
|             if file_results is not None: | ||||
|                 for key in file_results: | ||||
|                     if key != "none":  # Exclude 'none' risk level | ||||
|                         results[key].extend( | ||||
|                             [ | ||||
|                                 (f"{path}: Line {line_num}", line) | ||||
|                                 for line_num, line in file_results[key] | ||||
|                             ] | ||||
|                         ) | ||||
|         else: | ||||
|             print("Unsupported file type.") | ||||
|             return | ||||
| @@ -386,6 +419,9 @@ def main(): | ||||
|     parser.add_argument( | ||||
|         "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "-p", "--pycdc", help="Path to pycdc.exe to decompile", default=None | ||||
|     ) | ||||
|     args = parser.parse_args() | ||||
|     output_format = "txt"  # Default output format | ||||
|     output_file = None | ||||
| @@ -401,7 +437,15 @@ def main(): | ||||
|             ) | ||||
|             output_file = args.output.rsplit(".", 1)[0] + ".txt" | ||||
|     # 如果未指定输出文件,则输出到 stdout;否则写入文件 | ||||
|     process_path(args.path, output_format, args.mode, output_file) | ||||
|     process_path(args.path, output_format, args.mode, args.pycdc, output_file) | ||||
|     if PYCDC_FLAG == False: | ||||
|         print( | ||||
|             "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." | ||||
|         ) | ||||
|         print("Repo: https://github.com/zrax/pycdc.git") | ||||
|     if PYCDC_ADDR_FLAG == False: | ||||
|         print("ERROR: The specified pycdc.exe path is not valid") | ||||
|         print("Please check your pycdc path.") | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|   | ||||
							
								
								
									
										204
									
								
								detection/backdoor_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										204
									
								
								detection/backdoor_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,204 @@ | ||||
| import os | ||||
| from typing import Dict, List, Tuple | ||||
| from reportlab.lib.pagesizes import letter | ||||
| from reportlab.lib.styles import getSampleStyleSheet | ||||
| from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | ||||
|  | ||||
| from detection.pickle_detection import pickleDataDetection | ||||
| from .Regexdetection import find_dangerous_functions | ||||
| from .GPTdetection import detectGPT | ||||
| from .utils import * | ||||
| import sys | ||||
|  | ||||
| SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"} | ||||
| OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | ||||
|  | ||||
|  | ||||
def generate_text_content(results):
    """Render scan results as a plain-text report.

    Args:
        results: Mapping of risk level to a list of ``(line_num, line)`` pairs.

    Returns:
        The report text. Levels named ``"none"`` and levels without entries
        are left out.
    """
    chunks = ["Security Analysis Report\n"]
    for level, findings in results.items():
        if level == "none" or not findings:
            continue
        chunks.append(f"{level.capitalize()} Risk:\n")
        chunks.extend(f"  Line {num}: {text}\n" for num, text in findings)
    return "".join(chunks)
|  | ||||
|  | ||||
def output_results(results, output_format, output_file=None):
    """Write the scan results in the requested format, or print them.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
        output_format: One of ``OUTPUT_FORMATS``; anything else falls back to
            ``"txt"`` (and the output file gets a ``.txt`` extension).
        output_file: Destination path. When falsy, the text report is printed
            to stdout instead.
    """
    if not output_file:
        # No destination given: default to text output on the terminal.
        print(generate_text_content(results))
        return

    if output_format not in OUTPUT_FORMATS:
        output_format = "txt"
        # splitext returns (root, ext); only the root belongs in the new name.
        output_file = f"{os.path.splitext(output_file)[0]}.txt"

    results_dir = os.path.dirname(output_file)
    if results_dir:
        # dirname is "" for a bare file name, and makedirs("") would raise.
        os.makedirs(results_dir, exist_ok=True)

    if output_format == "pdf":
        output_pdf(results, output_file)
    elif output_format == "html":
        output_html(results, output_file)
    elif output_format == "md":
        output_markdown(results, output_file)
    else:  # Default to txt
        output_text(results, output_file)
|  | ||||
|  | ||||
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
    """Write the scan results to a PDF report.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            The ``"none"`` level is not rendered.
        file_name: Path of the PDF file to create.
    """
    # Paragraph interprets its text as XML-like markup, so raw source lines
    # containing "<", ">" or "&" must be escaped before being embedded.
    from xml.sax.saxutils import escape

    doc = SimpleDocTemplate(file_name, pagesize=letter)
    styles = getSampleStyleSheet()

    # Title, centered, followed by some vertical space.
    title_style = styles["Title"]
    title_style.alignment = 1  # Center alignment
    story = [Paragraph("Security Analysis Report", title_style), Spacer(1, 20)]

    # One heading per risk level, then one paragraph per finding.
    normal_style = styles["BodyText"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        story.append(
            Paragraph(f"{escape(risk_level.capitalize())} Risk:", styles["Heading2"])
        )
        for line_num, line in entries:
            text = f"Line {escape(str(line_num))}: {escape(str(line))}"
            story.append(Paragraph(text, normal_style))
        story.append(Spacer(1, 12))  # Space between sections

    doc.build(story)
|  | ||||
|  | ||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the scan results as an HTML report.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            The ``"none"`` level is not rendered.
        file_name: When given, the report is written to this path (UTF-8) and
            nothing is returned; otherwise the HTML string is returned.

    Returns:
        The HTML document as a string when ``file_name`` is falsy, else None.
    """
    # Findings are lines of scanned (untrusted) code: escape them so markup in
    # the scanned file cannot inject HTML or script into the report.
    from html import escape

    parts = [
        '<html><head><meta charset="utf-8">'
        "<title>Security Analysis Report</title></head><body>",
        "<h1>Security Analysis Report</h1>",
    ]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        parts.append(f"<h2>{escape(risk_level.capitalize())} Risk</h2><ul>")
        parts.extend(
            f"<li>{escape(str(line_num))}: {escape(str(line))}</li>"
            for line_num, line in entries
        )
        parts.append("</ul>")
    parts.append("</body></html>")
    html_output = "".join(parts)

    if not file_name:
        return html_output
    # Explicit encoding: scanned code may contain characters the platform
    # default codec cannot represent.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(html_output)
|  | ||||
|  | ||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the scan results as a Markdown report.

    Every risk level except ``"none"`` gets a ``##`` heading, followed by one
    bullet per ``(location, line)`` entry.

    Returns:
        The Markdown text when ``file_name`` is falsy; otherwise the text is
        written to ``file_name`` and None is returned.
    """
    pieces = ["# Security Analysis Report\n"]
    for level, findings in results.items():
        if level == "none":
            continue
        pieces.append(f"## {level.capitalize()} Risk\n")
        for where, code in findings:
            pieces.append(f"- {where}: {code}\n")
    md_output = "".join(pieces)

    if not file_name:
        return md_output
    with open(file_name, "w") as file:
        file.write(md_output)
|  | ||||
|  | ||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the scan results as a plain-text report.

    Every risk level except ``"none"`` gets a heading line, followed by one
    indented line per ``(location, line)`` entry.

    Returns:
        The report text when ``file_name`` is falsy; otherwise the text is
        written to ``file_name`` and None is returned.
    """
    pieces = ["Security Analysis Report\n"]
    for level, findings in results.items():
        if level == "none":
            continue
        pieces.append(f"{level.capitalize()} Risk:\n")
        for where, code in findings:
            pieces.append(f"  {where}: {code}\n")
    text_output = "".join(pieces)

    if not file_name:
        return text_output
    with open(file_name, "w") as file:
        file.write(text_output)
|  | ||||
|  | ||||
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str):
    """Read ``filePath`` and run the detector selected by ``mode``.

    ``"llm"`` sends the file content to ``detectGPT``; ``"regex"`` and any
    other value use ``find_dangerous_functions`` with the file extension.
    """
    # TODO: add more detection modes; improve reuse and extensibility here.
    content = read_file_content(filePath)
    if mode == "llm":
        return detectGPT(content)
    return find_dangerous_functions(content, fileExtension)
|  | ||||
|  | ||||
def _merge_file_results(results, file_path, file_results):
    """Append one file's findings to ``results``, tagging each with its path."""
    if not file_results:
        # The detector returned nothing usable (e.g. None): nothing to merge.
        return
    for key in file_results:
        if key == "none":  # Exclude 'none' risk level
            continue
        results[key].extend(
            (f"{file_path}: Line {line_num}", line)
            for line_num, line in file_results[key]
        )


def process_path(path: str, output_format: str, mode: str, output_file=None):
    """Scan a file or directory tree and emit the aggregated report.

    Args:
        path: File or directory to analyze. Directories are walked
            recursively; only files whose extension is in
            ``SUPPORTED_EXTENSIONS`` are scanned.
        output_format: Report format passed on to ``output_results``.
        mode: Detection mode passed on to ``checkModeAndDetect``.
        output_file: Optional report destination passed on to
            ``output_results``.

    A single file with an unsupported extension prints a message and returns
    without producing a report; a path that is neither a file nor a directory
    prints a message and exits with status 1.
    """
    results = {"high": [], "medium": [], "low": [], "none": []}
    if os.path.isdir(path):
        for root, _dirs, files in os.walk(path):
            for file in files:
                file_extension = os.path.splitext(file)[1]
                if file_extension not in SUPPORTED_EXTENSIONS:
                    continue
                file_path = os.path.join(root, file)
                _merge_file_results(
                    results,
                    file_path,
                    checkModeAndDetect(mode, file_path, file_extension),
                )
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1]
        if file_extension not in SUPPORTED_EXTENSIONS:
            print("Unsupported file type.")
            return
        _merge_file_results(
            results, path, checkModeAndDetect(mode, path, file_extension)
        )
    else:
        print("Invalid path.")
        sys.exit(1)

    output_results(results, output_format, output_file)
|  | ||||
|  | ||||
def main():
    """Parse command-line arguments and run the requested analysis.

    With ``--pickle`` the given pickle file is scanned; otherwise ``path`` is
    scanned with the selected mode. The report format is taken from the
    extension of ``--output``; an unrecognized extension falls back to a
    ``.txt`` file.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Backdoor detection tool.")
    parser.add_argument("path", help="Path to the code to analyze")
    parser.add_argument("-o", "--output", help="Output file path", default=None)
    parser.add_argument(
        "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
    )
    parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None)
    args = parser.parse_args()
    output_format = "txt"  # Default output format
    output_file = None
    if args.output:
        base, ext = os.path.splitext(args.output)
        ext = ext.lower()
        if ext in [".html", ".md", ".txt", ".pdf"]:
            output_format = ext.replace(".", "")
            output_file = args.output
        else:
            print(
                "Your input file format was incorrect, the output has been saved as a TXT file."
            )
            # Use the splitext root rather than rsplit("."): a dot in a
            # directory component (e.g. "./out/report") must not be treated as
            # the start of the extension.
            output_file = base + ".txt"
    # Without an output file the report goes to stdout; otherwise to the file.
    if args.pickle:
        pickleDataDetection(args.pickle, output_file)
    else:
        process_path(args.path, output_format, args.mode, output_file)
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     main() | ||||
							
								
								
									
										153
									
								
								detection/pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										153
									
								
								detection/pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,153 @@ | ||||
| import io | ||||
| import json | ||||
| import pickle | ||||
|  | ||||
|  | ||||
| class _Unframer: | ||||
|  | ||||
|     def __init__(self, file_read, file_readline, file_tell=None): | ||||
|         self.file_read = file_read | ||||
|         self.file_readline = file_readline | ||||
|         self.current_frame = None | ||||
|  | ||||
|     def readinto(self, buf): | ||||
|         if self.current_frame: | ||||
|             n = self.current_frame.readinto(buf) | ||||
|             if n == 0 and len(buf) != 0: | ||||
|                 self.current_frame = None | ||||
|                 n = len(buf) | ||||
|                 buf[:] = self.file_read(n) | ||||
|                 return n | ||||
|             if n < len(buf): | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return n | ||||
|         else: | ||||
|             n = len(buf) | ||||
|             buf[:] = self.file_read(n) | ||||
|             return n | ||||
|  | ||||
|     def read(self, n): | ||||
|         if self.current_frame: | ||||
|             data = self.current_frame.read(n) | ||||
|             if not data and n != 0: | ||||
|                 self.current_frame = None | ||||
|                 return self.file_read(n) | ||||
|             if len(data) < n: | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return data | ||||
|         else: | ||||
|             return self.file_read(n) | ||||
|  | ||||
|     def readline(self): | ||||
|         if self.current_frame: | ||||
|             data = self.current_frame.readline() | ||||
|             if not data: | ||||
|                 self.current_frame = None | ||||
|                 return self.file_readline() | ||||
|             if data[-1] != b"\n"[0]: | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return data | ||||
|         else: | ||||
|             return self.file_readline() | ||||
|  | ||||
|     def load_frame(self, frame_size): | ||||
|         if self.current_frame and self.current_frame.read() != b"": | ||||
|             raise pickle.UnpicklingError( | ||||
|                 "beginning of a new frame before end of current frame" | ||||
|             ) | ||||
|         self.current_frame = io.BytesIO(self.file_read(frame_size)) | ||||
|  | ||||
|  | ||||
# Modules whose members are flagged when a pickle imports them. "nt" and
# "posix" are the platform modules under which os.* functions are pickled.
dangerous_modules = ["os", "subprocess", "builtins", "nt", "posix"]
# Callable names that are flagged regardless of the module they come from.
dangerous_names = [
    "system",
    "popen",
    "run",
    "call",
    "check_output",
    "check_call",
]


class pickleScanner:
    """Static scanner that inspects a pickle stream without unpickling it.

    The stream is parsed opcode by opcode with ``pickletools.genops`` (no
    object is ever constructed or called). Imports of dangerous modules or
    names are collected in ``maliciousModule`` and REDUCE opcodes are counted
    in ``ReduceCount``.
    """

    # Opcodes that call a callable with an argument tuple. Only b"R" is
    # REDUCE; b"r" is LONG_BINPUT (a memo write) and must not be counted.
    unsafe_opcodes = {
        b"R",  # REDUCE - call a callable with an args tuple
    }

    # Opcodes whose argument is a text string pushed on the stack; the two
    # most recent ones are the operands of STACK_GLOBAL.
    _string_opcodes = frozenset(
        {
            "UNICODE",
            "BINUNICODE",
            "BINUNICODE8",
            "SHORT_BINUNICODE",
            "STRING",
            "BINSTRING",
            "SHORT_BINSTRING",
        }
    )

    def __init__(
        self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None
    ):
        """Store the binary file object to scan.

        The keyword arguments mirror ``pickle.Unpickler`` and are only stored.
        """
        self._buffers = iter(buffers) if buffers is not None else None
        self._file_readline = file.readline
        self._file_read = file.read
        self.memo = {}
        self.encoding = encoding
        self.errors = errors
        self.proto = 0
        self.fix_imports = fix_imports
        self.file = file
        self.ReduceCount = 0
        self.maliciousModule = []

    def find_class(self, module, name):
        """Record ``(module, name)`` if either is on a dangerous list.

        Both arguments may be ``bytes`` or ``str``; bytes are decoded
        leniently so undecodable input cannot abort the scan.
        """
        if isinstance(module, (bytes, bytearray)):
            module = bytes(module).decode(errors="replace")
        if isinstance(name, (bytes, bytearray)):
            name = bytes(name).decode(errors="replace")
        if module in dangerous_modules or name in dangerous_names:
            self.maliciousModule.append((module, name))

    def load(self):
        """Scan every opcode of the stream and update the findings.

        Scanning stops at the STOP opcode. A malformed or truncated stream
        ends the scan early; findings collected up to that point are kept.
        """
        import pickletools

        # Text strings pushed so far; STACK_GLOBAL takes module and name from
        # the stack. NOTE: operands fetched from the memo (BINGET etc.) are
        # not resolved, so such a STACK_GLOBAL may be checked against the
        # wrong strings or not at all.
        recent_strings = []
        try:
            for opcode, arg, _pos in pickletools.genops(self.file):
                op_name = opcode.name
                if op_name in ("GLOBAL", "INST"):
                    # The argument is "module name", separated by one space.
                    module, _, name = arg.partition(" ")
                    self.find_class(module, name)
                elif op_name == "STACK_GLOBAL":
                    if len(recent_strings) >= 2:
                        self.find_class(recent_strings[-2], recent_strings[-1])
                elif opcode.code.encode("latin-1") in self.unsafe_opcodes:
                    self.ReduceCount += 1

                if op_name in self._string_opcodes and isinstance(arg, str):
                    recent_strings.append(arg)
        except ValueError:
            # genops raises ValueError on an unknown opcode or a stream that
            # ends before STOP; the scan is best-effort, so stop here.
            pass

    def output(self) -> dict:
        """Return the findings as a JSON-serializable dict."""
        return {
            "ReduceCount": self.ReduceCount,
            "maliciousModule": self.maliciousModule,
        }
|  | ||||
|  | ||||
def pickleDataDetection(filename: str, output_file=None):
    """Scan a pickle file and report the scanner's findings as JSON.

    :param filename: path of the pickle file to scan
    :param output_file: when given, the findings are written to this path as
        indented JSON; otherwise they are printed as a single JSON line
    """
    with open(filename, "rb") as pickle_file:
        scanner = pickleScanner(pickle_file)
        scanner.load()
    report = scanner.output()

    if not output_file:
        print(json.dumps(report))
        return
    with open(output_file, "w") as out:
        json.dump(report, out, indent=4)
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     pickleDataDetection("test.pkl") | ||||
							
								
								
									
										49
									
								
								detection/pyc_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										49
									
								
								detection/pyc_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,49 @@ | ||||
| from typing import List, Tuple | ||||
| import uncompyle6 | ||||
| import io | ||||
| import os | ||||
| import subprocess | ||||
| from contextlib import redirect_stdout, redirect_stderr | ||||
|  | ||||
|  | ||||
def run_pycdc(exe_path: str, pyc_file: str) -> str:
    """
    Executes pycdc with the given .pyc file and captures its standard output.

    The executable is invoked with an argument list (no shell), so characters
    in either path are never interpreted as shell syntax.

    Args:
        exe_path (str): Path to the pycdc executable.
        pyc_file (str): Path to the .pyc file to decompile.

    Returns:
        str: Standard output of pycdc, or the sentinel "invalid" when
        exe_path is not an existing file or cannot be executed.
    """
    if not os.path.isfile(exe_path):
        return "invalid"

    try:
        result = subprocess.run(
            [exe_path, pyc_file],
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",  # decompiler output may not be valid UTF-8
        )
    except OSError:
        # The file exists but could not be launched (e.g. not executable).
        return "invalid"

    return result.stdout
|  | ||||
|  | ||||
def disassemble_pyc(file_path: str, pycdc_addr=None) -> str:
    """
    Decompiles a .pyc file with uncompyle6, falling back to pycdc on failure.

    Args:
        file_path (str): The path to the .pyc file.
        pycdc_addr (str | None): Path to a pycdc executable used when
            uncompyle6 raises; None disables the fallback.

    Returns:
        str: The decompiled source. If uncompyle6 fails, the result of
        run_pycdc is returned instead, or the sentinel "none" when no
        pycdc path was supplied.
    """
    buffer = io.StringIO()
    try:
        uncompyle6.main.decompile_file(file_path, buffer)
        return buffer.getvalue()
    except Exception:
        # Any uncompyle6 failure triggers the pycdc fallback, if configured.
        if pycdc_addr is not None:
            return run_pycdc(pycdc_addr, file_path)
        return "none"
| @@ -4,7 +4,7 @@ import sys | ||||
|  | ||||
| def read_file_content(file_path: str) -> str: | ||||
|     try: | ||||
|         with open(file_path, "r", encoding="utf-8",errors="ignore") as file: | ||||
|         with open(file_path, "r", encoding="utf-8", errors="ignore") as file: | ||||
|             return file.read() | ||||
|     except FileNotFoundError: | ||||
|         print("Error: File not found.") | ||||
|   | ||||
| @@ -4,3 +4,5 @@ packaging | ||||
| openai | ||||
| bs4 | ||||
| uncompyle6 | ||||
| colorama | ||||
| tqdm | ||||
							
								
								
									
										2
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								setup.py
									
									
									
									
									
								
							| @@ -39,5 +39,7 @@ setup( | ||||
|         "openai", | ||||
|         "bs4", | ||||
|         "uncompyle6", | ||||
|         "tqdm", | ||||
|         "colorama", | ||||
|     ], | ||||
| ) | ||||
|   | ||||
| @@ -83,6 +83,30 @@ class TestBackdoorDetection(unittest.TestCase): | ||||
|         self.assertEqual(len(results["medium"]), 0) | ||||
|         self.assertEqual(len(results["low"]), 0) | ||||
|  | ||||
|     def test_gpt_env_no_set(self): | ||||
|         if os.getenv("OPENAI_API_KEY") is not None: | ||||
|             self.skipTest("OPENAI_API_KEY is setted") | ||||
|         content = "print('test test')" | ||||
|         with self.assertRaises(ValueError): | ||||
|             detectGPT(content) | ||||
|  | ||||
|     def test_find_dangerous_functions_pyc(self): | ||||
|         file_content = """import os | ||||
|         os.system('rm -rf /') | ||||
|         """ | ||||
|         file_extension = ".pyc" | ||||
|  | ||||
|         expected_result = { | ||||
|             "high": [(2, "os.system('rm -rf /')")], | ||||
|             "medium": [], | ||||
|             "low": [], | ||||
|             "none": [], | ||||
|         } | ||||
|  | ||||
|         result = find_dangerous_functions(file_content, file_extension) | ||||
|  | ||||
|         self.assertEqual(result, expected_result) | ||||
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     unittest.main() | ||||
|   | ||||
							
								
								
									
										56
									
								
								tests/test_pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								tests/test_pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,56 @@ | ||||
| import unittest | ||||
| import pickle | ||||
| import tempfile | ||||
| from detection.pickle_detection import pickleScanner, pickleDataDetection | ||||
| from unittest.mock import patch | ||||
|  | ||||
|  | ||||
class TestPickleScanner(unittest.TestCase):
    """Tests for pickleScanner and pickleDataDetection."""

    def _write_temp(self, payload):
        """Write ``payload`` to a new temp file and schedule its removal."""
        import os

        handle = tempfile.NamedTemporaryFile(delete=False)
        # Register cleanup immediately so the file is removed even if a later
        # step of setUp fails (tearDown would not run in that case).
        self.addCleanup(os.remove, handle.name)
        with handle:
            handle.write(payload)
        return handle

    def setUp(self):
        # Create temporary files with valid and malicious data
        self.valid_data = {"key": "value"}
        self.malicious_data = b"\x80\x03csubprocess\ncheck_output\nq\x00X\x05\x00\x00\x00echo 1q\x01\x85q\x02Rq\x03."

        self.valid_file = self._write_temp(pickle.dumps(self.valid_data))
        self.malicious_file = self._write_temp(self.malicious_data)

    def test_valid_pickle(self):
        """A benign pickle yields no REDUCE count and no flagged imports."""
        with open(self.valid_file.name, "rb") as file:
            scanner = pickleScanner(file)
            scanner.load()
        output = scanner.output()
        self.assertEqual(output["ReduceCount"], 0)
        self.assertEqual(output["maliciousModule"], [])

    def test_malicious_pickle(self):
        """A pickle importing subprocess.check_output is flagged."""
        with open(self.malicious_file.name, "rb") as file:
            scanner = pickleScanner(file)
            scanner.load()
        output = scanner.output()
        self.assertEqual(output["ReduceCount"], 1)
        self.assertIn(("subprocess", "check_output"), output["maliciousModule"])

    @patch("builtins.print")
    def test_pickleDataDetection_no_output_file(self, mock_print):
        """Without an output file the findings are printed exactly once."""
        pickleDataDetection(self.valid_file.name)
        mock_print.assert_called_once()
|  | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     unittest.main() | ||||
		Reference in New Issue
	
	Block a user