From fa86f12a4810b1e2db8ec4ad0c76ac9568aa367c Mon Sep 17 00:00:00 2001 From: tritium0041 Date: Tue, 14 May 2024 21:02:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=E6=B7=BB=E5=8A=A0=E4=BA=86pickle=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E6=89=AB=E6=8F=8F=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- detection/pickle_detection.py | 157 ++++++++++++++++++++++++++++++++++ detection/test.pkl | Bin 0 -> 33 bytes 2 files changed, 157 insertions(+) create mode 100644 detection/pickle_detection.py create mode 100644 detection/test.pkl diff --git a/detection/pickle_detection.py b/detection/pickle_detection.py new file mode 100644 index 0000000..b833c1c --- /dev/null +++ b/detection/pickle_detection.py @@ -0,0 +1,157 @@ +import io +import os +import pickletools +import pickle +import nt + +class _Unframer: + + def __init__(self, file_read, file_readline, file_tell=None): + self.file_read = file_read + self.file_readline = file_readline + self.current_frame = None + + def readinto(self, buf): + if self.current_frame: + n = self.current_frame.readinto(buf) + if n == 0 and len(buf) != 0: + self.current_frame = None + n = len(buf) + buf[:] = self.file_read(n) + return n + if n < len(buf): + raise pickle.UnpicklingError( + "pickle exhausted before end of frame") + return n + else: + n = len(buf) + buf[:] = self.file_read(n) + return n + + def read(self, n): + if self.current_frame: + data = self.current_frame.read(n) + if not data and n != 0: + self.current_frame = None + return self.file_read(n) + if len(data) < n: + raise pickle.UnpicklingError( + "pickle exhausted before end of frame") + return data + else: + return self.file_read(n) + + def readline(self): + if self.current_frame: + data = self.current_frame.readline() + if not data: + self.current_frame = None + return self.file_readline() + if data[-1] != b'\n'[0]: + raise pickle.UnpicklingError( + "pickle exhausted before end of frame") + return data + else: + return self.file_readline() + + def load_frame(self, frame_size): + if self.current_frame and self.current_frame.read() != b'': + raise pickle.UnpicklingError( + "beginning of a new frame before end of current frame") + self.current_frame = io.BytesIO(self.file_read(frame_size)) + + + + + + +dangerous_modules = ["os", "subprocess","builtins","nt"] +dangerous_names = ["system", "popen", "run", "call", "check_output", "check_call",] + +class pickleScanner(): + + ReduceCount = 0 + maliciousModule = [] + dispatch = {} + + def __init__(self, file, *, fix_imports=True, + encoding="ASCII", errors="strict", buffers=None): + self._buffers = iter(buffers) if buffers is not None else None + self._file_readline = file.readline + self._file_read = file.read + self.memo = {} + self.encoding = encoding + self.errors = errors + self.proto = 0 + self.fix_imports = fix_imports + self.file = file + + def find_class(self, module, name): + print(module, name) + if module.decode() in dangerous_modules or name.decode() in dangerous_names: + # self.maliciousCount += 1 + self.maliciousModule.append((module, name)) + + def load(self): + self._unframer = _Unframer(self._file_read, self._file_readline) + self.read = self._unframer.read + self.readinto = self._unframer.readinto + self.readline = self._unframer.readline + self.seek = self.file.seek + self.metastack = [] + self.stack = [] + self.append = self.stack.append + self.proto = 0 + read = self.read + dispatch = self.dispatch + # 扫描所有的opcodes + opcode = self.read(1) + while opcode: + if opcode == b'c': + self.seek(-2,1) + codeN1 = self.read(1) + if 65<= ord(codeN1) <=90 or 97<= ord(codeN1) <=122 or ord(codeN1) == 0: + self.read(1) + else: + self.read(1) + module = self.readline()[:-1] + name = self.readline()[:-1] + self.find_class(module, name) + elif opcode in self.unsafe_opcodes: + self.ReduceCount += 1 + opcode = self.read(1) + + unsafe_opcodes = { + b'r', # REDUCE - call a callable with arguments + b'R', # REDUCE - same as 'r', but for args tuple + } + + + def output(self): + if self.ReduceCount > 0 or len(self.maliciousModule) > 0: + print("The pickle file maybe contains malicious code") + print(f"The number of REDUCE opcodes is {self.ReduceCount}") + print("The malicious options are: ", self.maliciousModule) + else: + print("The pickle file is safe") + + + + +class test: + a = 1 + b = 2 + def __reduce__(self): + return (__import__("os").system,('calc',)) + + +data = pickle.dumps(test(),protocol=2) +print(data) +print(pickletools.dis(data)) +with open("test.pkl", "wb") as file: + file.write(data) + +with open("test.pkl", "rb") as file: + pickscan = pickleScanner(file) + pickscan.load() +pickscan.output() \ No newline at end of file diff --git a/detection/test.pkl b/detection/test.pkl new file mode 100644 index 0000000000000000000000000000000000000000..30c49be8d5204a05722eb8901cdb2eb5afb0575f GIT binary patch literal 33 ocmZo*O3o|cDy}RpNzLUdWQbs4U|>j2%t