# 480 lines
# 16 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
from typing import Dict, List, Tuple, Optional
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from detection.pickle_detection import pickleDataDetection
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT
from .pyc_detection import disassemble_pyc
from .utils import *
import sys
from colorama import init, Fore, Style
from tqdm import tqdm
from pathlib import Path
# Set to False by checkModeAndDetect when disassemble_pyc returns "none";
# main() then prints the hint about installing pycdc.
PYCDC_FLAG = True
# Set to False by checkModeAndDetect when disassemble_pyc returns "invalid";
# main() then reports that the configured pycdc path is not valid.
PYCDC_ADDR_FLAG = True
# File suffixes that process_path passes to checkModeAndDetect.
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"}
# Report formats. NOTE(review): output_results defines its own local set with
# the same name, so this list is not read by any code visible in this file.
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
# Substrings that highlight_orders wraps in color when rendering terminal output.
ORDERS = [
"__import__",
"system",
"exec",
"popen",
"eval",
"subprocess",
"__getattribute__",
"getattr",
"child_process",
]
# Initialize colorama
init(autoreset=True)
# ANSI 256-color foreground escape (palette index 214); not referenced by the
# code visible in this file.
ORANGE = "\033[38;5;214m"
# Alias used by highlight_orders for "low" risk lines.
CYAN = Fore.CYAN
def supports_color() -> bool:
    """
    Reports whether colored output should be used.
    Returns:
        bool: True on Windows (``sys.platform == "win32"``) or when
        ``sys.stdout`` exposes ``isatty`` and it reports a terminal;
        False otherwise.
    """
    stream = sys.stdout
    # The platform test comes first so stdout is not queried on Windows.
    return bool(
        sys.platform == "win32"
        or (hasattr(stream, "isatty") and stream.isatty())
    )
def supports_emoji() -> bool:
    """
    Reports whether emoji should be used in terminal output.
    Returns:
        bool: True on every non-Windows platform; on Windows, True only
        when the ``WT_SESSION`` environment variable is set.
    """
    # Deliberately simple heuristic: anything that is not Windows is assumed
    # to render emoji.
    if sys.platform != "win32":
        return True
    return os.getenv("WT_SESSION") is not None
def highlight_orders(line: str, risk_level: str, use_color: bool) -> str:
    """
    Highlights the names listed in ORDERS inside a line.
    Args:
        line (str): The line to highlight.
        risk_level (str): The risk level of the line ("high", "medium", "low").
            Any other value falls back to white.
        use_color (bool): Whether to use color for highlighting.
    Returns:
        str: The line with every ORDERS occurrence wrapped in the risk color,
        or the line unchanged when use_color is False.
    """
    import re

    # Without color both the prefix and the suffix would be empty strings,
    # so the line would come back unchanged anyway.
    if not use_color:
        return line

    risk_colors = {
        "high": Fore.RED,
        "medium": Fore.YELLOW,
        "low": CYAN,
    }
    color = risk_colors.get(risk_level, Fore.WHITE)
    reset = Style.RESET_ALL

    # Match in one pass, longest name first. Replacing name by name would
    # highlight "getattr" again inside an already wrapped "__getattribute__",
    # and the inner reset would strip the color from the rest of the name.
    pattern = "|".join(
        re.escape(order) for order in sorted(ORDERS, key=len, reverse=True)
    )
    return re.sub(pattern, lambda match: f"{color}{match.group(0)}{reset}", line)
