diff --git a/detection/Regexdetection.py b/detection/Regexdetection.py index 2daa291..0ad2188 100644 --- a/detection/Regexdetection.py +++ b/detection/Regexdetection.py @@ -34,6 +34,7 @@ def find_dangerous_functions( r"\bos\.kill\b": "high", r"\bos\.popen\b": "medium", r"\bos\.spawn\b": "medium", + r"\bsubprocess": "medium", }, } risk_patterns = patterns.get(file_extension, {}) @@ -43,7 +44,9 @@ def find_dangerous_functions( clean_line = remove_comments(line, file_extension) if not clean_line: continue + # 消除换行符,避免影响正则匹配 + clean_line = clean_line.replace("\\n", "") for pattern, risk_level in risk_patterns.items(): - if re.search(pattern, clean_line): + if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL): classified_results[risk_level].append((line_number, clean_line)) return classified_results diff --git a/detection/__main__.py b/detection/__main__.py index 3b5aedb..47d99e6 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -6,8 +6,11 @@ from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection + +from .requirements_detection import requirement_detection from .Regexdetection import find_dangerous_functions -from .GPTdetection import detectGPT,GPTdetectFileList +from .GPTdetection import detectGPT, GPTdetectFileList + # from .cngptdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * @@ -30,6 +33,8 @@ ORDERS = [ "__getattribute__", "getattr", "child_process", + "kill", + "fork", ] # Initialize colorama @@ -147,8 +152,6 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += line_text text_output += "\n" - - return text_output @@ -371,9 +374,14 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( - path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None + path: str, + output_format: str, + mode: str, + pycdc_addr: str, + output_file=None, + requirement_path=None, ): - results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []} + results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} if os.path.isdir(path): # 使用rglob获取所有文件 all_files = [ @@ -385,17 +393,14 @@ def process_path( if mode == "llm": results = GPTdetectFileList(all_files) else: - # 扫描动画 + # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix # print(file_extension) if file_extension in [".pkl",".pickle"]: # print("识别到pickle") res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) + results["pickles"].append({"file": str(file_path), "result": res}) continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr @@ -413,10 +418,7 @@ def process_path( file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(path), output_file) - results["pickles"].append({ - "file": str(path), - "result": res - }) + results["pickles"].append({"file": str(path), "result": res}) elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: @@ -434,7 +436,8 @@ def process_path( else: print("Invalid path.") sys.exit(1) - + if requirement_path is not None: + requirement_detection(requirement_path, output_file) output_results(results, output_format, output_file) @@ -455,6 +458,18 @@ def main(): help="Path to pycdc.exe to decompile", default=os.getenv("PATH"), ) + parser.add_argument( + "-P", + "--Pickle", + help="Path to pickle file to analyze", + default=None, + ) + parser.add_argument( + "-r", + "--requirement", + help="Path to requirement file to analyze", + default=None, + ) args = parser.parse_args() output_format = "txt" # Default output format output_file = None @@ -470,7 +485,9 @@ def main(): ) output_file = args.output.rsplit(".", 1)[0] + ".txt" # 如果未指定输出文件,则输出到 stdout;否则写入文件 - process_path(args.path, output_format, args.mode, args.pycdc, output_file) + process_path( + args.path, output_format, args.mode, args.pycdc, output_file, args.requirement + ) if PYCDC_FLAG == False: print( "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 5a1c78f..c1c3538 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -3,6 +3,16 @@ import requests from bs4 import BeautifulSoup from packaging.version import Version, InvalidVersion import sys +from reportlab.lib.pagesizes import letter +from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer +from colorama import Fore, Style, init +from tqdm import tqdm +import html +import os + + +init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色 def fetch_html(url: str) -> str: @@ -55,7 +65,6 @@ def version_in_range(version, range_str: str) -> bool: except InvalidVersion: return False else: - # 如果没有给版本号,默认使用最新版本 if range_str[-2] == ",": return True @@ -77,64 +86,183 @@ def version_in_range(version, range_str: str) -> bool: return True -def check_vulnerabilities(requirements: list, base_url: str, output_file: str): - with open(output_file, "w") as out_file: - for req in requirements: - version = "" - # 如果有版本 - if "==" in req: - package_name, version = req.split("==") - # 没有版本 - else: - package_name, version = req, None - # 拼接URL - url = f"{base_url}{package_name}" - print(f"Fetching data for {package_name} from {url}") - html_content = fetch_html(url) - if html_content: - # 解析hmtl - extracted_data = parse_html(html_content) - if extracted_data: - relevant_vulns = [] - for vuln in extracted_data: - if version_in_range(version, vuln["chip"]): - relevant_vulns.append(vuln) - if relevant_vulns: - out_file.write(f"Vulnerabilities found for {package_name}:\n") - for vuln in relevant_vulns: - out_file.write(f" - {vuln['link']}\n") - out_file.write("\n") - else: - print(f"No relevant data found for {package_name}.") - else: - print(f"Failed to fetch data for {package_name}.") +def check_vulnerabilities(requirements: list, base_url: str) -> str: + results = [] + for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"): + version = "" + if "==" in req: + package_name, version = req.split("==") + else: + package_name, version = req, None + url = f"{base_url}{package_name}" + # print(f"\nFetching data for {package_name} from {url}") + html_content = fetch_html(url) + if html_content: + extracted_data = parse_html(html_content) + if extracted_data: + relevant_vulns = [] + for vuln in extracted_data: + if version_in_range(version, vuln["chip"]): + relevant_vulns.append(vuln) + if relevant_vulns: + result = f"Vulnerabilities found for {package_name}:\n" + for vuln in relevant_vulns: + result += f" - {vuln['link']}\n" + results.append(result) + return "\n".join(results) -def main(): - parser = argparse.ArgumentParser( - description="Check project dependencies for vulnerabilities." - ) - parser.add_argument( - "-r", - "--requirement", - help="Path to the requirements file of the project", - required=True, - ) - parser.add_argument( - "-o", - "--output", - help="Output file path with extension, e.g., './output/report.txt'", - required=True, - ) - args = parser.parse_args() +def save_to_file(output_path: str, data: str): + if output_path.endswith(".html"): + save_as_html(output_path, data) + elif output_path.endswith(".pdf"): + save_as_pdf(output_path, data) + elif output_path.endswith(".md"): + save_as_markdown(output_path, data) + else: + save_as_txt(output_path, data) + +def save_as_html(output_path: str, data: str): + escaped_data = html.escape(data) + html_content = f""" + +
+ + + +{escaped_data}+