From c811e434c690f94dc29b0fe3a611212d9430f34f Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Wed, 5 Jun 2024 10:46:42 +0800 Subject: [PATCH 1/4] =?UTF-8?q?fix:=20=E4=BE=9D=E8=B5=96=E6=8A=A5=E5=91=8A?= =?UTF-8?q?=E8=BE=93=E5=87=BA=E6=A0=BC=E5=BC=8F=E4=BF=AE=E6=94=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/requirements_detection.py | 203 +++++++++++++++++++++++----- 1 file changed, 166 insertions(+), 37 deletions(-) diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 5a1c78f..8f2cdea 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -3,6 +3,15 @@ import requests from bs4 import BeautifulSoup from packaging.version import Version, InvalidVersion import sys +from reportlab.lib.pagesizes import letter +from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle +from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer +from colorama import Fore, Style, init +from tqdm import tqdm +import html + + +init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色 def fetch_html(url: str) -> str: @@ -55,7 +64,6 @@ def version_in_range(version, range_str: str) -> bool: except InvalidVersion: return False else: - # 如果没有给版本号,默认使用最新版本 if range_str[-2] == ",": return True @@ -77,37 +85,155 @@ def version_in_range(version, range_str: str) -> bool: return True -def check_vulnerabilities(requirements: list, base_url: str, output_file: str): - with open(output_file, "w") as out_file: - for req in requirements: - version = "" - # 如果有版本 - if "==" in req: - package_name, version = req.split("==") - # 没有版本 - else: - package_name, version = req, None - # 拼接URL - url = f"{base_url}{package_name}" - print(f"Fetching data for {package_name} from {url}") - html_content = fetch_html(url) - if html_content: - # 解析hmtl - extracted_data = parse_html(html_content) - if extracted_data: - relevant_vulns = [] - for vuln in extracted_data: - if version_in_range(version, vuln["chip"]): - relevant_vulns.append(vuln) - if relevant_vulns: - out_file.write(f"Vulnerabilities found for {package_name}:\n") - for vuln in relevant_vulns: - out_file.write(f" - {vuln['link']}\n") - out_file.write("\n") - else: - print(f"No relevant data found for {package_name}.") - else: - print(f"Failed to fetch data for {package_name}.") +def check_vulnerabilities(requirements: list, base_url: str) -> str: + results = [] + for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"): + version = "" + if "==" in req: + package_name, version = req.split("==") + else: + package_name, version = req, None + url = f"{base_url}{package_name}" + # print(f"Fetching data for {package_name} from {url}") + html_content = fetch_html(url) + if html_content: + extracted_data = parse_html(html_content) + if extracted_data: + relevant_vulns = [] + for vuln in extracted_data: + if version_in_range(version, vuln["chip"]): + relevant_vulns.append(vuln) + if relevant_vulns: + result = f"Vulnerabilities found for {package_name}:\n" + for vuln in relevant_vulns: + result += f" - {vuln['link']}\n" + results.append(result) + return "\n".join(results) + + +def save_to_file(output_path: str, data: str): + if output_path.endswith(".html"): + save_as_html(output_path, data) + elif output_path.endswith(".pdf"): + save_as_pdf(output_path, data) + elif output_path.endswith(".md"): + save_as_markdown(output_path, data) + else: + save_as_txt(output_path, data) + + +def save_as_html(output_path: str, data: str): + escaped_data = html.escape(data) + html_content = f""" + + + + + + Vulnerability Report + + + +
+
Vulnerability Report
+
{escaped_data}
+
+ + + """ + with open(output_path, "w", encoding="utf-8") as file: + file.write(html_content) + + +def save_as_pdf(output_path: str, data: str): + doc = SimpleDocTemplate(output_path, pagesize=letter) + story = [] + styles = getSampleStyleSheet() + + # Add the title centered + title_style = ParagraphStyle( + "Title", + parent=styles["Title"], + alignment=1, # Center alignment + fontSize=24, + leading=28, + spaceAfter=20, + fontName="Helvetica-Bold", + ) + title = Paragraph("Vulnerability Report", title_style) + story.append(title) + + # Normal body text style + normal_style = ParagraphStyle( + "BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12 + ) + + # Add the vulnerability details + for line in data.split("\n"): + if line.strip(): # Skip empty lines + story.append(Paragraph(line, normal_style)) + + doc.build(story) + + +def save_as_markdown(output_path: str, data: str): + with open(output_path, "w") as file: + file.write("## Vulnerability Report: \n\n") + file.write(data) + + +def save_as_txt(output_path: str, data: str): + with open(output_path, "w") as file: + file.write("Vulnerability Report: \n\n") + file.write(data) + + +def print_separator(title, char="-", length=50, padding=2): + print(f"{title:^{length + 4*padding}}") # 居中打印标题,两侧各有padding个空格 + print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格 def main(): @@ -124,16 +250,19 @@ def main(): "-o", "--output", help="Output file path with extension, e.g., './output/report.txt'", - required=True, ) args = parser.parse_args() base_url = "https://security.snyk.io/package/pip/" - # 分析项目依赖,包括名称和版本(如果有的话) requirements = load_requirements(args.requirement) - # 传入依赖信息,url前缀,扫描结果输出位置 - check_vulnerabilities(requirements, base_url, args.output) - print("Vulnerability scan complete. Results saved to", args.output) + results = check_vulnerabilities(requirements, base_url) + + if args.output: + save_to_file(args.output, results) + print(f"Vulnerability scan complete. Results saved to {args.output}") + else: + print_separator("\n\nVulnerability Report", "=", 40, 5) + print(results) if __name__ == "__main__": From 373defc5bb09b922031149c8cafc6fdee9e5f630 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Wed, 5 Jun 2024 15:56:06 +0800 Subject: [PATCH 2/4] =?UTF-8?q?feat:=20=E5=B0=86=E4=BE=9D=E8=B5=96?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E6=B7=BB=E5=8A=A0=E5=88=B0=E6=A8=A1=E7=BB=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 34 +++++++++++------- detection/requirements_detection.py | 53 ++++++++++++++--------------- 2 files changed, 48 insertions(+), 39 deletions(-) diff --git a/detection/__main__.py b/detection/__main__.py index ad63295..0157b03 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -5,6 +5,8 @@ from reportlab.lib.styles import getSampleStyleSheet from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection + +from .requirements_detection import requirement_detection from .Regexdetection import find_dangerous_functions from .GPTdetection import detectGPT from .pyc_detection import disassemble_pyc @@ -361,7 +363,12 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( - path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None + path: str, + output_format: str, + mode: str, + pycdc_addr: str, + output_file=None, + requirement_path=None, ): results = {"high": [], "medium": [], "low": [], "none": []} if os.path.isdir(path): @@ -375,12 +382,9 @@ def process_path( # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix - if file_extension in [".pkl",".pickle"]: + if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) + results["pickles"].append({"file": str(file_path), "result": res}) continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr @@ -398,10 +402,7 @@ def process_path( file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(path), output_file) - results["pickles"].append({ - "file": str(path), - "result": res - }) + results["pickles"].append({"file": str(path), "result": res}) elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: @@ -419,7 +420,8 @@ def process_path( else: print("Invalid path.") sys.exit(1) - + if requirement_path is not None: + requirement_detection(requirement_path, output_file) output_results(results, output_format, output_file) @@ -446,6 +448,12 @@ def main(): help="Path to pickle file to analyze", default=None, ) + parser.add_argument( + "-r", + "--requirement", + help="Path to requirement file to analyze", + default=None, + ) args = parser.parse_args() output_format = "txt" # Default output format output_file = None @@ -464,7 +472,9 @@ def main(): ) output_file = args.output.rsplit(".", 1)[0] + ".txt" # 如果未指定输出文件,则输出到 stdout;否则写入文件 - process_path(args.path, output_format, args.mode, args.pycdc, output_file) + process_path( + args.path, output_format, args.mode, args.pycdc, output_file, args.requirement + ) if PYCDC_FLAG == False: print( "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." diff --git a/detection/requirements_detection.py b/detection/requirements_detection.py index 8f2cdea..c1c3538 100644 --- a/detection/requirements_detection.py +++ b/detection/requirements_detection.py @@ -9,6 +9,7 @@ from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer from colorama import Fore, Style, init from tqdm import tqdm import html +import os init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色 @@ -94,7 +95,7 @@ def check_vulnerabilities(requirements: list, base_url: str) -> str: else: package_name, version = req, None url = f"{base_url}{package_name}" - # print(f"Fetching data for {package_name} from {url}") + # print(f"\nFetching data for {package_name} from {url}") html_content = fetch_html(url) if html_content: extracted_data = parse_html(html_content) @@ -236,34 +237,32 @@ def print_separator(title, char="-", length=50, padding=2): print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格 -def main(): - parser = argparse.ArgumentParser( - description="Check project dependencies for vulnerabilities." - ) - parser.add_argument( - "-r", - "--requirement", - help="Path to the requirements file of the project", - required=True, - ) - parser.add_argument( - "-o", - "--output", - help="Output file path with extension, e.g., './output/report.txt'", - ) - args = parser.parse_args() +def modify_file_name(file_path: str) -> str: + """ + Modify the file name by adding '-re' before the file extension. + Args: + file_path (str): The original file path. + + Returns: + str: The modified file path. + """ + directory, file_name = os.path.split(file_path) + name, ext = os.path.splitext(file_name) + new_file_name = f"{name}-re{ext}" + new_file_path = os.path.join(directory, new_file_name) + return new_file_path + + +def requirement_detection(requirement_path, output_path=None): base_url = "https://security.snyk.io/package/pip/" - requirements = load_requirements(args.requirement) + requirements = load_requirements(requirement_path) results = check_vulnerabilities(requirements, base_url) - - if args.output: - save_to_file(args.output, results) - print(f"Vulnerability scan complete. Results saved to {args.output}") + if output_path is not None: + new_path = modify_file_name(output_path) + save_to_file(new_path, results) + print(f"Vulnerability scan complete. Results saved to {output_path}") + print(f"Requirements scan complete. Results saved to {new_path}") else: - print_separator("\n\nVulnerability Report", "=", 40, 5) + print_separator("\nVulnerability Report", "=", 40, 5) print(results) - - -if __name__ == "__main__": - main() From 752e7747146474539c7a57fe9d70860154425ac6 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Thu, 6 Jun 2024 16:05:25 +0800 Subject: [PATCH 3/4] =?UTF-8?q?fix:=20=E4=BF=AE=E6=94=B9=E6=AD=A3=E5=88=99?= =?UTF-8?q?=E5=8C=B9=E9=85=8D=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/Regexdetection.py | 5 ++++- detection/__main__.py | 23 +++++++++-------------- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/detection/Regexdetection.py b/detection/Regexdetection.py index 2daa291..0ad2188 100644 --- a/detection/Regexdetection.py +++ b/detection/Regexdetection.py @@ -34,6 +34,7 @@ def find_dangerous_functions( r"\bos\.kill\b": "high", r"\bos\.popen\b": "medium", r"\bos\.spawn\b": "medium", + r"\bsubprocess": "medium", }, } risk_patterns = patterns.get(file_extension, {}) @@ -43,7 +44,9 @@ def find_dangerous_functions( clean_line = remove_comments(line, file_extension) if not clean_line: continue + # 消除换行符,避免影响正则匹配 + clean_line = clean_line.replace("\\n", "") for pattern, risk_level in risk_patterns.items(): - if re.search(pattern, clean_line): + if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL): classified_results[risk_level].append((line_number, clean_line)) return classified_results diff --git a/detection/__main__.py b/detection/__main__.py index a0be3bb..9dfdc5d 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -7,7 +7,8 @@ from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions -from .GPTdetection import detectGPT,GPTdetectFileList +from .GPTdetection import detectGPT, GPTdetectFileList + # from .cngptdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * @@ -30,6 +31,8 @@ ORDERS = [ "__getattribute__", "getattr", "child_process", + "kill", + "fork", ] # Initialize colorama @@ -146,8 +149,6 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: text_output += line_text text_output += "\n" - - return text_output @@ -372,7 +373,7 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: def process_path( path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None ): - results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []} + results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} if os.path.isdir(path): # 使用rglob获取所有文件 all_files = [ @@ -383,15 +384,12 @@ def process_path( if mode == "llm": results = GPTdetectFileList(all_files) else: - # 扫描动画 + # 扫描动画 for file_path in tqdm(all_files, desc="Scanning files", unit="file"): file_extension = file_path.suffix - if file_extension in [".pkl",".pickle"]: + if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(file_path), output_file) - results["pickles"].append({ - "file": str(file_path), - "result": res - }) + results["pickles"].append({"file": str(file_path), "result": res}) continue file_results = checkModeAndDetect( mode, str(file_path), file_extension, pycdc_addr @@ -409,10 +407,7 @@ def process_path( file_extension = os.path.splitext(path)[1] if file_extension in [".pkl", ".pickle"]: res = pickleDataDetection(str(path), output_file) - results["pickles"].append({ - "file": str(path), - "result": res - }) + results["pickles"].append({"file": str(path), "result": res}) elif file_extension in SUPPORTED_EXTENSIONS: file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) if file_results is not None: From 2adb1cbc2e01639852cf4b5e189d6a491549f6e1 Mon Sep 17 00:00:00 2001 From: dqy <1016751306@qq.com> Date: Thu, 6 Jun 2024 17:14:47 +0800 Subject: [PATCH 4/4] =?UTF-8?q?fix:=20=E5=88=A0=E9=99=A4head?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/__main__.py | 1 - 1 file changed, 1 deletion(-) diff --git a/detection/__main__.py b/detection/__main__.py index f179285..c620c8e 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -388,7 +388,6 @@ def process_path( for file_path in Path(path).rglob("*") if file_path.suffix in SUPPORTED_EXTENSIONS ] -<<<<<<< HEAD if mode == "llm": results = GPTdetectFileList(all_files) else: