Compare commits

...

7 Commits

4 changed files with 227 additions and 68 deletions

View File

@@ -28,7 +28,7 @@ def detectGPT(content: str):
# signal.signal(signal.SIGTERM, timeout_handler) # signal.signal(signal.SIGTERM, timeout_handler)
# signal.alarm(10) # signal.alarm(10)
client = openai.OpenAI(base_url="https://api.xiaoai.plus/v1", api_key=api_key) client = openai.OpenAI(base_url="https://api.kpi7.cn/v1", api_key=api_key)
text = content text = content
# client = openai.OpenAI(api_key="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") #测试用key # client = openai.OpenAI(api_key="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") #测试用key
response = client.chat.completions.create( response = client.chat.completions.create(
@@ -46,7 +46,7 @@ def detectGPT(content: str):
"content": text, "content": text,
}, },
], ],
model="gpt-3.5-turbo", model="gpt-4o",
) )
try: try:
message_content = response.choices[0].message.content message_content = response.choices[0].message.content

View File

@@ -6,6 +6,8 @@ from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from detection.pickle_detection import pickleDataDetection from detection.pickle_detection import pickleDataDetection
from .requirements_detection import requirement_detection
from .Regexdetection import find_dangerous_functions from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT, GPTdetectFileList from .GPTdetection import detectGPT, GPTdetectFileList
@@ -19,7 +21,7 @@ from pathlib import Path
PYCDC_FLAG = True PYCDC_FLAG = True
PYCDC_ADDR_FLAG = True PYCDC_ADDR_FLAG = True
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc"} SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp", ".pyc",".pkl",".pickle"}
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
ORDERS = [ ORDERS = [
"__import__", "__import__",
@@ -109,6 +111,7 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
text_output = "Security Analysis Report\n" text_output = "Security Analysis Report\n"
text_output += "=" * 30 + "\n\n" text_output += "=" * 30 + "\n\n"
# text_output+= "chatGPT检测结果\n\n"
for risk_level, entries in results.items(): for risk_level, entries in results.items():
# print(risk_level, entries) # print(risk_level, entries)
@@ -371,7 +374,12 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr:
def process_path( def process_path(
path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None path: str,
output_format: str,
mode: str,
pycdc_addr: str,
output_file=None,
requirement_path=None,
): ):
results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []} results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
if os.path.isdir(path): if os.path.isdir(path):
@@ -381,13 +389,16 @@ def process_path(
for file_path in Path(path).rglob("*") for file_path in Path(path).rglob("*")
if file_path.suffix in SUPPORTED_EXTENSIONS if file_path.suffix in SUPPORTED_EXTENSIONS
] ]
print(all_files)
if mode == "llm": if mode == "llm":
results = GPTdetectFileList(all_files) results = GPTdetectFileList(all_files)
else: else:
# 扫描动画 # 扫描动画
for file_path in tqdm(all_files, desc="Scanning files", unit="file"): for file_path in tqdm(all_files, desc="Scanning files", unit="file"):
file_extension = file_path.suffix file_extension = file_path.suffix
if file_extension in [".pkl", ".pickle"]: # print(file_extension)
if file_extension in [".pkl",".pickle"]:
# print("识别到pickle")
res = pickleDataDetection(str(file_path), output_file) res = pickleDataDetection(str(file_path), output_file)
results["pickles"].append({"file": str(file_path), "result": res}) results["pickles"].append({"file": str(file_path), "result": res})
continue continue
@@ -425,7 +436,8 @@ def process_path(
else: else:
print("Invalid path.") print("Invalid path.")
sys.exit(1) sys.exit(1)
if requirement_path is not None:
requirement_detection(requirement_path, output_file)
output_results(results, output_format, output_file) output_results(results, output_format, output_file)
@@ -446,6 +458,18 @@ def main():
help="Path to pycdc.exe to decompile", help="Path to pycdc.exe to decompile",
default=os.getenv("PATH"), default=os.getenv("PATH"),
) )
parser.add_argument(
"-P",
"--Pickle",
help="Path to pickle file to analyze",
default=None,
)
parser.add_argument(
"-r",
"--requirement",
help="Path to requirement file to analyze",
default=None,
)
args = parser.parse_args() args = parser.parse_args()
output_format = "txt" # Default output format output_format = "txt" # Default output format
output_file = None output_file = None
@@ -461,7 +485,9 @@ def main():
) )
output_file = args.output.rsplit(".", 1)[0] + ".txt" output_file = args.output.rsplit(".", 1)[0] + ".txt"
# 如果未指定输出文件,则输出到 stdout否则写入文件 # 如果未指定输出文件,则输出到 stdout否则写入文件
process_path(args.path, output_format, args.mode, args.pycdc, output_file) process_path(
args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
)
if PYCDC_FLAG == False: if PYCDC_FLAG == False:
print( print(
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc." "ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."

View File

@@ -3,6 +3,16 @@ import requests
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from packaging.version import Version, InvalidVersion from packaging.version import Version, InvalidVersion
import sys import sys
from reportlab.lib.pagesizes import letter
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
from colorama import Fore, Style, init
from tqdm import tqdm
import html
import os
init(autoreset=True) # 初始化colorama并在每次打印后自动重置颜色
def fetch_html(url: str) -> str: def fetch_html(url: str) -> str:
@@ -55,7 +65,6 @@ def version_in_range(version, range_str: str) -> bool:
except InvalidVersion: except InvalidVersion:
return False return False
else: else:
# 如果没有给版本号,默认使用最新版本
if range_str[-2] == ",": if range_str[-2] == ",":
return True return True
@@ -77,22 +86,18 @@ def version_in_range(version, range_str: str) -> bool:
return True return True
def check_vulnerabilities(requirements: list, base_url: str, output_file: str): def check_vulnerabilities(requirements: list, base_url: str) -> str:
with open(output_file, "w") as out_file: results = []
for req in requirements: for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
version = "" version = ""
# 如果有版本
if "==" in req: if "==" in req:
package_name, version = req.split("==") package_name, version = req.split("==")
# 没有版本
else: else:
package_name, version = req, None package_name, version = req, None
# 拼接URL
url = f"{base_url}{package_name}" url = f"{base_url}{package_name}"
print(f"Fetching data for {package_name} from {url}") # print(f"\nFetching data for {package_name} from {url}")
html_content = fetch_html(url) html_content = fetch_html(url)
if html_content: if html_content:
# 解析hmtl
extracted_data = parse_html(html_content) extracted_data = parse_html(html_content)
if extracted_data: if extracted_data:
relevant_vulns = [] relevant_vulns = []
@@ -100,41 +105,164 @@ def check_vulnerabilities(requirements: list, base_url: str, output_file: str):
if version_in_range(version, vuln["chip"]): if version_in_range(version, vuln["chip"]):
relevant_vulns.append(vuln) relevant_vulns.append(vuln)
if relevant_vulns: if relevant_vulns:
out_file.write(f"Vulnerabilities found for {package_name}:\n") result = f"Vulnerabilities found for {package_name}:\n"
for vuln in relevant_vulns: for vuln in relevant_vulns:
out_file.write(f" - {vuln['link']}\n") result += f" - {vuln['link']}\n"
out_file.write("\n") results.append(result)
return "\n".join(results)
def save_to_file(output_path: str, data: str):
if output_path.endswith(".html"):
save_as_html(output_path, data)
elif output_path.endswith(".pdf"):
save_as_pdf(output_path, data)
elif output_path.endswith(".md"):
save_as_markdown(output_path, data)
else: else:
print(f"No relevant data found for {package_name}.") save_as_txt(output_path, data)
else:
print(f"Failed to fetch data for {package_name}.")
def main(): def save_as_html(output_path: str, data: str):
parser = argparse.ArgumentParser( escaped_data = html.escape(data)
description="Check project dependencies for vulnerabilities." html_content = f"""
) <html>
parser.add_argument( <head>
"-r", <meta charset="UTF-8">
"--requirement", <meta name="viewport" content="width=device-width, initial-scale=1.0">
help="Path to the requirements file of the project", <link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
required=True, <title>Vulnerability Report</title>
) <style>
parser.add_argument( body {{
"-o", font-family: Arial, sans-serif;
"--output", background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
help="Output file path with extension, e.g., './output/report.txt'", background-size: cover;
required=True, color: #333;
) margin: 0;
args = parser.parse_args() padding: 0;
display: flex;
justify-content: center;
align-items: center;
height: 100vh;
}}
.container {{
background: rgba(255, 255, 255, 0.8);
border-radius: 10px;
padding: 20px;
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
max-width: 800px;
width: 100%;
margin: 20px;
overflow-y: auto;
max-height: 90vh;
}}
.title {{
font-size: 24px;
font-weight: bold;
text-align: center;
margin-bottom: 20px;
}}
pre {{
white-space: pre-wrap;
word-wrap: break-word;
font-size: 14px;
line-height: 1.5;
color: #333;
background: #f4f4f4;
padding: 10px;
border-radius: 5px;
border: 1px solid #ddd;
overflow: auto;
font-weight: bold;
}}
</style>
</head>
<body>
<div class="container">
<div class="title">Vulnerability Report</div>
<pre>{escaped_data}</pre>
</div>
</body>
</html>
"""
with open(output_path, "w", encoding="utf-8") as file:
file.write(html_content)
def save_as_pdf(output_path: str, data: str):
doc = SimpleDocTemplate(output_path, pagesize=letter)
story = []
styles = getSampleStyleSheet()
# Add the title centered
title_style = ParagraphStyle(
"Title",
parent=styles["Title"],
alignment=1, # Center alignment
fontSize=24,
leading=28,
spaceAfter=20,
fontName="Helvetica-Bold",
)
title = Paragraph("Vulnerability Report", title_style)
story.append(title)
# Normal body text style
normal_style = ParagraphStyle(
"BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
)
# Add the vulnerability details
for line in data.split("\n"):
if line.strip(): # Skip empty lines
story.append(Paragraph(line, normal_style))
doc.build(story)
def save_as_markdown(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("## Vulnerability Report: \n\n")
file.write(data)
def save_as_txt(output_path: str, data: str):
with open(output_path, "w") as file:
file.write("Vulnerability Report: \n\n")
file.write(data)
def print_separator(title, char="-", length=50, padding=2):
print(f"{title:^{length + 4*padding}}") # 居中打印标题两侧各有padding个空格
print(char * (length + 2 * padding)) # 打印分割线两侧各有padding个字符的空格
def modify_file_name(file_path: str) -> str:
"""
Modify the file name by adding '-re' before the file extension.
Args:
file_path (str): The original file path.
Returns:
str: The modified file path.
"""
directory, file_name = os.path.split(file_path)
name, ext = os.path.splitext(file_name)
new_file_name = f"{name}-re{ext}"
new_file_path = os.path.join(directory, new_file_name)
return new_file_path
def requirement_detection(requirement_path, output_path=None):
base_url = "https://security.snyk.io/package/pip/" base_url = "https://security.snyk.io/package/pip/"
# 分析项目依赖,包括名称和版本(如果有的话) requirements = load_requirements(requirement_path)
requirements = load_requirements(args.requirement) results = check_vulnerabilities(requirements, base_url)
# 传入依赖信息url前缀扫描结果输出位置 if output_path is not None:
check_vulnerabilities(requirements, base_url, args.output) new_path = modify_file_name(output_path)
print("Vulnerability scan complete. Results saved to", args.output) save_to_file(new_path, results)
print(f"Vulnerability scan complete. Results saved to {output_path}")
print(f"Requirements scan complete. Results saved to {new_path}")
if __name__ == "__main__": else:
main() print_separator("\nVulnerability Report", "=", 40, 5)
print(results)

View File

@@ -106,7 +106,12 @@ backdoors = [
backdoor7, backdoor7,
] ]
backdoors_pickle = [
b'\x80\x03c__main__\nPerson\nq\x00)\x81q\x01}q\x02(X\x03\x00\x00\x00ageq\x03K\x12X\x04\x00\x00\x00nameq\x04X\x06\x00\x00\x00Pickleq\x05ub.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ub.',
b'cnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.',
b'\x80\x03c__main__\nUser\nq\x00)\x81q\x01}q\x02(X\x05\x00\x00\x00adminq\x03\x88X\x05\x00\x00\x00guestq\x04\x89ubcnt\nsystem\np0\n(Vcalc\np1\ntp2\nRp3\n.'
]
def inject_pickle_backdoor(root_path: str) -> None: def inject_pickle_backdoor(root_path: str) -> None:
""" """
Generate a pickle backdoor and insert it into the specified path. Generate a pickle backdoor and insert it into the specified path.
@@ -117,8 +122,8 @@ def inject_pickle_backdoor(root_path: str) -> None:
all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()] all_path = [str(p) for p in Path(root_path).glob("*") if p.is_dir()]
paths = random.sample(all_path, random.randrange(1, len(all_path))) paths = random.sample(all_path, random.randrange(1, len(all_path)))
for path in paths: for path in paths:
backdoor_id = random.randrange(0, len(backdoors)) backdoor_id = random.randrange(0, len(backdoors_pickle))
backdoor = backdoors[backdoor_id] backdoor = backdoors_pickle[backdoor_id]
filename = os.path.join(path, f"backdoor{backdoor_id}.pickle") filename = os.path.join(path, f"backdoor{backdoor_id}.pickle")
with open(filename, "wb") as f: with open(filename, "wb") as f:
pickle.dump(backdoor, f) pickle.dump(backdoor, f)