feature/pickle-data #20
							
								
								
									
										204
									
								
								detection/backdoor_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										204
									
								
								detection/backdoor_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,204 @@ | ||||
| import os | ||||
| from typing import Dict, List, Tuple | ||||
| from reportlab.lib.pagesizes import letter | ||||
| from reportlab.lib.styles import getSampleStyleSheet | ||||
| from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | ||||
|  | ||||
| from detection.pickle_detection import pickleDataDetection | ||||
| from .Regexdetection import find_dangerous_functions | ||||
| from .GPTdetection import detectGPT | ||||
| from .utils import * | ||||
| import sys | ||||
|  | ||||
# File extensions that process_path will scan; files with any other extension
# are skipped (directory walk) or rejected (single-file mode).
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"}
# Report formats output_results knows how to write; any other format falls
# back to "txt".
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
|  | ||||
|  | ||||
def generate_text_content(results):
    """Build the plain-text report that is printed to the terminal.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.

    Returns:
        The report as one string. The ``"none"`` level and levels with no
        findings are left out.
    """
    chunks = ["Security Analysis Report\n"]
    for level, findings in results.items():
        if level == "none" or not findings:
            continue
        chunks.append(f"{level.capitalize()} Risk:\n")
        chunks.extend(f"  Line {where}: {snippet}\n" for where, snippet in findings)
    return "".join(chunks)
|  | ||||
|  | ||||
def output_results(results, output_format, output_file=None):
    """Write the analysis report to a file, or print it when no file is given.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
        output_format: One of OUTPUT_FORMATS. Any other value falls back to
            ``"txt"`` and the output file's extension is replaced by ``.txt``.
        output_file: Destination path. When falsy, the plain-text report is
            printed to stdout and nothing is written to disk.
    """
    if not output_file:
        # If no output file is specified, default to text output to the terminal.
        print(generate_text_content(results))
        return

    if output_format not in OUTPUT_FORMATS:
        # splitext returns a (root, ext) tuple; only the root belongs in the
        # fallback file name.
        root, _ = os.path.splitext(output_file)
        output_format = "txt"
        output_file = f"{root}.txt"

    results_dir = os.path.dirname(output_file)
    # dirname is "" for a bare file name, and os.makedirs("") raises, so only
    # create a directory when the path actually names one.
    if results_dir:
        os.makedirs(results_dir, exist_ok=True)

    if output_format == "pdf":
        output_pdf(results, output_file)
    elif output_format == "html":
        output_html(results, output_file)
    elif output_format == "md":
        output_markdown(results, output_file)
    else:  # Default to txt
        output_text(results, output_file)
