题目
This commit is contained in:
102
crypto/easy_dhke/easy_dhke.py
Normal file
102
crypto/easy_dhke/easy_dhke.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from Crypto.Util.number import * # type: ignore
|
||||
from Crypto.Cipher import AES
|
||||
from Crypto.Util.Padding import pad,unpad
|
||||
import socketserver
|
||||
import signal
|
||||
import string
|
||||
import random
|
||||
import os
|
||||
|
||||
|
||||
class Task(socketserver.BaseRequestHandler):
|
||||
def _recvall(self):
|
||||
BUFF_SIZE = 2048
|
||||
data = b''
|
||||
while True:
|
||||
part = self.request.recv(BUFF_SIZE)
|
||||
data += part
|
||||
if len(part) < BUFF_SIZE:
|
||||
break
|
||||
return data.strip()
|
||||
|
||||
def send(self, msg, newline=True):
|
||||
try:
|
||||
if newline:
|
||||
msg += b'\n'
|
||||
self.request.sendall(msg)
|
||||
except:
|
||||
pass
|
||||
|
||||
def recv(self, prompt=b'[-] '):
|
||||
self.send(prompt, newline=False)
|
||||
return self._recvall()
|
||||
|
||||
# def proof_of_work(self):
|
||||
# random.seed(os.urandom(8))
|
||||
# proof = ''.join(
|
||||
# [random.choice(string.ascii_letters+string.digits) for _ in range(20)])
|
||||
# _hexdigest = sha256(proof.encode()).hexdigest()
|
||||
# self.send(f"[+] sha256(XXXX+{proof[4:]}) == {_hexdigest}".encode())
|
||||
# x = self.recv(prompt=b'[+] Plz tell me XXXX: ')
|
||||
# if len(x) != 4 or sha256(x+proof[4:].encode()).hexdigest() != _hexdigest:
|
||||
# return False
|
||||
# return True
|
||||
|
||||
def dhke(self):
|
||||
p = 327824197795087630552811243153730025469
|
||||
g = 5
|
||||
alice = 22751
|
||||
bob = 39494
|
||||
Bob = pow(g, bob, p)
|
||||
key = long_to_bytes(pow(Bob, alice, p))
|
||||
random.seed(os.urandom(8))
|
||||
secret = ''.join(
|
||||
[random.choice(string.ascii_letters+string.digits) for _ in range(20)])
|
||||
self.send(b"[+] Alice said :")
|
||||
self.send(self.encrypt(secret.encode(),key))
|
||||
message = self.recv(b"[+] Now tell me what are they talking about: ")
|
||||
if message != secret.encode():
|
||||
return False
|
||||
self.send(b"[+] Try to say 'HackedBy0xfa' to them")
|
||||
self.send(b"[+] As a hacker, you should use their key to encrypt")
|
||||
hacked = self.recv(b"[+] Tell me the cipher:")
|
||||
if self.decrypt(hacked, key) != b"HackedBy0xfa":
|
||||
return False
|
||||
return True
|
||||
|
||||
def encrypt(self, plain_text:bytes, key:bytes)->bytes:
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
cipher_text = cipher.encrypt(pad(plain_text, AES.block_size))
|
||||
return cipher_text
|
||||
|
||||
def decrypt(self, encrypt_text:bytes, key:bytes)->bytes:
|
||||
cipher = AES.new(key, AES.MODE_ECB)
|
||||
plain_text = unpad(cipher.decrypt(encrypt_text), AES.block_size)
|
||||
return plain_text
|
||||
|
||||
def handle(self):
|
||||
signal.alarm(60)
|
||||
if not self.dhke():
|
||||
self.send(b'[!] Wrong!')
|
||||
return
|
||||
|
||||
self.send(b'here is your flag')
|
||||
self.send(flag)
|
||||
|
||||
|
||||
class ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
|
||||
pass
|
||||
|
||||
|
||||
class ForkedServer(socketserver.ForkingMixIn, socketserver.TCPServer):
|
||||
pass
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# flag = bytes(os.getenv("FLAG"),"utf-8")
|
||||
flag = b"flag{coooloooool}"
|
||||
HOST, PORT = '0.0.0.0', 10001
|
||||
server = ForkedServer((HOST, PORT), Task)
|
||||
server.allow_reuse_address = True
|
||||
print(HOST, PORT)
|
||||
server.serve_forever()
|
Reference in New Issue
Block a user