Add front-end functionality and improved server.py and node.py
All checks were successful
Test CI / test speed (push) Successful in 17s

This commit is contained in:
2024-10-15 17:05:39 +08:00
parent 24a5e91382
commit bf6e25b722
21 changed files with 20491 additions and 29 deletions

View File

@@ -42,12 +42,17 @@ logger = logging.getLogger("uvicorn")
# 向中心服务器发送自己的IP地址,并获取自己的id
def send_ip():
url = server_address + "/get_node?ip=" + ip # type: ignore
url = f"http://{server_address}/server/get_node?ip={ip}" # 添加 http:// 协议
# ip = get_local_ip() # type: ignore
global id
id = requests.get(url, timeout=3)
logger.info(f"中心服务器返回节点ID为: {id}")
print("中心服务器返回节点ID为: ", id)
try:
response = requests.get(url, timeout=3)
response.raise_for_status() # 检查请求是否成功
data = response.json() # 将响应内容解析为 JSON 格式
global id
id = data.get("id") # 假设返回的 JSON 包含 id 字段
logger.info(f"中心服务器返回节点ID为: {id}")
except requests.exceptions.RequestException as e:
logger.error(f"请求中心服务器失败: {e}")
# 用环境变量获取本机ip
@@ -85,7 +90,7 @@ def clear():
async def send_heartbeat_internal() -> None:
timeout = 30
global ip
url = server_address + "/heartbeat?ip=" + ip # type: ignore
url = f"http://{server_address}/server/heartbeat?ip={ip}" # 添加 http:// 协议
while True:
# print('successful send my_heart')
try:
@@ -191,6 +196,6 @@ wallet_pk = "ae66ae3711a69079efd3d3e9b55f599ce7514eb29dfe4f9551404d3f361438c6"
if __name__ == "__main__":
import uvicorn
threading.Thread(target=log_message).start()
# threading.Thread(target=log_message).start()
uvicorn.run("node:app", host="0.0.0.0", port=8001, reload=True, log_level="debug")

View File

