Merge pull request 'main' (#14) from dqy/mimajingsai:main into main

Reviewed-on: sangge/mimajingsai#14
This commit is contained in:
sangge 2023-10-21 20:27:20 +08:00
commit ad3f14ae1f
2 changed files with 113 additions and 19 deletions

1
.gitignore vendored
View File

@ -9,3 +9,4 @@ ReEncrypt.py
src/temp_message_file src/temp_message_file
src/temp_key_file src/temp_key_file
src/client.db src/client.db
src/server.db

View File

@ -1,44 +1,137 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from contextlib import asynccontextmanager
from typing import Tuple, Callable from typing import Tuple, Callable
import sqlite3
import asyncio
import time
import random
app = FastAPI() @asynccontextmanager
async def lifespan(app: FastAPI):
init()
yield
clean_env()
app = FastAPI(lifespan = lifespan)
# 连接到数据库(如果数据库不存在,则会自动创建)
conn = sqlite3.connect('server.db')
# 创建游标对象用于执行SQL语句
cursor = conn.cursor()
# 创建表: id: int; ip: TEXT
cursor.execute('''CREATE TABLE IF NOT EXISTS nodes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
ip TEXT NOT NULL,
last_heartbeat INTEGER
)''')
def init():
nothing = receive_heartbeat_internal()
def clean_env():
# 关闭游标和连接
cursor.close()
conn.close()
@app.get("/server/show_nodes")
async def show_nodes() -> list:
nodes_list = []
# 查询数据
cursor.execute("SELECT * FROM nodes")
rows = cursor.fetchall()
for row in rows:
nodes_list.append(row)
return nodes_list
@app.get("/server/get_node") @app.get("/server/get_node")
async def get_node(ip: str) -> int: async def get_node(ip: str) -> int:
''' '''
中心服务器与节点交互, 节点发送ip, 中心服务器接收ip存入数据库并将ip转换为int作为节点id返回给节点 中心服务器与节点交互, 节点发送ip, 中心服务器接收ip存入数据库并将ip转换为int作为节点id返回给节点
params: params:
ip: node ip ip: node ip
return: return:
id: ip按点分割成四部分, 每部分转二进制后拼接再转十进制作为节点id id: ip按点分割成四部分, 每部分转二进制后拼接再转十进制作为节点id
''' '''
# ip存入数据库, id = hash(int(ip))
ip_parts = ip.split(".") ip_parts = ip.split(".")
ip_int = 0 ip_int = 0
for i in range(4): for i in range(4):
ip_int += int(ip_parts[i]) << (24 - (8 * i)) ip_int += int(ip_parts[i]) << (24 - (8 * i))
# 获取当前时间
current_time = int(time.time())
# 插入数据
cursor.execute("INSERT INTO nodes (id, ip, last_heartbeat) VALUES (?, ?, ?)", (ip_int, ip, current_time))
conn.commit()
return ip_int return ip_int
@app.get("/server/delete_node") @app.get("/server/delete_node")
async def delete_node(ip: str) -> None: async def delete_node(ip: str) -> None:
# 按照节点ip遍历数据库, 删除该行数据 '''
param:
ip: 待删除节点的ip地址
return:
None
'''
# 查询要删除的节点
cursor.execute("SELECT * FROM nodes WHERE ip=?", (ip,))
row = cursor.fetchone()
if row is not None:
# 执行删除操作
cursor.execute("DELETE FROM nodes WHERE ip=?", (ip,))
conn.commit()
print(f"Node with IP {ip} deleted successfully.")
else:
print(f"Node with IP {ip} not found.")
@app.post("/server/send_nodes_list") # 接收节点心跳包
@app.post("/server/heartbeat")
async def receive_heartbeat(ip: str):
cursor.execute("UPDATE nodes SET last_heartbeat = ? WHERE ip = ?", (time.time(), ip))
return {"status": "received"}
async def receive_heartbeat_internal() -> int:
while 1:
print('successful delete1')
timeout = 10
# 删除超时的节点
cursor.execute("DELETE FROM nodes WHERE last_heartbeat < ?", (time.time() - timeout,))
conn.commit()
print('successful delete')
time.sleep(timeout)
return 1
@app.get("/server/send_nodes_list")
async def send_nodes_list(count: int) -> JSONResponse: async def send_nodes_list(count: int) -> JSONResponse:
''' '''
中心服务器与客户端交互, 客户端发送所需节点个数, 中心服务器从数据库中顺序取出节点封装成json格式返回给客户端 中心服务器与客户端交互, 客户端发送所需节点个数, 中心服务器从数据库中顺序取出节点封装成json格式返回给客户端
params: params:
count: 所需节点个数 count: 所需节点个数
return: return:
JSONResponse: {id: ip,...} JSONResponse: {id: ip,...}
''' '''
nodes_list = {} nodes_list = {}
for i in range(count):
# 访问数据库取出节点数据 # 查询数据库中的节点数据
node = (id, ip) cursor.execute("SELECT * FROM nodes LIMIT ?", (count,))
nodes_list[node[0]] = node[1] rows = cursor.fetchall()
for row in rows:
id, ip, last_heartbeat = row
nodes_list[id] = ip
json_result = jsonable_encoder(nodes_list) json_result = jsonable_encoder(nodes_list)
return JSONResponse(content=json_result) return JSONResponse(content=json_result)
@app.get("/server/clear_database")
async def clear_database() -> None:
cursor.execute("DELETE FROM nodes")
conn.commit()
if __name__ == "__main__":
import uvicorn # pylint: disable=e0401
uvicorn.run("server:app", host="0.0.0.0", port=8000, reload=True)