From df65fff2c7335a9519ffef0f4437cb8e0a342b10 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Fri, 31 May 2024 20:33:47 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E5=AF=B9python=203.1?= =?UTF-8?q?1=E7=9A=84=E5=8F=8D=E7=BC=96=E8=AF=91=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/Regexdetection.py | 15 ++++++----- detection/__main__.py | 52 +++++++++++++++++++++++++------------ detection/pyc_detection.py | 18 +++++-------- 3 files changed, 50 insertions(+), 35 deletions(-) diff --git a/detection/Regexdetection.py b/detection/Regexdetection.py index 39f6280..4296b49 100644 --- a/detection/Regexdetection.py +++ b/detection/Regexdetection.py @@ -38,11 +38,12 @@ def find_dangerous_functions( } risk_patterns = patterns.get(file_extension, {}) classified_results = {"high": [], "medium": [], "low": [], "none": []} - for line_number, line in enumerate(file_content.split("\n"), start=1): - clean_line = remove_comments(line, file_extension) - if not clean_line: - continue - for pattern, risk_level in risk_patterns.items(): - if re.search(pattern, clean_line): - classified_results[risk_level].append((line_number, clean_line)) + if file_content is not None: + for line_number, line in enumerate(file_content.split("\n"), start=1): + clean_line = remove_comments(line, file_extension) + if not clean_line: + continue + for pattern, risk_level in risk_patterns.items(): + if re.search(pattern, clean_line): + classified_results[risk_level].append((line_number, clean_line)) return classified_results diff --git a/detection/__main__.py b/detection/__main__.py index 6d4aecb..2538ad6 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -1,4 +1,5 @@ import os +from pickle import FALSE from typing import Dict, List, Tuple, Optional from reportlab.lib.pagesizes import letter from reportlab.lib.styles import getSampleStyleSheet @@ -10,6 +11,8 @@ from .utils import * import sys from colorama import init, Fore, Style +PYCDC_FLAG = True +PYCDC_ADDR_FLAG = True SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] ORDERS = [ @@ -331,6 +334,13 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: if fileExtension == ".pyc": # 反汇编pyc文件 file_content = disassemble_pyc(filePath, pycdc_addr) + if file_content == "none": + global PYCDC_FLAG + PYCDC_FLAG = False + return "" + elif file_content == "invalid": + global PYCDC_ADDR_FLAG + PYCDC_ADDR_FLAG = False if mode == "regex": return find_dangerous_functions(file_content, fileExtension) elif mode == "llm": @@ -360,26 +370,28 @@ def process_path( file_results = checkModeAndDetect( mode, file_path, file_extension, pycdc_addr ) - for key in file_results: - if key != "none": # Exclude 'none' risk level - results[key].extend( - [ - (f"{file_path}: Line {line_num}", line) - for line_num, line in file_results[key] - ] - ) + if file_results != "": + for key in file_results: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{file_path}: Line {line_num}", line) + for line_num, line in file_results[key] + ] + ) elif os.path.isfile(path): file_extension = os.path.splitext(path)[1] if file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) - for key in file_results: - if key != "none": # Exclude 'none' risk level - results[key].extend( - [ - (f"{path}: Line {line_num}", line) - for line_num, line in file_results[key] - ] - ) + if file_results != "": + for key in file_results: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{path}: Line {line_num}", line) + for line_num, line in file_results[key] + ] + ) else: print("Unsupported file type.") return @@ -420,6 +432,14 @@ def main(): output_file = args.output.rsplit(".", 1)[0] + ".txt" # 如果未指定输出文件,则输出到 stdout;否则写入文件 process_path(args.path, output_format, args.mode, args.pycdc, output_file) + if PYCDC_FLAG == False: + print( + "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." + ) + print("Repo: https://github.com/zrax/pycdc.git") + if PYCDC_ADDR_FLAG == False: + print("ERROR: The specified pycdc.exe path is not valid") + print("Please check your pycdc path.") if __name__ == "__main__": diff --git a/detection/pyc_detection.py b/detection/pyc_detection.py index e898685..d350421 100644 --- a/detection/pyc_detection.py +++ b/detection/pyc_detection.py @@ -3,6 +3,7 @@ import uncompyle6 import io import os import subprocess +from contextlib import redirect_stdout, redirect_stderr def run_pycdc(exe_path: str, pyc_file: str) -> str: @@ -17,15 +18,12 @@ def run_pycdc(exe_path: str, pyc_file: str) -> str: str: Output from pycdc.exe. """ if not os.path.isfile(exe_path): - print(f"ERROR: The specified pycdc.exe path is not valid: {exe_path}") - print("Please check your pycdc path.") - exit(1) + return "invalid" command = f'"{exe_path}" "{pyc_file}"' - result = subprocess.run(command, capture_output=True, text=True, shell=True) - - if result.returncode != 0: - raise Exception(f"Error running pycdc.exe: {result.stderr}") + result = subprocess.run( + command, capture_output=True, text=True, shell=True, encoding="utf-8" + ) return result.stdout @@ -46,10 +44,6 @@ def disassemble_pyc(file_path: str, pycdc_addr=None) -> str: return output.getvalue() except Exception as e: if pycdc_addr is None: - print( - "ERROR: For Python 3.11 and above, you need to install pycdc and compile it yourself to obtain pycdc.exe." - ) - print("repo: https://github.com/zrax/pycdc.git") - exit(1) + return "none" else: return run_pycdc(pycdc_addr, file_path)