feat: 实现对文件夹进行递归检测;支持html,txt,md等多种输出方式;修改单元测试;支持检测多种语言;添加等级-none;
This commit is contained in:
		| @@ -1,21 +1,13 @@ | |||||||
| """ | import os | ||||||
| Usage: python backdoor_detection.py your_file_path |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| import re | import re | ||||||
| from typing import List, Tuple, Dict |  | ||||||
| import sys | import sys | ||||||
|  | from typing import Dict, List, Tuple | ||||||
|  |  | ||||||
|  | SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"} | ||||||
|  | OUTPUT_FORMATS = ["html", "md", "txt"] | ||||||
|  |  | ||||||
|  |  | ||||||
| def read_file_content(file_path: str) -> str: | def read_file_content(file_path: str) -> str: | ||||||
|     """ |  | ||||||
|     Reads and returns the content of a specified file. Exits the program with an error if the file does not exist or cannot be read. |  | ||||||
|  |  | ||||||
|     :param file_path: The full path to the file. |  | ||||||
|     :return: The text content of the file. |  | ||||||
|     :raises FileNotFoundError: If the file does not exist. |  | ||||||
|     :raises IOError: If the file cannot be read. |  | ||||||
|     """ |  | ||||||
|     try: |     try: | ||||||
|         with open(file_path, "r", encoding="utf-8") as file: |         with open(file_path, "r", encoding="utf-8") as file: | ||||||
|             return file.read() |             return file.read() | ||||||
| @@ -27,61 +19,150 @@ def read_file_content(file_path: str) -> str: | |||||||
|         sys.exit(1) |         sys.exit(1) | ||||||
|  |  | ||||||
|  |  | ||||||
# Per-extension comment syntax: (line-comment marker, block-comment delimiters
# or None, string-literal quotes). Longer quotes are listed first so that a
# triple quote is not mistaken for an empty single-quoted string.
_COMMENT_SYNTAX = {
    ".py": ("#", None, ('"""', "'''", '"', "'")),
    ".js": ("//", ("/*", "*/"), ('"', "'", "`")),
    ".cpp": ("//", ("/*", "*/"), ('"', "'")),
}

# Per-extension risk patterns, compiled once at import time. Order matters:
# the first pattern that matches a line decides its risk level.
_RISK_PATTERNS = {
    ".py": (
        (re.compile(r"\bsystem\("), "high"),
        (re.compile(r"\bexec\("), "high"),
        (re.compile(r"\bpopen\("), "medium"),
        (re.compile(r"\beval\("), "high"),
        (re.compile(r"\bsubprocess\.run\("), "medium"),
    ),
    ".js": (
        (re.compile(r"\beval\("), "high"),
        (re.compile(r"\bexec\("), "high"),
        (re.compile(r"\bchild_process\.exec\("), "high"),
    ),
    ".cpp": (
        (re.compile(r"\bsystem\("), "high"),
    ),
}


def _string_end(code: str, start: int, quote: str) -> int:
    """Return the index just past the string literal that opens at ``start``.

    Backslash escapes are skipped. Literals opened with a one-character quote
    other than a backtick are treated as single-line: if no closing quote is
    found before a line break, the literal ends at that line break so that a
    stray quote cannot swallow the rest of the file.
    """
    multiline = len(quote) == 3 or quote == "`"
    index = start + len(quote)
    length = len(code)
    while index < length:
        char = code[index]
        if char == "\\":
            # Skip the escaped character (this also covers line continuations).
            index += 2
            continue
        if char == "\n" and not multiline:
            return index
        if code.startswith(quote, index):
            return index + len(quote)
        index += 1
    return length


def _strip_comments(code: str, extension: str, keep_newlines: bool) -> str:
    """Remove comments from ``code`` while leaving string literals untouched.

    :param code: Source text; may span several lines.
    :param extension: File extension selecting the comment syntax. Extensions
        without an entry in ``_COMMENT_SYNTAX`` are returned unchanged.
    :param keep_newlines: When true, a removed block comment is replaced by
        the line breaks it contained, so line numbers of later code are kept.
    :return: ``code`` without comments (surrounding whitespace is not trimmed).
    """
    syntax = _COMMENT_SYNTAX.get(extension)
    if syntax is None:
        return code
    line_marker, block_delims, quotes = syntax
    kept = []
    index = 0
    length = len(code)
    while index < length:
        quote = next((q for q in quotes if code.startswith(q, index)), None)
        if quote is not None:
            # Copy the whole literal verbatim; comment markers inside it are text.
            end = _string_end(code, index, quote)
            kept.append(code[index:end])
            index = end
            continue
        if code.startswith(line_marker, index):
            # Drop everything up to, but not including, the line break.
            newline = code.find("\n", index)
            index = length if newline == -1 else newline
            continue
        if block_delims is not None and code.startswith(block_delims[0], index):
            close = code.find(block_delims[1], index + len(block_delims[0]))
            # An unterminated block comment runs to the end of the text.
            end = length if close == -1 else close + len(block_delims[1])
            if keep_newlines:
                kept.append("\n" * code.count("\n", index, end))
            index = end
            continue
        kept.append(code[index])
        index += 1
    return "".join(kept)