def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
    """
    Generates a formatted text report for security analysis results.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results
            categorized by risk levels. The "none" level and levels with no
            entries are skipped. The first tuple element is interpolated as-is
            (process_path passes "<path>: Line <n>" strings here).
    Returns:
        str: The formatted text report as a string, with color codes and emoji
        included only when supports_color() / supports_emoji() allow them.
    """
    use_color = supports_color()
    use_emoji = supports_emoji()

    level_colors = {
        "high": Fore.RED,
        "medium": Fore.YELLOW,
        "low": Fore.GREEN,
    }
    level_emoji = {
        "High": "👹",
        "Medium": "👾",
        "Low": "👻",
    }
    reset = Style.RESET_ALL if use_color else ""
    number_color = Fore.GREEN if use_color else ""

    # Collect fragments and join once instead of growing a string with +=.
    parts = ["Security Analysis Report\n", "=" * 30 + "\n\n"]
    for risk_level, entries in results.items():
        if not entries or risk_level == "none":
            continue
        heading = risk_level.capitalize()
        risk_color = level_colors.get(risk_level, Fore.WHITE) if use_color else ""
        # .get() so an unexpected risk level gets a plain heading rather than
        # a KeyError, matching how the color lookup already behaves.
        emoji = level_emoji.get(heading, "") if use_emoji else ""
        parts.append(f"{risk_color}{heading} Risk{emoji}:{reset}\n")
        # len(level) + 6 equals the width of "<Level> Risk:" without emoji.
        parts.append("-" * (len(risk_level) + 6) + "\n")
        for line_num, line in entries:
            highlighted = highlight_orders(line, risk_level, use_color)
            parts.append(
                f"{reset} {number_color}{line_num}{reset}: {highlighted}{reset}\n"
            )
        parts.append("\n")
    return "".join(parts)
def output_results(
    results: Dict[str, List[Tuple[int, str]]],
    output_format: str,
    output_file: Optional[str] = None,
) -> None:
    """
    Outputs the security analysis results in the specified format.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
        output_format (str): The format to output the results in. Supported formats: "pdf", "html", "md", "txt".
            Any other value falls back to "txt" and the file extension is replaced by ".txt".
        output_file (Optional[str]): The name of the file to save the output. If None (or empty),
            the colored text report is printed to the terminal instead.
    """
    if not output_file:
        # If no output file is specified, default to text output to the terminal.
        print(generate_text_content(results))
        return

    valid_formats = {"pdf", "html", "md", "txt"}
    if output_format not in valid_formats:
        output_format = "txt"
        output_file = f"{os.path.splitext(output_file)[0]}.txt"

    results_dir = os.path.dirname(output_file)
    if results_dir:
        # exist_ok avoids the race between checking for the directory and
        # creating it.
        os.makedirs(results_dir, exist_ok=True)

    writers = {
        "pdf": output_pdf,
        "html": output_html,
        "md": output_markdown,
    }
    # Default to txt
    writers.get(output_format, output_text)(results, output_file)
