Jak używać python-socketio z FastAPI
# server.py
from typing import Any
import uvicorn
from fastapi import FastAPI
import socketio
# FastAPI application object; the uvicorn import string "server:app" refers to it.
app = FastAPI()

# Asynchronous Socket.IO server running in ASGI mode. It is annotated as Any,
# which disables static type checking on attribute access through `sio`.
sio: Any = socketio.AsyncServer(async_mode="asgi")

# ASGI application wrapping the Socket.IO server so it can be mounted elsewhere.
socket_app = socketio.ASGIApp(sio)
async def test() -> str:
    """Return the fixed string ``"WORKS"``.

    NOTE(review): nothing in the visible source registers this coroutine as a
    FastAPI route; presumably a route decorator was lost -- confirm the
    intended path before adding one.
    """
    return "WORKS"
# Attach the Socket.IO ASGI app to the main FastAPI app at the root path.
app.mount(path="/", app=socket_app)
# Register the handler with the Socket.IO server; `@sio.event` uses the
# function name ("connect") as the event name. Without registration the
# coroutine is never invoked.
@sio.event
async def connect(sid, env):
    """Log that a Socket.IO client has connected.

    Args:
        sid: Session id of the connecting client.
        env: Second argument supplied by the server for this event; unused
            here (presumably the request environ -- confirm).
    """
    print("on connect")
# Register the handler with the Socket.IO server; `@sio.event` uses the
# function name ("direct") as the event name. Without registration the
# coroutine is never invoked.
@sio.event
async def direct(sid, msg):
    """Log ``msg`` and send it back to the originating client only.

    Args:
        sid: Session id of the client that sent the event.
        msg: Payload received with the event; forwarded unchanged.
    """
    print(f"direct {msg}")
    # room=sid restricts delivery to the single client identified by `sid`.
    await sio.emit("event_name", msg, room=sid)
# Register the handler with the Socket.IO server; `@sio.event` uses the
# function name ("broadcast") as the event name. Without registration the
# coroutine is never invoked.
@sio.event
async def broadcast(sid, msg):
    """Log ``msg`` and emit it to every connected client.

    Args:
        sid: Session id of the client that sent the event; unused here.
        msg: Payload received with the event; forwarded unchanged.
    """
    print(f"broadcast {msg}")
    # No room/recipient is given, so the event goes to all connected clients.
    await sio.emit("event_name", msg)
# Register the handler with the Socket.IO server; `@sio.event` uses the
# function name ("disconnect") as the event name. Without registration the
# coroutine is never invoked.
@sio.event
async def disconnect(sid):
    """Log that a Socket.IO client has disconnected.

    Args:
        sid: Session id of the client that went away.
    """
    print("on disconnect")
if __name__ == "__main__":
    # Development entry point: serve the app on port 5000 with auto-reload.
    # The app is given as the import string "server:app", so this file must be
    # importable as the module `server`.
    # The former `debug=True` option is no longer passed: current uvicorn
    # releases do not accept it and raise TypeError on startup.
    # NOTE(review): the empty host string is kept from the original;
    # presumably it is meant to bind all interfaces -- confirm.
    uvicorn.run("server:app", host="", port=5000, reload=True)