BackDoorBuster/detection/pickle_detection.py

157 lines
4.7 KiB
Python

import io
import os
import pickletools
import pickle
import nt
class _Unframer:
def __init__(self, file_read, file_readline, file_tell=None):
self.file_read = file_read
self.file_readline = file_readline
self.current_frame = None
def readinto(self, buf):
if self.current_frame:
n = self.current_frame.readinto(buf)
if n == 0 and len(buf) != 0:
self.current_frame = None
n = len(buf)
buf[:] = self.file_read(n)
return n
if n < len(buf):
raise pickle.UnpicklingError(
"pickle exhausted before end of frame")
return n
else:
n = len(buf)
buf[:] = self.file_read(n)
return n
def read(self, n):
if self.current_frame:
data = self.current_frame.read(n)
if not data and n != 0:
self.current_frame = None
return self.file_read(n)
if len(data) < n:
raise pickle.UnpicklingError(
"pickle exhausted before end of frame")
return data
else:
return self.file_read(n)
def readline(self):
if self.current_frame:
data = self.current_frame.readline()
if not data:
self.current_frame = None
return self.file_readline()
if data[-1] != b'\n'[0]:
raise pickle.UnpicklingError(
"pickle exhausted before end of frame")
return data
else:
return self.file_readline()
def load_frame(self, frame_size):
if self.current_frame and self.current_frame.read() != b'':
raise pickle.UnpicklingError(
"beginning of a new frame before end of current frame")
self.current_frame = io.BytesIO(self.file_read(frame_size))
dangerous_modules = ["os", "subprocess","builtins","nt"]
dangerous_names = ["system", "popen", "run", "call", "check_output", "check_call",]
class pickleScanner():
ReduceCount = 0
maliciousModule = []
dispatch = {}
def __init__(self, file, *, fix_imports=True,
encoding="ASCII", errors="strict", buffers=None):
self._buffers = iter(buffers) if buffers is not None else None
self._file_readline = file.readline
self._file_read = file.read
self.memo = {}
self.encoding = encoding
self.errors = errors
self.proto = 0
self.fix_imports = fix_imports
self.file = file
def find_class(self, module, name):
print(module, name)
if module.decode() in dangerous_modules or name.decode() in dangerous_names:
# self.maliciousCount += 1
self.maliciousModule.append((module, name))
def load(self):
self._unframer = _Unframer(self._file_read, self._file_readline)
self.read = self._unframer.read
self.readinto = self._unframer.readinto
self.readline = self._unframer.readline
self.seek = self.file.seek
self.metastack = []
self.stack = []
self.append = self.stack.append
self.proto = 0
read = self.read
dispatch = self.dispatch
# 扫描所有的opcodes
opcode = self.read(1)
while opcode:
if opcode == b'c':
self.seek(-2,1)
codeN1 = self.read(1)
if 65<= ord(codeN1) <=90 or 97<= ord(codeN1) <=122 or ord(codeN1) == 0:
self.read(1)
else:
self.read(1)
module = self.readline()[:-1]
name = self.readline()[:-1]
self.find_class(module, name)
elif opcode in self.unsafe_opcodes:
self.ReduceCount += 1
opcode = self.read(1)
unsafe_opcodes = {
b'r', # REDUCE - call a callable with arguments
b'R', # REDUCE - same as 'r', but for args tuple
}
def output(self):
if self.ReduceCount > 0 or len(self.maliciousModule) > 0:
print("The pickle file maybe contains malicious code")
print(f"The number of REDUCE opcodes is {self.ReduceCount}")
print("The malicious options are: ", self.maliciousModule)
else:
print("The pickle file is safe")
class test:
a = 1
b = 2
def __reduce__(self):
return (__import__("os").system,('calc',))
data = pickle.dumps(test(),protocol=2)
print(data)
print(pickletools.dis(data))
with open("test.pkl", "wb") as file:
file.write(data)
with open("test.pkl", "rb") as file:
pickscan = pickleScanner(file)
pickscan.load()
pickscan.output()