style: format code style
	
		
			
	
		
	
	
		
	
		
			Some checks are pending
		
		
	
	
		
			
				
	
				Python application test / build (pull_request) Waiting to run
				
					
					
				
			
		
		
	
	
				
					
				
			
		
			Some checks are pending
		
		
	
	Python application test / build (pull_request) Waiting to run
				This commit is contained in:
		| @@ -178,9 +178,7 @@ def main(): | ||||
|     parser.add_argument( | ||||
|         "-m", "--mode", help="Mode of operation:[regex,llm]", default="regex" | ||||
|     ) | ||||
|     parser.add_argument( | ||||
|         "-p","--pickle",help="analyze the pickle file",default=None | ||||
|     ) | ||||
|     parser.add_argument("-p", "--pickle", help="analyze the pickle file", default=None) | ||||
|     args = parser.parse_args() | ||||
|     output_format = "txt"  # Default output format | ||||
|     output_file = None | ||||
|   | ||||
| @@ -2,6 +2,7 @@ import io | ||||
| import json | ||||
| import pickle | ||||
|  | ||||
|  | ||||
| class _Unframer: | ||||
|  | ||||
|     def __init__(self, file_read, file_readline, file_tell=None): | ||||
| @@ -18,8 +19,7 @@ class _Unframer: | ||||
|                 buf[:] = self.file_read(n) | ||||
|                 return n | ||||
|             if n < len(buf): | ||||
|                 raise pickle.UnpicklingError( | ||||
|                     "pickle exhausted before end of frame") | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return n | ||||
|         else: | ||||
|             n = len(buf) | ||||
| @@ -33,8 +33,7 @@ class _Unframer: | ||||
|                 self.current_frame = None | ||||
|                 return self.file_read(n) | ||||
|             if len(data) < n: | ||||
|                 raise pickle.UnpicklingError( | ||||
|                     "pickle exhausted before end of frame") | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return data | ||||
|         else: | ||||
|             return self.file_read(n) | ||||
| @@ -45,35 +44,40 @@ class _Unframer: | ||||
|             if not data: | ||||
|                 self.current_frame = None | ||||
|                 return self.file_readline() | ||||
|             if data[-1] != b'\n'[0]: | ||||
|                 raise pickle.UnpicklingError( | ||||
|                     "pickle exhausted before end of frame") | ||||
|             if data[-1] != b"\n"[0]: | ||||
|                 raise pickle.UnpicklingError("pickle exhausted before end of frame") | ||||
|             return data | ||||
|         else: | ||||
|             return self.file_readline() | ||||
|  | ||||
|     def load_frame(self, frame_size): | ||||
|         if self.current_frame and self.current_frame.read() != b'': | ||||
|         if self.current_frame and self.current_frame.read() != b"": | ||||
|             raise pickle.UnpicklingError( | ||||
|                 "beginning of a new frame before end of current frame") | ||||
|                 "beginning of a new frame before end of current frame" | ||||
|             ) | ||||
|         self.current_frame = io.BytesIO(self.file_read(frame_size)) | ||||
|  | ||||
|  | ||||
| dangerous_modules = ["os", "subprocess", "builtins", "nt"] | ||||
| dangerous_names = [ | ||||
|     "system", | ||||
|     "popen", | ||||
|     "run", | ||||
|     "call", | ||||
|     "check_output", | ||||
|     "check_call", | ||||
| ] | ||||
|  | ||||
|  | ||||
|  | ||||
|  | ||||
| dangerous_modules = ["os", "subprocess","builtins","nt"] | ||||
| dangerous_names = ["system", "popen", "run", "call", "check_output", "check_call",] | ||||
|  | ||||
| class pickleScanner(): | ||||
| class pickleScanner: | ||||
|  | ||||
|     ReduceCount = 0 | ||||
|     maliciousModule = [] | ||||
|     dispatch = {} | ||||
|  | ||||
|     def __init__(self, file, *, fix_imports=True, | ||||
|                  encoding="ASCII", errors="strict", buffers=None): | ||||
|     def __init__( | ||||
|         self, file, *, fix_imports=True, encoding="ASCII", errors="strict", buffers=None | ||||
|     ): | ||||
|         self._buffers = iter(buffers) if buffers is not None else None | ||||
|         self._file_readline = file.readline | ||||
|         self._file_read = file.read | ||||
| @@ -104,10 +108,14 @@ class pickleScanner(): | ||||
|         # 扫描所有的opcodes | ||||
|         opcode = self.read(1) | ||||
|         while opcode: | ||||
|             if opcode == b'c': | ||||
|                 self.seek(-2,1) | ||||
|             if opcode == b"c": | ||||
|                 self.seek(-2, 1) | ||||
|                 codeN1 = self.read(1) | ||||
|                 if 65<= ord(codeN1) <=90 or 97<= ord(codeN1) <=122 or ord(codeN1) == 0: | ||||
|                 if ( | ||||
|                     65 <= ord(codeN1) <= 90 | ||||
|                     or 97 <= ord(codeN1) <= 122 | ||||
|                     or ord(codeN1) == 0 | ||||
|                 ): | ||||
|                     self.read(1) | ||||
|                 else: | ||||
|                     self.read(1) | ||||
| @@ -119,21 +127,21 @@ class pickleScanner(): | ||||
|             opcode = self.read(1) | ||||
|  | ||||
|     unsafe_opcodes = { | ||||
|         b'r',  # REDUCE - call a callable with arguments | ||||
|         b'R', # REDUCE - same as 'r', but for args tuple | ||||
|         b"r",  # REDUCE - call a callable with arguments | ||||
|         b"R",  # REDUCE - same as 'r', but for args tuple | ||||
|     } | ||||
|  | ||||
|  | ||||
|     def output(self): | ||||
|         return { | ||||
|         "ReduceCount": self.ReduceCount, | ||||
|         "maliciousModule": self.maliciousModule | ||||
|     } | ||||
|             "ReduceCount": self.ReduceCount, | ||||
|             "maliciousModule": self.maliciousModule, | ||||
|         } | ||||
|  | ||||
| def pickleDataDetection(file,output_file=None): | ||||
|     ''' | ||||
|  | ||||
| def pickleDataDetection(file, output_file=None): | ||||
|     """ | ||||
|     :param file: pickle file path | ||||
|     ''' | ||||
|     """ | ||||
|     with open(file, "rb") as file: | ||||
|         pickscan = pickleScanner(file) | ||||
|         pickscan.load() | ||||
| @@ -144,5 +152,7 @@ def pickleDataDetection(file,output_file=None): | ||||
|     else: | ||||
|         print(json.dumps(res)) | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     pickleDataDetection("test.pkl") | ||||
|  | ||||
| if __name__ == "__main__": | ||||
|     pickleDataDetection("test.pkl") | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user