def remove_comments(code: str, extension: str) -> str:
    """Return ``code`` with comments removed and surrounding whitespace trimmed.

    ``.py`` input loses ``#`` comments; ``.js`` and ``.cpp`` input loses ``//``
    line comments and ``/* ... */`` block comments. Comment markers that occur
    inside string literals are left alone, so code following such a literal on
    the same line is no longer cut off. Any other extension only gets trimmed.

    :param code: Source text, typically a single line.
    :param extension: File extension including the leading dot, e.g. ``".py"``.
    :return: The comment-free, stripped text.
    """
    return _strip_comments(code, extension, keep_newlines=False).strip()


def find_dangerous_functions(
    file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
    """Classify every non-empty code line of ``file_content`` by risk level.

    Comments are removed from the whole text first (block comments keep their
    line breaks, so reported line numbers refer to the original file). Each
    remaining non-blank line is tested against the patterns registered for
    ``file_extension``; the first matching pattern decides the level, and
    lines that match nothing are filed under ``"none"``.

    :param file_content: Full text of the source file.
    :param file_extension: Extension including the dot (``".py"``, ``".js"``,
        ``".cpp"``). Unknown extensions have no patterns and no comment
        stripping, so every non-blank line ends up under ``"none"``.
    :return: Dict with the keys ``"high"``, ``"medium"``, ``"low"`` and
        ``"none"``, each mapping to a list of ``(line_number, stripped_line)``
        tuples with 1-based line numbers. No current pattern maps to ``"low"``.
    """
    risk_patterns = _RISK_PATTERNS.get(file_extension, ())
    classified_results = {"high": [], "medium": [], "low": [], "none": []}
    code_only = _strip_comments(file_content, file_extension, keep_newlines=True)
    for line_number, line in enumerate(code_only.split("\n"), start=1):
        clean_line = line.strip()
        if not clean_line:
            continue
        risk_level = next(
            (level for pattern, level in risk_patterns if pattern.search(clean_line)),
            "none",
        )
        classified_results[risk_level].append((line_number, clean_line))
    return classified_results
|  |  | ||||||
|  |  | ||||||
def output_results(
    results: Dict[str, List[Tuple[int, str]]], output_format: str, file_path: str
):
    """Write the findings for one scanned file into the ``../results`` directory.

    The report is named after the scanned file's base name with its extension
    replaced by ``output_format`` and is produced by the writer matching that
    format.

    :param results: Mapping of risk level to ``(line_number, line_text)`` tuples.
    :param output_format: One of ``"html"``, ``"md"`` or ``"txt"``.
    :param file_path: Path of the scanned source file.
    :raises ValueError: If ``output_format`` has no writer; previously such a
        call returned without writing anything.
    """
    writers = {
        "html": output_html,
        "md": output_markdown,
        "txt": output_text,
    }
    writer = writers.get(output_format)
    if writer is None:
        raise ValueError(f"Unsupported output format: {output_format!r}")

    # The directory is resolved relative to the current working directory.
    results_dir = "../results"
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(results_dir, exist_ok=True)

    # NOTE(review): only the base name is used, so files sharing a stem
    # (a/util.py, b/util.py, util.js) overwrite each other's report during a
    # recursive scan - confirm whether the naming scheme may be changed.
    stem = os.path.splitext(os.path.basename(file_path))[0]
    output_file = os.path.join(results_dir, f"{stem}.{output_format}")
    writer(results, output_file)
|             for line_num, func in occurrences: |  | ||||||
|                 print(f"  Line {line_num}: {func}") |  | ||||||
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name: str):
    """Write the classified findings to ``file_name`` as an HTML report.

    The page has a title naming ``file_name``, one ``<h2>`` section per key of
    ``results`` and one list item per ``(line_number, line_text)`` entry. All
    interpolated text is HTML-escaped, because the entries are raw source
    lines that routinely contain ``<``, ``>`` and ``&``.

    :param results: Mapping of risk level to ``(line_number, line_text)`` tuples.
    :param file_name: Path of the report file; it is created or overwritten.
    """
    # Function-scope import so the module's import block need not change.
    from html import escape

    parts = [
        '<html><head><meta charset="utf-8">'
        f"<title>Analysis of {escape(file_name)}</title></head><body>",
        "<h1>Security Analysis Report</h1>",
    ]
    for risk_level, entries in results.items():
        parts.append(f"<h2>{escape(risk_level.capitalize())} Risk</h2><ul>")
        parts.extend(
            f"<li>Line {line_num}: {escape(line)}</li>" for line_num, line in entries
        )
        parts.append("</ul>")
    parts.append("</body></html>")
    # Explicit UTF-8 (declared in the <meta> tag above) so non-ASCII source
    # lines can be written regardless of the platform's default encoding.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write("".join(parts))
