From 2334cb68cfedc900130faa58f89aa650fa9aea3d Mon Sep 17 00:00:00 2001 From: ccyj <2384899431@qq.com> Date: Sat, 21 Oct 2023 20:36:58 +0800 Subject: [PATCH 1/2] =?UTF-8?q?build:=20=E8=8A=82=E7=82=B9=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E6=9E=84=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/node.py | 70 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/src/node.py b/src/node.py index e69de29..1f08167 100644 --- a/src/node.py +++ b/src/node.py @@ -0,0 +1,70 @@ +from fastapi import FastAPI +import requests +from contextlib import asynccontextmanager +import socket + +@asynccontextmanager +async def lifespan(app: FastAPI): + # Load the ML model + init() + yield + # Clean up the ML models and release the resources + clear() + +app = FastAPI(lifespan=lifespan) +server_address ="http://中心服务器IP地址:端口号/ip" +id = 0 + +# 向中心服务器发送自己的IP地址,并获取自己的id +def send_ip(ip: str): + url = server_address + # ip = get_local_ip # type: ignore + data = {"ip": ip} + response = requests.post(url, data=data) + data = response.json() + id = data['id'] + return id + +# 用socket获取本机ip +def get_local_ip(): + # 创建一个套接字对象 + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + # 连接到一个外部的服务器,这将自动绑定到本地IP地址 + s.connect(("8.8.8.8", 80)) + # 获取本地IP地址 + local_ip = s.getsockname()[0] + s.close() + return local_ip + + +id = int +def init(): + ip = get_local_ip() + global id + id = send_ip(ip) + +def clear(): + + pass + +@app.post("/heartbeat/") +async def receive_heartbeat(): + return {"status": "received"} + + + + + + + + +# 接收用户发来的消息,经过处理之后,再将消息发送给其他用户 +@app.post("/send_message") +async def send_message(message: str): + # 处理消息 + processed_message = message.upper() + # 发送消息给其他用户 + url = "http://其他用户IP地址:端口号/receive_message" + data = {"message": processed_message} + response = requests.post(url, data=data) + return response.json() From 4f2dd9727e5eeef260767409e25fffc39776ed4e Mon Sep 17 00:00:00 2001 From: sangge <2251250136@qq.com> Date: Sat, 21 Oct 2023 20:39:43 +0800 Subject: [PATCH 2/2] feat: add heartbeat package --- src/node.py | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/src/node.py b/src/node.py index 1f08167..c69be47 100644 --- a/src/node.py +++ b/src/node.py @@ -68,3 +68,52 @@ async def send_message(message: str): data = {"message": processed_message} response = requests.post(url, data=data) return response.json() + + +import requests + +def send_heartbeat(url: str) -> bool: + try: + response = requests.get(url, timeout=5) # 使用 GET 方法作为心跳请求 + response.raise_for_status() # 检查响应是否为 200 OK + + # 可选:根据响应内容进行进一步验证 + # if response.json() != expected_response: + # return False + + return True + except requests.RequestException: + return False + +# 使用方式 +url = "https://your-service-url.com/heartbeat" +if send_heartbeat(url): + print("Service is alive!") +else: + print("Service might be down or unreachable.") + + +import asyncio +from contextlib import asynccontextmanager +from fastapi import FastAPI + +async def receive_heartbeat_internal() -> int: + while True: + print('successful delete1') + timeout = 10 + # 删除超时的节点(假设你有一个异步的数据库操作函数) + await async_cursor_execute("DELETE FROM nodes WHERE last_heartbeat < ?", (time.time() - timeout,)) + await async_conn_commit() + print('successful delete') + await asyncio.sleep(timeout) + + return 1 + +@asynccontextmanager +async def lifespan(app: FastAPI): + task = asyncio.create_task(receive_heartbeat_internal()) + yield + task.cancel() # 取消我们之前创建的任务 + await clean_env() # 假设这是一个异步函数 + +# 其他FastAPI应用的代码...