@@ -1,12 +1,49 @@
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.websockets import WebSocketState
from fastapi.responses import JSONResponse, HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
import sqlite3
import asyncio
import time
import ipaddress
import logging
import os
import queue
app = FastAPI()
origins = [
"http://localhost:3000",
]
# 配置 CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["http://localhost:3000"],
allow_credentials=True,
allow_methods=["*"], # 允许所有方法
allow_headers=["*"], # 允许所有头
)
# 配置日志文件路径
log_dir = "logs"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
log_file = os.path.join(log_dir, "server_logs.log")
# 全局日志配置
logging.basicConfig(
level=logging.INFO, # 设置全局日志级别
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", # 日志格式
handlers=[
logging.FileHandler(log_file, encoding="utf-8"), # 输出到日志文件
logging.StreamHandler(), # 输出到控制台
],
)
# 获取日志记录器
logger = logging.getLogger(__name__)
@@ -17,7 +54,14 @@ async def lifespan(_: FastAPI):
clean_env()
# 获取当前文件所在的目录
current_dir = os.path.dirname(os.path.abspath(__file__))
# 定义 frontend 的绝对路径
frontend_dir = os.path.join(current_dir, "..", "frontend")
app = FastAPI(lifespan=lifespan)
app.mount("/frontend", StaticFiles(directory="frontend/build"), name="frontend")
def init():
@@ -61,9 +105,16 @@ async def show_nodes() -> list:
for row in rows:
nodes_list.append(row)
# TODO: use JSONResponse
logger.info("节点信息已成功获取")
return nodes_list
@app.get("/nodes", response_class=HTMLResponse)
async def get_nodes_page():
with open("frontend/public/index.html") as f:
return HTMLResponse(content=f.read(), status_code=200)
def validate_ip(ip: str) -> bool:
"""
Validate an IP address.
@@ -85,7 +136,7 @@ def validate_ip(ip: str) -> bool:
@app.get("/server/get_node")
async def get_node(ip: str) -> int:
async def get_node(ip: str) -> JSONResponse:
"""
中心服务器与节点交互, 节点发送ip, 中心服务器接收ip存入数据库并将ip转换为int作为节点id返回给节点
params:
@@ -102,13 +153,11 @@ async def get_node(ip: str) -> int:
for i in range(4):
ip_int += int(ip_parts[i]) << (24 - (8 * i))
# TODO: replace print with logger
print("IP", ip, "对应的ID为", ip_int)
logger.info(f"IP {ip} 对应的ID为 {ip_int}")
# 获取当前时间
current_time = int(time.time())
# TODO: replace print with logger
print("当前时间: ", current_time)
logger.info(f"当前时间: {current_time}")
with sqlite3.connect("server.db") as db:
# 插入数据
@@ -118,8 +167,10 @@ async def get_node(ip: str) -> int:
)
db.commit()
# TODO: use JSONResponse
return ip_int
# 使用 JSONResponse 返回节点ID和当前时间
logger.info(f"节点 {ip} 已成功添加到数据库")
content = {"id": ip_int, "current_time": current_time}
return JSONResponse(content, status_code=200)
# TODO: try to use @app.delete("/node")
@@ -132,6 +183,9 @@ async def delete_node(ip: str):
ip (str): The ip of the node to be deleted.
"""
if not validate_ip(ip):
logger.warning(f"收到无效 IP 格式的删除请求: {ip}")
raise HTTPException(status_code=400, detail="Invalid IP format")
with sqlite3.connect("server.db") as db:
# 查询要删除的节点
@@ -143,11 +197,10 @@ async def delete_node(ip: str):
db.execute("DELETE FROM nodes WHERE ip=?", (ip,))
db.commit()
# TODO: replace print with logger
print(f"Node with IP {ip} deleted successfully.")
return {"message", f"Node with IP {ip} deleted successfully."}
logger.info(f"节点 {ip} 已成功删除")
return {"message": f"Node with IP {ip} deleted successfully."}
else:
print(f"Node with IP {ip} not found.")
logger.warning(f"节点 {ip} 未找到")
raise HTTPException(status_code=404, detail=f"Node with IP {ip} not found.")
@@ -165,14 +218,15 @@ async def receive_heartbeat(ip: str):
"""
if not validate_ip(ip):
content = {"message": "invalid ip format"}
logger.warning(f"收到无效 IP 格式的心跳包: {ip}")
return JSONResponse(content, status_code=400)
print("收到来自", ip, "的心跳包")
logger.info("收到来自", ip, "的心跳包")
logger.info(f"收到来自 {ip} 的心跳包")
with sqlite3.connect("server.db") as db:
db.execute(
"UPDATE nodes SET last_heartbeat = ? WHERE ip = ?", (time.time(), ip)
)
logger.info(f"成功更新节点 {ip} 的心跳时间")
content = {"status": "received"}
return JSONResponse(content, status_code=200)
@@ -210,9 +264,7 @@ async def send_nodes_list(count: int) -> list:
_, ip, _ = row
nodes_list.append(ip)
print("收到来自客户端的节点列表请求...")
print(nodes_list)
# TODO: use JSONResponse
logger.info(f"已成功发送 {count} 个节点信息")
return nodes_list
@@ -220,6 +272,71 @@ def clear_database() -> None:
with sqlite3.connect("server.db") as db:
db.execute("DELETE FROM nodes")
db.commit()
logger.info("数据库已清空")
# WebSocket连接池
connected_clients = []
log_queue = queue.Queue() # 用于存储日志的队列
@app.websocket("/ws/logs")
async def websocket_logs(websocket: WebSocket):
await websocket.accept()
connected_clients.append(websocket) # 添加 WebSocket 客户端
try:
# 发送历史日志
while not log_queue.empty():
log_message = log_queue.get()
await websocket.send_json({"type": "log", "message": log_message})
# 实时日志发送
while True:
await asyncio.sleep(5) # 保证 WebSocket 持续连接
except Exception as e:
print(f"WebSocket connection closed with error: {e}")
finally:
if websocket in connected_clients:
connected_clients.remove(websocket)
await websocket.close()
class WebSocketLogHandler(logging.Handler):
def emit(self, record):
log_entry = self.format(record)
timestamp = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.created))
log_message = f"{timestamp} - {log_entry}"
log_queue.put(log_message) # 将日志消息放入队列
for client in connected_clients:
if client.application_state == WebSocketState.CONNECTED:
# 改为异步线程安全地发送日志
asyncio.run_coroutine_threadsafe(
self.safe_send_log(client, log_message), asyncio.get_event_loop()
)
async def safe_send_log(self, client, log_message):
try:
await client.send_json({"type": "log", "message": log_message})
except RuntimeError as e:
print(f"Error while sending log to {client.application_state}: {e}")
except Exception as e:
print(f"Unexpected error: {e}")
finally:
if client in connected_clients:
connected_clients.remove(client)
# 捕获 FastAPI 和 Uvicorn 的日志
uvicorn_logger = logging.getLogger("uvicorn")
uvicorn_logger.setLevel(logging.INFO)
# 捕获 FastAPI 的日志
fastapi_logger = logging.getLogger("fastapi")
fastapi_logger.setLevel(logging.INFO)
# 将日志输出到 WebSocket
uvicorn_logger.addHandler(WebSocketLogHandler())
fastapi_logger.addHandler(WebSocketLogHandler())
if __name__ == "__main__":