from Crypto.Util.number import *  # type: ignore
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import socketserver
import signal
import string
import random
import os
from flag import flag


class Task(socketserver.BaseRequestHandler):
    """Per-connection handler for the key-exchange challenge.

    The client is shown an AES-ECB ciphertext of a random secret, must
    answer with the plaintext, and must then submit a ciphertext that
    decrypts (under the same key) to ``b"HackedBy0xfa"``.  On success the
    flag is sent.
    """

    def _recvall(self) -> bytes:
        """Read from the socket until a short read, then strip whitespace.

        A chunk shorter than ``BUFF_SIZE`` (including the empty chunk
        returned when the peer closes) ends the loop.  A message whose
        length is an exact multiple of ``BUFF_SIZE`` therefore triggers one
        more blocking ``recv``.
        """
        BUFF_SIZE = 2048
        data = b''
        while True:
            part = self.request.recv(BUFF_SIZE)
            data += part
            if len(part) < BUFF_SIZE:
                break
        return data.strip()

    def send(self, msg, newline=True):
        """Send ``msg`` to the client, best-effort.

        Args:
            msg: Payload to send; ``str`` values are encoded to bytes first.
            newline: When true, a trailing ``b'\\n'`` is appended.

        Socket errors (e.g. the peer has disconnected) are ignored on
        purpose; other exceptions propagate so that programming errors are
        not hidden.
        """
        if isinstance(msg, str):
            # Accept text as well as bytes so a str payload is not dropped.
            msg = msg.encode()
        if newline:
            msg += b'\n'
        try:
            self.request.sendall(msg)
        except OSError:
            pass

    def recv(self, prompt=b'[-] ') -> bytes:
        """Send ``prompt`` (without a newline) and return the client's reply."""
        self.send(prompt, newline=False)
        return self._recvall()

    def dhke(self) -> bool:
        """Run the challenge dialogue with the client.

        Returns:
            True only if the client first returns the secret plaintext and
            then supplies a ciphertext that decrypts to ``b"HackedBy0xfa"``
            under the shared key; False otherwise (including when the
            submitted ciphertext is malformed).
        """
        # Diffie-Hellman parameters.  The modulus is described as prime by
        # the original author; that is not checked here.
        p = 327824197795087630552811243153730025469
        g = 5
        # Fixed private exponents of the two parties.
        alice = 22751
        bob = 39494
        # Bob's public value, then the shared value as computed on Alice's side.
        Bob = pow(g, bob, p)
        # NOTE(review): long_to_bytes yields a minimal-length encoding and
        # AES.new needs a 16/24/32-byte key, so this relies on the fixed
        # shared value encoding to a valid key length.
        key = long_to_bytes(pow(Bob, alice, p))

        # Reseed from the OS for every call before drawing the secret.
        random.seed(os.urandom(8))
        alphabet = string.ascii_letters + string.digits
        secret = ''.join([random.choice(alphabet) for _ in range(20)])

        self.send(b"[+] Alice said :")
        self.send(self.encrypt(secret.encode(), key))
        message = self.recv(b"[+] Now tell me what are they talking about: ")
        if message != secret.encode():
            return False

        self.send(b"[+] Try to say 'HackedBy0xfa' to them")
        self.send(b"[+] As a hacker, you should use their key to encrypt")
        hacked = self.recv(b"[+] Tell me the cipher:")
        try:
            plain = self.decrypt(hacked, key)
        except ValueError:
            # Empty, non-block-aligned or badly padded input from the client
            # is simply a wrong answer, not a server error.
            return False
        if plain != b"HackedBy0xfa":
            return False
        # Both checks passed: the client demonstrated knowledge of the key.
        return True

    def encrypt(self, plain_text: bytes, key: bytes) -> bytes:
        """Pad ``plain_text`` to the AES block size and encrypt it with AES-ECB."""
        cipher = AES.new(key, AES.MODE_ECB)
        cipher_text = cipher.encrypt(pad(plain_text, AES.block_size))
        return cipher_text

    def decrypt(self, encrypt_text: bytes, key: bytes) -> bytes:
        """Decrypt ``encrypt_text`` with AES-ECB and strip the padding.

        Raises:
            ValueError: If the input is not block-aligned or the padding is
                invalid (raised by the cipher / ``unpad``).
        """
        cipher = AES.new(key, AES.MODE_ECB)
        plain_text = unpad(cipher.decrypt(encrypt_text), AES.block_size)
        return plain_text

    def handle(self):
        """Serve one client: run the challenge and send the flag on success."""
        # Schedule SIGALRM after 60 seconds; no handler is installed here.
        # NOTE(review): this presumably relies on ForkedServer giving each
        # connection its own process -- confirm before switching servers.
        signal.alarm(60)
        if not self.dhke():
            self.send(b'[!] Wrong!')
            return
        self.send(b'here is your flag')
        self.send(flag)


class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each connection in a new thread."""
    pass


class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):
    """TCP server that handles each connection in a forked child process."""

    # Must be set on the class: TCPServer reads it while binding inside
    # __init__, so assigning it on an already-built instance has no effect.
    allow_reuse_address = True


if __name__ == "__main__":
    HOST, PORT = '0.0.0.0', 10001
    server = ForkedServer((HOST, PORT), Task)
    print(HOST, PORT)
    server.serve_forever()