|  |  | ||||||
|  |  | ||||||
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name: str):
    """Write the classified findings to ``file_name`` as a Markdown report.

    The report opens with a level-1 heading naming ``file_name``; every key of
    ``results`` then gets a level-2 heading followed by one bullet per
    ``(line_number, line_text)`` entry.

    :param results: Mapping of risk level to ``(line_number, line_text)`` tuples.
    :param file_name: Path of the report file; it is created or overwritten.
    """
    chunks = [f"# Security Analysis Report for {file_name}\n"]
    for risk_level, entries in results.items():
        chunks.append(f"## {risk_level.capitalize()} Risk\n")
        chunks.extend(f"- Line {line_num}: {line}\n" for line_num, line in entries)
    # Explicit UTF-8: the scanned sources are read as UTF-8, so their lines
    # may hold characters the platform's default encoding cannot represent.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write("".join(chunks))
|  |  | ||||||
|  |  | ||||||
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name: str):
    """Write the classified findings to ``file_name`` as a plain-text report.

    The first line names ``file_name``; every key of ``results`` then gets a
    ``"<Level> Risk:"`` line followed by one indented line per
    ``(line_number, line_text)`` entry.

    :param results: Mapping of risk level to ``(line_number, line_text)`` tuples.
    :param file_name: Path of the report file; it is created or overwritten.
    """
    chunks = [f"Security Analysis Report for {file_name}\n"]
    for risk_level, entries in results.items():
        chunks.append(f"{risk_level.capitalize()} Risk:\n")
        chunks.extend(f"  Line {line_num}: {line}\n" for line_num, line in entries)
    # Explicit UTF-8: the scanned sources are read as UTF-8, so their lines
    # may hold characters the platform's default encoding cannot represent.
    with open(file_name, "w", encoding="utf-8") as file:
        file.write("".join(chunks))
|  |  | ||||||
|  |  | ||||||
def _scan_file(file_path: str, file_extension: str, output_format: str) -> None:
    """Analyse one source file and write its report in ``output_format``."""
    file_results = find_dangerous_functions(
        read_file_content(file_path), file_extension
    )
    output_results(file_results, output_format, file_path)


def process_path(path: str, output_format: str):
    """Scan a single file, or every supported file below a directory.

    Directories are walked recursively with ``os.walk``; files whose extension
    is not in ``SUPPORTED_EXTENSIONS`` are skipped silently. For a single file
    with an unsupported extension a message is printed and nothing is written.
    Extensions are compared in lower case, so ``MAIN.PY`` is handled like
    ``main.py``.

    :param path: File or directory to analyse.
    :param output_format: Report format handed on to ``output_results``.
    :raises SystemExit: With status 1 when ``path`` is neither an existing
        file nor an existing directory.
    """
    if os.path.isdir(path):
        for root, _dirs, files in os.walk(path):
            for file in files:
                file_extension = os.path.splitext(file)[1].lower()
                if file_extension not in SUPPORTED_EXTENSIONS:
                    continue
                file_path = os.path.join(root, file)
                print(f"Processing {file_path}...")
                _scan_file(file_path, file_extension, output_format)
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1].lower()
        if file_extension in SUPPORTED_EXTENSIONS:
            _scan_file(path, file_extension, output_format)
        else:
            print("Unsupported file type.")
    else:
        print("Invalid path.")
        sys.exit(1)
|  |  | ||||||
|  |  | ||||||
def test():
    """Print the fixed string ``hello world`` to standard output.

    NOTE(review): looks like leftover scaffolding - confirm whether any caller
    still relies on it.
    """
    greeting = "hello world"
    print(greeting)
|  |  | ||||||
|  |  | ||||||
def main():
    """Command-line entry point: validate ``sys.argv`` and start the scan.

    Expects two positional arguments, ``<path>`` and ``<output_format>``.
    When an argument is missing, or the format is not listed in
    ``OUTPUT_FORMATS``, a message is written to standard error and the process
    exits with status 1. Otherwise the arguments are handed to
    ``process_path``.
    """
    if len(sys.argv) < 3:
        # Diagnostics go to stderr so they never mix with the progress output.
        print(
            "Usage: python backdoor_detection.py <path> <output_format>",
            file=sys.stderr,
        )
        sys.exit(1)
    path, output_format = sys.argv[1], sys.argv[2]
    if output_format not in OUTPUT_FORMATS:
        print(
            f"Unsupported output format. Supported formats are: {', '.join(OUTPUT_FORMATS)}",
            file=sys.stderr,
        )
        sys.exit(1)
    process_path(path, output_format)


if __name__ == "__main__":
    main()
|   | |||||||
		Reference in New Issue
	
	Block a user