def output_pdf(results: Dict[str, List[Tuple[int, str]]], file_name):
    """
    Writes the security analysis results to a PDF file using reportlab.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
            The "none" level is skipped.
        file_name: Destination passed to SimpleDocTemplate.
    """
    from xml.sax.saxutils import escape

    doc = SimpleDocTemplate(file_name, pagesize=letter)
    styles = getSampleStyleSheet()

    # Add the title centered
    title_style = styles["Title"]
    title_style.alignment = 1  # Center alignment
    story = [
        Paragraph("Security Analysis Report", title_style),
        Spacer(1, 20),  # Space after title
    ]

    # Add risk levels and entries
    body_style = styles["BodyText"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        story.append(
            Paragraph(f"{risk_level.capitalize()} Risk:", styles["Heading2"])
        )
        for line_num, line in entries:
            # Paragraph parses its text as XML-like markup, so scanned source
            # containing "<", ">" or "&" must be escaped to render literally.
            story.append(Paragraph(escape(f"Line {line_num}: {line}"), body_style))
        story.append(Spacer(1, 12))  # Space between sections
    doc.build(story)
def output_html(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates an HTML report for security analysis results.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
            The "none" level is skipped. Entry text is HTML-escaped before being embedded.
        file_name (Optional[str]): The name of the file to save the HTML output. If None, returns the HTML string.
    Returns:
        Optional[str]: The HTML string if file_name is None, otherwise None.
    """
    import html

    emoji_by_level = {
        "High": "👹",
        "Medium": "👾",
        "Low": "👻",
    }
    # The template is kept flush-left so the emitted markup is unchanged.
    parts = [
        """
<html>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
<title>Security Analysis Report</title>
<style>
body {
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
background-size: 100%, auto;
background-attachment: fixed;
font-family: Arial, sans-serif;
}
h1, h2 {
color: white;
}
ul {
list-style-type: none;
padding: 0;
}
li {
background: rgba(255, 255, 255, 0.8);
margin: 5px 0;
padding: 10px;
border-radius: 5px;
}
</style>
</head>
<body>
<h1>Security Analysis Report</h1>
"""
    ]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        heading = risk_level.capitalize()
        # .get() so an unexpected risk level gets a heading without an emoji
        # rather than a KeyError.
        parts.append(
            f"<h2>{html.escape(heading)} Risk{emoji_by_level.get(heading, '')}</h2><ul>"
        )
        # The entries are lines taken from scanned (untrusted) files: escape
        # them so embedded markup is displayed, not interpreted by the browser.
        parts.extend(
            f"<li>{html.escape(str(line_num))}: {html.escape(str(line))}</li>"
            for line_num, line in entries
        )
        parts.append("</ul>")
    parts.append("</body></html>")
    html_output = "".join(parts)

    if file_name:
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(html_output)
        return None
    return html_output
def output_markdown(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates a Markdown report for security analysis results.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
            The "none" level is skipped. Pipe characters in entries are backslash-escaped
            so they stay inside their table cell.
        file_name (Optional[str]): The name of the file to save the Markdown output. If None, returns the Markdown string.
    Returns:
        Optional[str]: The Markdown string if file_name is None, otherwise None.
    """
    parts = ["# Security Analysis Report\n\n"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        parts.append(f"## {risk_level.capitalize()} Risk\n\n")
        parts.append("| Line Number | Description |\n")
        parts.append("|-------------|-------------|\n")
        for line_num, line in entries:
            # A raw "|" (common in source code) would otherwise be read as a
            # column separator and break the row.
            number_cell = str(line_num).replace("|", "\\|")
            text_cell = str(line).replace("|", "\\|")
            parts.append(f"| {number_cell} | {text_cell} |\n")
        parts.append("\n")
    md_output = "".join(parts)

    if file_name:
        # Explicit UTF-8 (as output_html does) so non-ASCII source lines do
        # not depend on the platform's default encoding.
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(md_output)
        return None
    return md_output
def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
    """
    Generates a plain text report for security analysis results.
    Args:
        results (Dict[str, List[Tuple[int, str]]]): The security analysis results categorized by risk levels.
            The "none" level is skipped.
        file_name (Optional[str]): The name of the file to save the text output. If None, returns the text string.
    Returns:
        Optional[str]: The text string if file_name is None, otherwise None.
    """
    title = "Security Analysis Report"
    parts = [f"{title}\n", "=" * len(title) + "\n\n"]
    for risk_level, entries in results.items():
        if risk_level == "none":
            continue
        heading = f"{risk_level.capitalize()} Risk:"
        parts.append(f"{heading}\n")
        parts.append("-" * len(heading) + "\n")
        parts.extend(f" Line {line_num}: {line}\n" for line_num, line in entries)
        parts.append("\n")
    text_output = "".join(parts)

    if file_name:
        # Explicit UTF-8 (as output_html does) so non-ASCII source lines do
        # not depend on the platform's default encoding.
        with open(file_name, "w", encoding="utf-8") as file:
            file.write(text_output)
        return None
    return text_output
def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: str):
    """
    Loads one file and runs the detector selected by mode on its content.
    Args:
        mode (str): "llm" selects detectGPT; "regex" and any other value
            select find_dangerous_functions.
        filePath (str): Path of the file to analyze.
        fileExtension (str): The file's suffix; ".pyc" files are disassembled
            first, everything else is read with read_file_content.
        pycdc_addr (str): pycdc location forwarded to disassemble_pyc.
    Returns:
        The selected detector's result, or "" when the .pyc file could not be
        disassembled (the matching module flag is cleared so main() can
        report why).
    """
    # TODO: add more detection modes; keep this dispatch reusable and extensible.
    global PYCDC_FLAG, PYCDC_ADDR_FLAG

    if fileExtension == ".pyc":
        # Disassemble the pyc file.
        file_content = disassemble_pyc(filePath, pycdc_addr)
        if file_content == "none":
            PYCDC_FLAG = False
            return ""
        if file_content == "invalid":
            # "invalid" is a status marker, not disassembled source: record it
            # and stop instead of scanning the marker text itself.
            PYCDC_ADDR_FLAG = False
            return ""
    else:
        file_content = read_file_content(filePath)

    if mode == "llm":
        return detectGPT(file_content)
    return find_dangerous_functions(file_content, fileExtension)
def _merge_file_results(results, source_label, file_results):
    """Adds one file's findings to results, prefixing each line number with its source path."""
    if file_results is None:
        return
    for key in file_results:
        if key != "none":  # Exclude 'none' risk level
            results[key].extend(
                (f"{source_label}: Line {line_num}", line)
                for line_num, line in file_results[key]
            )


def process_path(
    path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None
):
    """
    Scans a file or a directory tree and outputs the combined report.
    Args:
        path (str): File or directory to analyze. An invalid path prints a
            message and exits with status 1; an unsupported single file
            prints a message and returns.
        output_format (str): Report format forwarded to output_results.
        mode (str): Detection mode forwarded to checkModeAndDetect.
        pycdc_addr (str): pycdc location forwarded to checkModeAndDetect.
        output_file: Optional report path forwarded to output_results and to
            pickleDataDetection.
    """
    pickle_extensions = {".pkl", ".pickle"}
    results = {"high": [], "medium": [], "low": [], "none": []}

    if os.path.isdir(path):
        # Include pickle suffixes, otherwise the pickle branch below can never
        # run, and skip directories whose name happens to end in a suffix.
        scannable = set(SUPPORTED_EXTENSIONS) | pickle_extensions
        all_files = [
            file_path
            for file_path in Path(path).rglob("*")
            if file_path.suffix in scannable and file_path.is_file()
        ]
        # Progress bar while scanning.
        for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
            file_extension = file_path.suffix
            if file_extension in pickle_extensions:
                # The return value is not merged into results: the report
                # writers expect (line_num, line) tuples, and main() also
                # discards it.
                # NOTE(review): pickleDataDetection receives the same
                # output_file as the final report - confirm the two do not
                # overwrite each other.
                pickleDataDetection(str(file_path), output_file)
                continue
            _merge_file_results(
                results,
                file_path,
                checkModeAndDetect(mode, str(file_path), file_extension, pycdc_addr),
            )
    elif os.path.isfile(path):
        file_extension = os.path.splitext(path)[1]
        if file_extension in pickle_extensions:
            # Same handling as main()'s --Pickle option: run the pickle
            # detector and stop, rather than emitting an empty report.
            pickleDataDetection(str(path), output_file)
            return
        if file_extension not in SUPPORTED_EXTENSIONS:
            print("Unsupported file type.")
            return
        _merge_file_results(
            results,
            path,
            checkModeAndDetect(mode, path, file_extension, pycdc_addr),
        )
    else:
        print("Invalid path.")
        sys.exit(1)

    output_results(results, output_format, output_file)
def main():
    """
    Command-line entry point.

    Parses the arguments, derives the report format from the --output file
    extension (falling back to a .txt file for unknown extensions), runs
    process_path, and finally reports any pycdc problems recorded in
    PYCDC_FLAG / PYCDC_ADDR_FLAG. With --Pickle, only pickleDataDetection is
    run on the given file.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="Backdoor detection tool.", prog="detection"
    )
    parser.add_argument("path", help="Path to the code to analyze")
    parser.add_argument("-o", "--output", help="Output file path", default=None)
    parser.add_argument(
        "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex"
    )
    parser.add_argument(
        "-p",
        "--pycdc",
        help="Path to pycdc.exe to decompile",
        default=os.getenv("pycdc"),
    )
    parser.add_argument(
        "-P",
        "--Pickle",
        help="Path to pickle file to analyze",
        default=None,
    )
    args = parser.parse_args()

    if args.Pickle:
        pickleDataDetection(args.Pickle, args.output)
        return

    output_format = "txt"  # Default output format
    output_file = None
    if args.output:
        stem, ext = os.path.splitext(args.output)
        ext = ext.lower()
        if ext in (".html", ".md", ".txt", ".pdf"):
            output_format = ext[1:]
            output_file = args.output
        else:
            print(
                "Your input file format was incorrect, the output has been saved as a TXT file."
            )
            # splitext only strips a real extension; splitting on the last "."
            # would mangle paths such as "./out/report" whose only dot is in
            # the directory part.
            output_file = stem + ".txt"

    # If no output file was given the report goes to stdout; otherwise it is
    # written to the file.
    process_path(args.path, output_format, args.mode, args.pycdc, output_file)

    if not PYCDC_FLAG:
        print(
            "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
        )
        print("Repo: https://github.com/zrax/pycdc.git")
    if not PYCDC_ADDR_FLAG:
        print("ERROR: The specified pycdc.exe path is not valid")
        print("Please check your pycdc path.")
# Run the command-line interface only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()