Merge branch 'tests/final-tests' of https://git.mamahaha.work/sangge/BackDoorBuster into tests/final-tests
This commit is contained in:
commit
f6fa95ba16
@ -34,6 +34,7 @@ def find_dangerous_functions(
|
|||||||
r"\bos\.kill\b": "high",
|
r"\bos\.kill\b": "high",
|
||||||
r"\bos\.popen\b": "medium",
|
r"\bos\.popen\b": "medium",
|
||||||
r"\bos\.spawn\b": "medium",
|
r"\bos\.spawn\b": "medium",
|
||||||
|
r"\bsubprocess": "medium",
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
risk_patterns = patterns.get(file_extension, {})
|
risk_patterns = patterns.get(file_extension, {})
|
||||||
@ -43,7 +44,9 @@ def find_dangerous_functions(
|
|||||||
clean_line = remove_comments(line, file_extension)
|
clean_line = remove_comments(line, file_extension)
|
||||||
if not clean_line:
|
if not clean_line:
|
||||||
continue
|
continue
|
||||||
|
# 消除换行符,避免影响正则匹配
|
||||||
|
clean_line = clean_line.replace("\\n", "")
|
||||||
for pattern, risk_level in risk_patterns.items():
|
for pattern, risk_level in risk_patterns.items():
|
||||||
if re.search(pattern, clean_line):
|
if re.search(pattern, clean_line, re.MULTILINE | re.DOTALL):
|
||||||
classified_results[risk_level].append((line_number, clean_line))
|
classified_results[risk_level].append((line_number, clean_line))
|
||||||
return classified_results
|
return classified_results
|
||||||
|
@ -6,8 +6,11 @@ from reportlab.lib.styles import getSampleStyleSheet
|
|||||||
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
|
||||||
|
|
||||||
from detection.pickle_detection import pickleDataDetection
|
from detection.pickle_detection import pickleDataDetection
|
||||||
|
|
||||||
|
from .requirements_detection import requirement_detection
|
||||||
from .Regexdetection import find_dangerous_functions
|
from .Regexdetection import find_dangerous_functions
|
||||||
from .GPTdetection import detectGPT,GPTdetectFileList
|
from .GPTdetection import detectGPT, GPTdetectFileList
|
||||||
|
|
||||||
# from .cngptdetection import detectGPT,GPTdetectFileList
|
# from .cngptdetection import detectGPT,GPTdetectFileList
|
||||||
from .pyc_detection import disassemble_pyc
|
from .pyc_detection import disassemble_pyc
|
||||||
from .utils import *
|
from .utils import *
|
||||||
@ -30,6 +33,8 @@ ORDERS = [
|
|||||||
"__getattribute__",
|
"__getattribute__",
|
||||||
"getattr",
|
"getattr",
|
||||||
"child_process",
|
"child_process",
|
||||||
|
"kill",
|
||||||
|
"fork",
|
||||||
]
|
]
|
||||||
|
|
||||||
# Initialize colorama
|
# Initialize colorama
|
||||||
@ -147,8 +152,6 @@ def generate_text_content(results: Dict[str, List[Tuple[int, str]]]) -> str:
|
|||||||
text_output += line_text
|
text_output += line_text
|
||||||
text_output += "\n"
|
text_output += "\n"
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
return text_output
|
return text_output
|
||||||
|
|
||||||
|
|
||||||
@ -371,9 +374,14 @@ def checkModeAndDetect(mode: str, filePath: str, fileExtension: str, pycdc_addr:
|
|||||||
|
|
||||||
|
|
||||||
def process_path(
|
def process_path(
|
||||||
path: str, output_format: str, mode: str, pycdc_addr: str, output_file=None
|
path: str,
|
||||||
|
output_format: str,
|
||||||
|
mode: str,
|
||||||
|
pycdc_addr: str,
|
||||||
|
output_file=None,
|
||||||
|
requirement_path=None,
|
||||||
):
|
):
|
||||||
results = {"high": [], "medium": [], "low": [], "none": [],"pickles": []}
|
results = {"high": [], "medium": [], "low": [], "none": [], "pickles": []}
|
||||||
if os.path.isdir(path):
|
if os.path.isdir(path):
|
||||||
# 使用rglob获取所有文件
|
# 使用rglob获取所有文件
|
||||||
all_files = [
|
all_files = [
|
||||||
@ -392,10 +400,7 @@ def process_path(
|
|||||||
if file_extension in [".pkl",".pickle"]:
|
if file_extension in [".pkl",".pickle"]:
|
||||||
# print("识别到pickle")
|
# print("识别到pickle")
|
||||||
res = pickleDataDetection(str(file_path), output_file)
|
res = pickleDataDetection(str(file_path), output_file)
|
||||||
results["pickles"].append({
|
results["pickles"].append({"file": str(file_path), "result": res})
|
||||||
"file": str(file_path),
|
|
||||||
"result": res
|
|
||||||
})
|
|
||||||
continue
|
continue
|
||||||
file_results = checkModeAndDetect(
|
file_results = checkModeAndDetect(
|
||||||
mode, str(file_path), file_extension, pycdc_addr
|
mode, str(file_path), file_extension, pycdc_addr
|
||||||
@ -413,10 +418,7 @@ def process_path(
|
|||||||
file_extension = os.path.splitext(path)[1]
|
file_extension = os.path.splitext(path)[1]
|
||||||
if file_extension in [".pkl", ".pickle"]:
|
if file_extension in [".pkl", ".pickle"]:
|
||||||
res = pickleDataDetection(str(path), output_file)
|
res = pickleDataDetection(str(path), output_file)
|
||||||
results["pickles"].append({
|
results["pickles"].append({"file": str(path), "result": res})
|
||||||
"file": str(path),
|
|
||||||
"result": res
|
|
||||||
})
|
|
||||||
elif file_extension in SUPPORTED_EXTENSIONS:
|
elif file_extension in SUPPORTED_EXTENSIONS:
|
||||||
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
|
file_results = checkModeAndDetect(mode, path, file_extension, pycdc_addr)
|
||||||
if file_results is not None:
|
if file_results is not None:
|
||||||
@ -434,7 +436,8 @@ def process_path(
|
|||||||
else:
|
else:
|
||||||
print("Invalid path.")
|
print("Invalid path.")
|
||||||
sys.exit(1)
|
sys.exit(1)
|
||||||
|
if requirement_path is not None:
|
||||||
|
requirement_detection(requirement_path, output_file)
|
||||||
output_results(results, output_format, output_file)
|
output_results(results, output_format, output_file)
|
||||||
|
|
||||||
|
|
||||||
@ -455,6 +458,18 @@ def main():
|
|||||||
help="Path to pycdc.exe to decompile",
|
help="Path to pycdc.exe to decompile",
|
||||||
default=os.getenv("PATH"),
|
default=os.getenv("PATH"),
|
||||||
)
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-P",
|
||||||
|
"--Pickle",
|
||||||
|
help="Path to pickle file to analyze",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"-r",
|
||||||
|
"--requirement",
|
||||||
|
help="Path to requirement file to analyze",
|
||||||
|
default=None,
|
||||||
|
)
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
output_format = "txt" # Default output format
|
output_format = "txt" # Default output format
|
||||||
output_file = None
|
output_file = None
|
||||||
@ -470,7 +485,9 @@ def main():
|
|||||||
)
|
)
|
||||||
output_file = args.output.rsplit(".", 1)[0] + ".txt"
|
output_file = args.output.rsplit(".", 1)[0] + ".txt"
|
||||||
# 如果未指定输出文件,则输出到 stdout;否则写入文件
|
# 如果未指定输出文件,则输出到 stdout;否则写入文件
|
||||||
process_path(args.path, output_format, args.mode, args.pycdc, output_file)
|
process_path(
|
||||||
|
args.path, output_format, args.mode, args.pycdc, output_file, args.requirement
|
||||||
|
)
|
||||||
if PYCDC_FLAG == False:
|
if PYCDC_FLAG == False:
|
||||||
print(
|
print(
|
||||||
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
|
"ERROR: Detected Python 3.11 or above .pyc files. You need to install pycdc and compile it yourself to obtain pycdc."
|
||||||
|
@ -3,6 +3,16 @@ import requests
|
|||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from packaging.version import Version, InvalidVersion
|
from packaging.version import Version, InvalidVersion
|
||||||
import sys
|
import sys
|
||||||
|
from reportlab.lib.pagesizes import letter
|
||||||
|
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
|
||||||
|
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer
|
||||||
|
from colorama import Fore, Style, init
|
||||||
|
from tqdm import tqdm
|
||||||
|
import html
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
init(autoreset=True) # 初始化colorama,并在每次打印后自动重置颜色
|
||||||
|
|
||||||
|
|
||||||
def fetch_html(url: str) -> str:
|
def fetch_html(url: str) -> str:
|
||||||
@ -55,7 +65,6 @@ def version_in_range(version, range_str: str) -> bool:
|
|||||||
except InvalidVersion:
|
except InvalidVersion:
|
||||||
return False
|
return False
|
||||||
else:
|
else:
|
||||||
# 如果没有给版本号,默认使用最新版本
|
|
||||||
if range_str[-2] == ",":
|
if range_str[-2] == ",":
|
||||||
return True
|
return True
|
||||||
|
|
||||||
@ -77,22 +86,18 @@ def version_in_range(version, range_str: str) -> bool:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def check_vulnerabilities(requirements: list, base_url: str, output_file: str):
|
def check_vulnerabilities(requirements: list, base_url: str) -> str:
|
||||||
with open(output_file, "w") as out_file:
|
results = []
|
||||||
for req in requirements:
|
for req in tqdm(requirements, desc="Checking vulnerabilities", unit="dependency"):
|
||||||
version = ""
|
version = ""
|
||||||
# 如果有版本
|
|
||||||
if "==" in req:
|
if "==" in req:
|
||||||
package_name, version = req.split("==")
|
package_name, version = req.split("==")
|
||||||
# 没有版本
|
|
||||||
else:
|
else:
|
||||||
package_name, version = req, None
|
package_name, version = req, None
|
||||||
# 拼接URL
|
|
||||||
url = f"{base_url}{package_name}"
|
url = f"{base_url}{package_name}"
|
||||||
print(f"Fetching data for {package_name} from {url}")
|
# print(f"\nFetching data for {package_name} from {url}")
|
||||||
html_content = fetch_html(url)
|
html_content = fetch_html(url)
|
||||||
if html_content:
|
if html_content:
|
||||||
# 解析hmtl
|
|
||||||
extracted_data = parse_html(html_content)
|
extracted_data = parse_html(html_content)
|
||||||
if extracted_data:
|
if extracted_data:
|
||||||
relevant_vulns = []
|
relevant_vulns = []
|
||||||
@ -100,41 +105,164 @@ def check_vulnerabilities(requirements: list, base_url: str, output_file: str):
|
|||||||
if version_in_range(version, vuln["chip"]):
|
if version_in_range(version, vuln["chip"]):
|
||||||
relevant_vulns.append(vuln)
|
relevant_vulns.append(vuln)
|
||||||
if relevant_vulns:
|
if relevant_vulns:
|
||||||
out_file.write(f"Vulnerabilities found for {package_name}:\n")
|
result = f"Vulnerabilities found for {package_name}:\n"
|
||||||
for vuln in relevant_vulns:
|
for vuln in relevant_vulns:
|
||||||
out_file.write(f" - {vuln['link']}\n")
|
result += f" - {vuln['link']}\n"
|
||||||
out_file.write("\n")
|
results.append(result)
|
||||||
|
return "\n".join(results)
|
||||||
|
|
||||||
|
|
||||||
|
def save_to_file(output_path: str, data: str):
|
||||||
|
if output_path.endswith(".html"):
|
||||||
|
save_as_html(output_path, data)
|
||||||
|
elif output_path.endswith(".pdf"):
|
||||||
|
save_as_pdf(output_path, data)
|
||||||
|
elif output_path.endswith(".md"):
|
||||||
|
save_as_markdown(output_path, data)
|
||||||
else:
|
else:
|
||||||
print(f"No relevant data found for {package_name}.")
|
save_as_txt(output_path, data)
|
||||||
else:
|
|
||||||
print(f"Failed to fetch data for {package_name}.")
|
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def save_as_html(output_path: str, data: str):
|
||||||
parser = argparse.ArgumentParser(
|
escaped_data = html.escape(data)
|
||||||
description="Check project dependencies for vulnerabilities."
|
html_content = f"""
|
||||||
)
|
<html>
|
||||||
parser.add_argument(
|
<head>
|
||||||
"-r",
|
<meta charset="UTF-8">
|
||||||
"--requirement",
|
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||||
help="Path to the requirements file of the project",
|
<link rel="icon" href="https://s2.loli.net/2024/05/30/WDc6MekjbuCU9Qo.png">
|
||||||
required=True,
|
<title>Vulnerability Report</title>
|
||||||
)
|
<style>
|
||||||
parser.add_argument(
|
body {{
|
||||||
"-o",
|
font-family: Arial, sans-serif;
|
||||||
"--output",
|
background-image: url('https://s2.loli.net/2024/05/30/85Mv7leB2IRWNp6.jpg');
|
||||||
help="Output file path with extension, e.g., './output/report.txt'",
|
background-size: cover;
|
||||||
required=True,
|
color: #333;
|
||||||
)
|
margin: 0;
|
||||||
args = parser.parse_args()
|
padding: 0;
|
||||||
|
display: flex;
|
||||||
|
justify-content: center;
|
||||||
|
align-items: center;
|
||||||
|
height: 100vh;
|
||||||
|
}}
|
||||||
|
.container {{
|
||||||
|
background: rgba(255, 255, 255, 0.8);
|
||||||
|
border-radius: 10px;
|
||||||
|
padding: 20px;
|
||||||
|
box-shadow: 0 0 10px rgba(0, 0, 0, 0.1);
|
||||||
|
max-width: 800px;
|
||||||
|
width: 100%;
|
||||||
|
margin: 20px;
|
||||||
|
overflow-y: auto;
|
||||||
|
max-height: 90vh;
|
||||||
|
}}
|
||||||
|
.title {{
|
||||||
|
font-size: 24px;
|
||||||
|
font-weight: bold;
|
||||||
|
text-align: center;
|
||||||
|
margin-bottom: 20px;
|
||||||
|
}}
|
||||||
|
pre {{
|
||||||
|
white-space: pre-wrap;
|
||||||
|
word-wrap: break-word;
|
||||||
|
font-size: 14px;
|
||||||
|
line-height: 1.5;
|
||||||
|
color: #333;
|
||||||
|
background: #f4f4f4;
|
||||||
|
padding: 10px;
|
||||||
|
border-radius: 5px;
|
||||||
|
border: 1px solid #ddd;
|
||||||
|
overflow: auto;
|
||||||
|
font-weight: bold;
|
||||||
|
}}
|
||||||
|
</style>
|
||||||
|
</head>
|
||||||
|
<body>
|
||||||
|
<div class="container">
|
||||||
|
<div class="title">Vulnerability Report</div>
|
||||||
|
<pre>{escaped_data}</pre>
|
||||||
|
</div>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
with open(output_path, "w", encoding="utf-8") as file:
|
||||||
|
file.write(html_content)
|
||||||
|
|
||||||
|
|
||||||
|
def save_as_pdf(output_path: str, data: str):
|
||||||
|
doc = SimpleDocTemplate(output_path, pagesize=letter)
|
||||||
|
story = []
|
||||||
|
styles = getSampleStyleSheet()
|
||||||
|
|
||||||
|
# Add the title centered
|
||||||
|
title_style = ParagraphStyle(
|
||||||
|
"Title",
|
||||||
|
parent=styles["Title"],
|
||||||
|
alignment=1, # Center alignment
|
||||||
|
fontSize=24,
|
||||||
|
leading=28,
|
||||||
|
spaceAfter=20,
|
||||||
|
fontName="Helvetica-Bold",
|
||||||
|
)
|
||||||
|
title = Paragraph("Vulnerability Report", title_style)
|
||||||
|
story.append(title)
|
||||||
|
|
||||||
|
# Normal body text style
|
||||||
|
normal_style = ParagraphStyle(
|
||||||
|
"BodyText", parent=styles["BodyText"], fontSize=12, leading=15, spaceAfter=12
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add the vulnerability details
|
||||||
|
for line in data.split("\n"):
|
||||||
|
if line.strip(): # Skip empty lines
|
||||||
|
story.append(Paragraph(line, normal_style))
|
||||||
|
|
||||||
|
doc.build(story)
|
||||||
|
|
||||||
|
|
||||||
|
def save_as_markdown(output_path: str, data: str):
|
||||||
|
with open(output_path, "w") as file:
|
||||||
|
file.write("## Vulnerability Report: \n\n")
|
||||||
|
file.write(data)
|
||||||
|
|
||||||
|
|
||||||
|
def save_as_txt(output_path: str, data: str):
|
||||||
|
with open(output_path, "w") as file:
|
||||||
|
file.write("Vulnerability Report: \n\n")
|
||||||
|
file.write(data)
|
||||||
|
|
||||||
|
|
||||||
|
def print_separator(title, char="-", length=50, padding=2):
|
||||||
|
print(f"{title:^{length + 4*padding}}") # 居中打印标题,两侧各有padding个空格
|
||||||
|
print(char * (length + 2 * padding)) # 打印分割线,两侧各有padding个字符的空格
|
||||||
|
|
||||||
|
|
||||||
|
def modify_file_name(file_path: str) -> str:
|
||||||
|
"""
|
||||||
|
Modify the file name by adding '-re' before the file extension.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
file_path (str): The original file path.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The modified file path.
|
||||||
|
"""
|
||||||
|
directory, file_name = os.path.split(file_path)
|
||||||
|
name, ext = os.path.splitext(file_name)
|
||||||
|
new_file_name = f"{name}-re{ext}"
|
||||||
|
new_file_path = os.path.join(directory, new_file_name)
|
||||||
|
return new_file_path
|
||||||
|
|
||||||
|
|
||||||
|
def requirement_detection(requirement_path, output_path=None):
|
||||||
base_url = "https://security.snyk.io/package/pip/"
|
base_url = "https://security.snyk.io/package/pip/"
|
||||||
# 分析项目依赖,包括名称和版本(如果有的话)
|
requirements = load_requirements(requirement_path)
|
||||||
requirements = load_requirements(args.requirement)
|
results = check_vulnerabilities(requirements, base_url)
|
||||||
# 传入依赖信息,url前缀,扫描结果输出位置
|
if output_path is not None:
|
||||||
check_vulnerabilities(requirements, base_url, args.output)
|
new_path = modify_file_name(output_path)
|
||||||
print("Vulnerability scan complete. Results saved to", args.output)
|
save_to_file(new_path, results)
|
||||||
|
print(f"Vulnerability scan complete. Results saved to {output_path}")
|
||||||
|
print(f"Requirements scan complete. Results saved to {new_path}")
|
||||||
if __name__ == "__main__":
|
else:
|
||||||
main()
|
print_separator("\nVulnerability Report", "=", 40, 5)
|
||||||
|
print(results)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user