|  | ||||
|  | ||||
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
    """Render the analysis report as a PDF document.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            Every level except ``"none"`` gets a heading, even when it has no
            entries.
        file_name: Path of the PDF file to create.
    """
    # Paragraph parses its text as XML-like markup, so raw source code
    # containing "<" or "&" has to be escaped before it is handed over.
    from xml.sax.saxutils import escape

    doc = SimpleDocTemplate(file_name, pagesize=letter)
    styles = getSampleStyleSheet()

    # Add the title centered
    title_style = styles["Title"]
    title_style.alignment = 1  # Center alignment
    story = [
        Paragraph("Security Analysis Report", title_style),
        Spacer(1, 20),  # Space after title
    ]

    # Add risk levels and entries
    body_style = styles["BodyText"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        story.append(Paragraph(f"{risk_level.capitalize()} Risk:", styles["Heading2"]))
        for line_num, line in entries:
            text = f"Line {escape(str(line_num))}: {escape(str(line))}"
            story.append(Paragraph(text, body_style))
        story.append(Spacer(1, 12))  # Space between sections

    doc.build(story)
|  | ||||
|  | ||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the analysis report as an HTML document.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            Every level except ``"none"`` gets a section, even when empty.
        file_name: When given, the document is written to this path and the
            function returns ``None``. Otherwise the HTML string is returned.

    Returns:
        The HTML document as a string, or ``None`` when it was written to disk.
    """
    # The reported lines come straight from the scanned files, so they must be
    # escaped or they would be interpreted as markup by the browser.
    from html import escape

    parts = [
        "<html><head><title>Security Analysis Report</title></head><body>",
        "<h1>Security Analysis Report</h1>",
    ]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        parts.append(f"<h2>{risk_level.capitalize()} Risk</h2><ul>")
        parts.extend(
            f"<li>{escape(str(line_num))}: {escape(str(line))}</li>"
            for line_num, line in entries
        )
        parts.append("</ul>")
    parts.append("</body></html>")
    html_output = "".join(parts)

    if not file_name:
        return html_output
    # Writing ASCII with numeric character references keeps the file readable
    # whatever charset the browser assumes, and never fails on non-ASCII text.
    with open(file_name, "w", encoding="ascii", errors="xmlcharrefreplace") as file:
        file.write(html_output)
    return None
|  | ||||
|  | ||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the analysis report as Markdown.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            Every level except ``"none"`` gets a heading, even when empty.
        file_name: When given, the report is written to this path as UTF-8 and
            the function returns ``None``. Otherwise the text is returned.

    Returns:
        The Markdown report as a string, or ``None`` when written to disk.
    """
    parts = ["# Security Analysis Report\n"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        parts.append(f"## {risk_level.capitalize()} Risk\n")
        parts.extend(f"- {line_num}: {line}\n" for line_num, line in entries)
    md_output = "".join(parts)

    if not file_name:
        return md_output
    # Reported lines are copied from the scanned sources and may contain any
    # character; the platform default encoding cannot be relied on for that.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(md_output)
    return None
|  | ||||
|  | ||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """Render the analysis report as plain text.

    Args:
        results: Mapping of risk level to a list of ``(location, line)`` pairs.
            Every level except ``"none"`` gets a heading, even when empty.
        file_name: When given, the report is written to this path as UTF-8 and
            the function returns ``None``. Otherwise the text is returned.

    Returns:
        The plain-text report as a string, or ``None`` when written to disk.
    """
    parts = ["Security Analysis Report\n"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        parts.append(f"{risk_level.capitalize()} Risk:\n")
        parts.extend(f"  {line_num}: {line}\n" for line_num, line in entries)
    text_output = "".join(parts)

    if not file_name:
        return text_output
    # Reported lines are copied from the scanned sources and may contain any
    # character; the platform default encoding cannot be relied on for that.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write(text_output)
    return None
|  | ||||
|  | ||||
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str):
    """Run the detector selected by ``mode`` on one file.

    Args:
        mode: ``"llm"`` selects detectGPT; ``"regex"`` and any other value
            select find_dangerous_functions.
        filePath: Path of the file whose content is analysed.
        fileExtension: Extension of the file, passed to the regex detector.

    Returns:
        Whatever the selected detector returns for the file's content.
    """
    # TODO: add more detection modes; improve reuse and extensibility here.
    source_text = read_file_content(filePath)
    if mode == "llm":
        return detectGPT(source_text)
    # "regex" and every unrecognised mode share the regex detector.
    return find_dangerous_functions(source_text, fileExtension)
|  | ||||
|  | ||||
def _merge_file_findings(file_path, file_extension, mode, results):
    """Scan one file and append its findings to ``results`` in place."""
    file_results = checkModeAndDetect(mode, file_path, file_extension)
    for key in file_results:
        if key == "none":  # Exclude 'none' risk level
            continue
        # Prefix every finding with the file it came from so that reports
        # covering several files stay unambiguous.
        results[key].extend(
            (f"{file_path}: Line {line_num}", line)
            for line_num, line in file_results[key]
        )


def process_path(path: str, output_format: str, mode: str, output_file=None):
    """Scan a file or a directory tree and emit the combined report.

    Args:
        path: File or directory to analyse. Directories are walked
            recursively; only files whose extension is in
            SUPPORTED_EXTENSIONS are scanned.
        output_format: Report format forwarded to output_results.
        mode: Detection mode forwarded to checkModeAndDetect.
        output_file: Optional report path forwarded to output_results.

    A single file with an unsupported extension prints a message and returns
    without producing a report. A path that is neither a file nor a directory
    prints a message and exits the process with status 1.
    """
    results = {"high": [], "medium": [], "low": [], "none": []}
    if os.path.isdir(path):
        for root, _dirs, files in os.walk(path):
            for file in files:
                file_extension = os.path.splitext(file)[1]
                if file_extension in SUPPORTED_EXTENSIONS:
                    _merge_file_findings(
                        os.path.join(root, file), file_extension, mode, results
                    )
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1]
        if file_extension not in SUPPORTED_EXTENSIONS:
            print("Unsupported file type.")
            return
        _merge_file_findings(path, file_extension, mode, results)
    else:
        print("Invalid path.")
        sys.exit(1)

    output_results(results, output_format, output_file)
|  | ||||
|  | ||||
def main():
    """Command-line entry point: parse arguments and run the requested scan.

    With ``-p/--pickle`` the given pickle file is analysed and the positional
    ``path`` may be omitted; otherwise ``path`` is scanned as source code.
    The report format is taken from the extension of ``-o/--output``; an
    unrecognised extension is replaced by ``.txt``.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Backdoor detection tool.")
    # Optional so that "-p file.pkl" works without a dummy source path.
    parser.add_argument(
        "path", nargs="?", default=None, help="Path to the code to analyze"
    )
    parser.add_argument("-o", "--output", help="Output file path", default=None)
    parser.add_argument(
        "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
    )
    parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None)
    args = parser.parse_args()
    if not args.pickle and args.path is None:
        parser.error("the following arguments are required: path")

    output_format = "txt"  # Default output format
    output_file = None
    if args.output:
        root, ext = os.path.splitext(args.output)
        candidate = ext.lower().lstrip(".")
        if candidate in OUTPUT_FORMATS:
            output_format = candidate
            output_file = args.output
        else:
            print(
                "Your input file format was incorrect, the output has been saved as a TXT file."
            )
            # Use the splitext root: splitting the whole path on "." would
            # cut inside a dotted directory name such as "./out/report".
            output_file = root + ".txt"
    # Without an output file the result goes to stdout; otherwise to the file.
    if args.pickle:
        pickleDataDetection(args.pickle, output_file)
    else:
        process_path(args.path, output_format, args.mode, output_file)


if __name__ == "__main__":
    main()
							
								
								
									
										153
									
								
								detection/pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										153
									
								
								detection/pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,153 @@ | ||||
| import io | ||||
| import json | ||||
| import pickle | ||||
|  | ||||
|  | ||||
class _Unframer:
    """Reader that serves bytes from an in-memory frame when one is loaded.

    While ``current_frame`` is set, reads are taken from it; once it is
    exhausted (or when no frame is loaded) reads go to the wrapped file
    callables instead.
    """

    def __init__(self, file_read, file_readline, file_tell=None):
        # file_read(n) and file_readline() are the underlying stream's
        # callables. file_tell is accepted but not used by this class.
        self.file_read = file_read
        self.file_readline = file_readline
        self.current_frame = None

    def readinto(self, buf):
        """Fill ``buf`` and return the number of bytes placed in it."""
        if self.current_frame:
            n = self.current_frame.readinto(buf)
            if n == 0 and len(buf) != 0:
                # The frame is exhausted: drop it and fill from the file.
                self.current_frame = None
                n = len(buf)
                buf[:] = self.file_read(n)
                return n
            if n < len(buf):
                # A read must not straddle the end of a frame.
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return n
        else:
            n = len(buf)
            buf[:] = self.file_read(n)
            return n

    def read(self, n):
        """Return ``n`` bytes from the current frame, or from the file."""
        if self.current_frame:
            data = self.current_frame.read(n)
            if not data and n != 0:
                # The frame is exhausted: drop it and read from the file.
                self.current_frame = None
                return self.file_read(n)
            if len(data) < n:
                # A read must not straddle the end of a frame.
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return data
        else:
            return self.file_read(n)

    def readline(self):
        """Return one line from the current frame, or from the file."""
        if self.current_frame:
            data = self.current_frame.readline()
            if not data:
                # The frame is exhausted: drop it and read from the file.
                self.current_frame = None
                return self.file_readline()
            if data[-1] != b"\n"[0]:
                # The last byte is not a newline, so the frame ended mid-line.
                raise pickle.UnpicklingError("pickle exhausted before end of frame")
            return data
        else:
            return self.file_readline()

    def load_frame(self, frame_size):
        """Read ``frame_size`` bytes from the file and make them the current frame.

        Raises pickle.UnpicklingError when the previous frame still has
        unread bytes.
        """
        if self.current_frame and self.current_frame.read() != b"":
            raise pickle.UnpicklingError(
                "beginning of a new frame before end of current frame"
            )
        self.current_frame = io.BytesIO(self.file_read(frame_size))
|  | ||||
|  | ||||
# Module names that pickleScanner.find_class reports whenever a scanned
# global reference imports from them.
dangerous_modules = ["os", "subprocess", "builtins", "nt"]
# Attribute names that pickleScanner.find_class reports no matter which
# module they are imported from.
dangerous_names = [
    "system",
    "popen",
    "run",
    "call",
    "check_output",
    "check_call",
]
|  | ||||
|  | ||||
class pickleScanner:
    """Heuristic byte-level scanner for suspicious content in a pickle stream.

    The stream is never unpickled. ``load`` walks it one byte at a time,
    records every GLOBAL (``b"c"``) reference whose module or name is listed
    in ``dangerous_modules`` / ``dangerous_names``, and counts the bytes found
    in ``unsafe_opcodes``.

    NOTE(review): this is a byte scan, not an opcode parser. Only the GLOBAL
    opcode is inspected, so other ways of importing a callable (for example
    STACK_GLOBAL) are not reported, and a counted byte that sits inside
    string data still increments ``ReduceCount``. ``pickletools.genops``
    would give exact opcode boundaries if stricter results are needed.
    """

    # Opcode bytes counted in ReduceCount. Only b"R" is REDUCE; b"r" is
    # LONG_BINPUT (a memo write) and therefore is not listed.
    unsafe_opcodes = {
        b"R",  # REDUCE - apply a callable to an argument tuple
    }

    def __init__(
        self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None
    ):
        """Prepare a scanner over ``file``, a binary stream with read/readline.

        The keyword arguments are stored on the instance; the scan in
        ``load`` does not consult them.
        """
        self._buffers = iter(buffers) if buffers is not None else None
        self._file_readline = file.readline
        self._file_read = file.read
        self.memo = {}
        self.encoding = encoding
        self.errors = errors
        self.proto = 0
        self.fix_imports = fix_imports
        self.file = file
        self.ReduceCount = 0  # number of unsafe_opcodes bytes seen by load()
        self.maliciousModule = []  # (module, name) pairs flagged by find_class()

    @staticmethod
    def _to_text(value):
        """Return ``value`` as str, decoding bytes without ever raising."""
        if isinstance(value, (bytes, bytearray)):
            # A hostile pickle may carry arbitrary bytes here; keep them
            # visible as escapes instead of aborting the scan.
            return bytes(value).decode("utf-8", errors="backslashreplace")
        return str(value)

    def find_class(self, module, name):
        """Record ``(module, name)`` when either part is considered dangerous.

        Args:
            module: Module part of a global reference, as bytes or str.
            name: Attribute part of a global reference, as bytes or str.
        """
        module_text = self._to_text(module)
        name_text = self._to_text(name)
        if module_text in dangerous_modules or name_text in dangerous_names:
            self.maliciousModule.append((module_text, name_text))

    def load(self):
        """Scan the whole stream, updating ReduceCount and maliciousModule."""
        self._unframer = _Unframer(self._file_read, self._file_readline)
        self.read = self._unframer.read
        self.readinto = self._unframer.readinto
        self.readline = self._unframer.readline
        # Kept for callers that used this attribute; the scan itself no
        # longer seeks, so non-seekable streams are accepted as well.
        self.seek = getattr(self.file, "seek", None)
        self.metastack = []
        self.stack = []
        self.append = self.stack.append
        self.proto = 0

        # Scan every byte of the stream. The previously consumed byte is
        # tracked instead of seeking backwards, which failed when the GLOBAL
        # opcode was the very first byte of the file.
        previous = b""
        opcode = self.read(1)
        while opcode:
            if opcode == b"c":
                # A "c" that follows an ASCII letter or a NUL byte is treated
                # as string data rather than as the GLOBAL opcode.
                is_data = previous.isalpha() or previous == b"\x00"
                if not is_data:
                    module_line = self.readline()
                    name_line = self.readline()
                    self.find_class(module_line[:-1], name_line[:-1])
                    previous = name_line[-1:]
                    opcode = self.read(1)
                    continue
            elif opcode in self.unsafe_opcodes:
                self.ReduceCount += 1
            previous = opcode
            opcode = self.read(1)

    def output(self) -> dict:
        """Return the scan findings as a JSON-serialisable dict."""
        return {
            "ReduceCount": self.ReduceCount,
            "maliciousModule": self.maliciousModule,
        }
|  | ||||
|  | ||||
def pickleDataDetection(filename: str, output_file=None):
    """Scan a pickle file and report the findings as JSON.

    :param filename: path of the pickle file to scan
    :param output_file: path to write the indented JSON report to; when
        falsy, the report is printed to stdout on a single line instead
    """
    with open(filename, "rb") as pickle_stream:
        scanner = pickleScanner(pickle_stream)
        scanner.load()
    report = scanner.output()
    if not output_file:
        print(json.dumps(report))
        return
    with open(output_file, "w") as sink:
        json.dump(report, sink, indent=4)


if __name__ == "__main__":
    pickleDataDetection("test.pkl")
							
								
								
									
										56
									
								
								tests/test_pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										56
									
								
								tests/test_pickle_detection.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,56 @@ | ||||
| import unittest | ||||
| import pickle | ||||
| import tempfile | ||||
| from detection.pickle_detection import pickleScanner, pickleDataDetection | ||||
| from unittest.mock import patch | ||||
|  | ||||
|  | ||||
class TestPickleScanner(unittest.TestCase):
    """Tests for pickleScanner and pickleDataDetection on temporary files."""

    def setUp(self):
        # Create temporary files with valid and malicious data
        self.valid_data = {"key": "value"}
        # Hand-written protocol-3 stream that references
        # subprocess.check_output and applies it with one REDUCE opcode.
        self.malicious_data = b"\x80\x03csubprocess\ncheck_output\nq\x00X\x05\x00\x00\x00echo 1q\x01\x85q\x02Rq\x03."

        self.valid_file = tempfile.NamedTemporaryFile(delete=False)
        self.valid_file.write(pickle.dumps(self.valid_data))
        self.valid_file.close()

        self.malicious_file = tempfile.NamedTemporaryFile(delete=False)
        self.malicious_file.write(self.malicious_data)
        self.malicious_file.close()

    def tearDown(self):
        # Clean up temporary files
        import os

        os.remove(self.valid_file.name)
        os.remove(self.malicious_file.name)

    def test_valid_pickle(self):
        """A plain dict pickle yields no REDUCE count and no flagged globals."""
        with open(self.valid_file.name, "rb") as file:
            scanner = pickleScanner(file)
            scanner.load()
        output = scanner.output()
        self.assertEqual(output["ReduceCount"], 0)
        self.assertEqual(output["maliciousModule"], [])

    def test_malicious_pickle(self):
        """The crafted stream is reported with one REDUCE and the global."""
        with open(self.malicious_file.name, "rb") as file:
            scanner = pickleScanner(file)
            scanner.load()
        output = scanner.output()
        self.assertEqual(output["ReduceCount"], 1)
        self.assertIn(("subprocess", "check_output"), output["maliciousModule"])

    @patch("builtins.print")
    def test_pickleDataDetection_no_output_file(self, mock_print):
        """Without an output file the report is printed exactly once."""
        # The decorator already replaces print; a second nested patch would
        # only shadow the injected mock.
        pickleDataDetection(self.valid_file.name)
        mock_print.assert_called_once()


if __name__ == "__main__":
    unittest.main()
		Reference in New Issue
	
	Block a user