feat:更改代码分布,实现模块化添加功能
Some checks failed
Python application test / build (pull_request) Failing after 12m34s

This commit is contained in:
2024-04-28 21:53:43 +08:00
parent 9e6b13d80e
commit 18454a0228
3 changed files with 54 additions and 50 deletions

View File

@@ -4,10 +4,6 @@ from .utils import *
import openai import openai
def detect_gpt(filename: str):
content = read_file_content(filename)
return detectGPT(content)
def detectGPT(content: str): def detectGPT(content: str):
client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY")) client = openai.OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
text = content text = content

View File

@@ -0,0 +1,37 @@
import re
from typing import Dict, List, Tuple
from .utils import remove_comments
def find_dangerous_functions(
file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
patterns = {
".py": {
r"\bsystem\(": "high",
r"\bexec\(": "high",
r"\bpopen\(": "medium",
r"\beval\(": "high",
r"\bsubprocess\.run\(": "medium",
r"\b__getattribute__\(": "high",
r"\bgetattr\(": "medium",
r"\b__import__\(": "high",
},
".js": {
r"\beval\(": "high",
r"\bexec\(": "high",
r"\bchild_process\.exec\(": "high",
},
".cpp": {
r"\bsystem\(": "high",
},
}
risk_patterns = patterns.get(file_extension, {})
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for line_number, line in enumerate(file_content.split("\n"), start=1):
clean_line = remove_comments(line, file_extension)
if not clean_line:
continue
for pattern, risk_level in risk_patterns.items():
if re.search(pattern, clean_line):
classified_results[risk_level].append((line_number, clean_line))
return classified_results

View File

@@ -5,49 +5,14 @@ from reportlab.pdfgen import canvas
from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.styles import getSampleStyleSheet
from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate
from reportlab.lib import colors from reportlab.lib import colors
from .Regexdetection import find_dangerous_functions
from .GPTdetection import detectGPT
from .utils import * from .utils import *
SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"} SUPPORTED_EXTENSIONS = {".py", ".js", ".cpp"}
OUTPUT_FORMATS = ["html", "md", "txt", "pdf"] OUTPUT_FORMATS = ["html", "md", "txt", "pdf"]
def find_dangerous_functions(
file_content: str, file_extension: str
) -> Dict[str, List[Tuple[int, str]]]:
patterns = {
".py": {
r"\bsystem\(": "high",
r"\bexec\(": "high",
r"\bpopen\(": "medium",
r"\beval\(": "high",
r"\bsubprocess\.run\(": "medium",
r"\b__getattribute__\(": "high",
r"\bgetattr\(": "medium",
r"\b__import__\(": "high",
},
".js": {
r"\beval\(": "high",
r"\bexec\(": "high",
r"\bchild_process\.exec\(": "high",
},
".cpp": {
r"\bsystem\(": "high",
},
}
risk_patterns = patterns.get(file_extension, {})
classified_results = {"high": [], "medium": [], "low": [], "none": []}
for line_number, line in enumerate(file_content.split("\n"), start=1):
clean_line = remove_comments(line, file_extension)
if not clean_line:
continue
for pattern, risk_level in risk_patterns.items():
if re.search(pattern, clean_line):
classified_results[risk_level].append((line_number, clean_line))
return classified_results
def generate_text_content(results): def generate_text_content(results):
text_output = "Security Analysis Report\n" text_output = "Security Analysis Report\n"
for risk_level, entries in results.items(): for risk_level, entries in results.items():
@@ -153,7 +118,15 @@ def output_text(results: Dict[str, List[Tuple[int, str]]], file_name=None):
return text_output return text_output
def process_path(path: str, output_format: str, output_file=None): def checkModeAndDetect(mode: str,filePath: str,fileExtension: str):
#TODO:添加更多方式,这里提高代码的复用性和扩展性
if mode == "regex":
return find_dangerous_functions(read_file_content(filePath), fileExtension)
elif mode == "llm":
return detectGPT(read_file_content(filePath))
def process_path(path: str, output_format: str, mode: str, output_file=None):
results = {"high": [], "medium": [], "low": [], "none": []} results = {"high": [], "medium": [], "low": [], "none": []}
if os.path.isdir(path): if os.path.isdir(path):
for root, dirs, files in os.walk(path): for root, dirs, files in os.walk(path):
@@ -161,9 +134,8 @@ def process_path(path: str, output_format: str, output_file=None):
file_extension = os.path.splitext(file)[1] file_extension = os.path.splitext(file)[1]
if file_extension in SUPPORTED_EXTENSIONS: if file_extension in SUPPORTED_EXTENSIONS:
file_path = os.path.join(root, file) file_path = os.path.join(root, file)
file_results = find_dangerous_functions(
read_file_content(file_path), file_extension file_results = checkModeAndDetect(mode,file_path,file_extension)
)
for key in file_results: for key in file_results:
if key != "none": # Exclude 'none' risk level if key != "none": # Exclude 'none' risk level
results[key].extend( results[key].extend(
@@ -175,9 +147,7 @@ def process_path(path: str, output_format: str, output_file=None):
elif os.path.isfile(path): elif os.path.isfile(path):
file_extension = os.path.splitext(path)[1] file_extension = os.path.splitext(path)[1]
if file_extension in SUPPORTED_EXTENSIONS: if file_extension in SUPPORTED_EXTENSIONS:
file_results = find_dangerous_functions( file_results = checkModeAndDetect(mode,path,file_extension)
read_file_content(path), file_extension
)
for key in file_results: for key in file_results:
if key != "none": # Exclude 'none' risk level if key != "none": # Exclude 'none' risk level
results[key].extend( results[key].extend(
@@ -202,6 +172,7 @@ def main():
parser = argparse.ArgumentParser(description="Backdoor detection tool.") parser = argparse.ArgumentParser(description="Backdoor detection tool.")
parser.add_argument("path", help="Path to the code to analyze") parser.add_argument("path", help="Path to the code to analyze")
parser.add_argument("-o", "--output", help="Output file path", default=None) parser.add_argument("-o", "--output", help="Output file path", default=None)
parser.add_argument("-m", "--mode", help="Mode of operation:[regex,llm]", default="regex")
args = parser.parse_args() args = parser.parse_args()
output_format = "txt" # Default output format output_format = "txt" # Default output format
output_file = None output_file = None
@@ -216,7 +187,7 @@ def main():
"Your input file format was incorrect, the output has been saved as a TXT file." "Your input file format was incorrect, the output has been saved as a TXT file."
) )
output_file = args.output.rsplit(".", 1)[0] + ".txt" output_file = args.output.rsplit(".", 1)[0] + ".txt"
process_path(args.path, output_format, output_file) process_path(args.path, output_format, args.mode, output_file)
if __name__ == "__main__": if __name__ == "__main__":