How to implement reactive communication between a Python server, an MQTT broker, and a web client using WebSockets?

  Kiến thức lập trình

I’m working on a project where I want to implement a system that can receive messages from an MQTT broker implemented with Mosquitto and forward them reactively to a web browser without resorting to polling. Specifically, messages sent by the MQTT broker are base64-encoded images and follow the below structure:

{'source': 'source_name', 'measure':'image(base64 encoded)', 'timestamp':timestamp}

The goal is to have a Python server that receives these messages reactively and forwards them to the web client through a WebSocket connection. Additionally, I want the web client to be able to send messages to the Python server.

Currently, I’m trying to implement this solution using FastAPI for the Python server, FastMQTT for managing the MQTT broker, and WebSocket for communication with the web client. Here’s the code I’ve written so far:

from fastapi import FastAPI, WebSocket
from fastapi.responses import HTMLResponse
from fastapi_mqtt.fastmqtt import FastMQTT
from fastapi_mqtt.config import MQTTConfig
import uvicorn

app = FastAPI()

mqtt_config = MQTTConfig(host="localhost", port=1883)

fast_mqtt = FastMQTT(config=mqtt_config)

fast_mqtt.init_app(app)

messages = []

@fast_mqtt.on_connect()
def connect(client, flags, rc, properties):
    print("Connected: ", client, flags, rc, properties)

@fast_mqtt.on_disconnect()
def disconnect(client, packet, exc=None):
    print("Disconnected")

@fast_mqtt.subscribe("room1/cam")
async def cam_handler(client, topic, payload, qos, properties):
    print("Received message on topic: ", topic)
    messages.append(payload.decode())

html = """
<!DOCTYPE html>
<html>
<head> 
    <title>WebSocket Test</title>
</head>
<body>
    <h1>WebSocket Test</h1>

    <script>
        document.addEventListener("DOMContentLoaded", function(event) {
            var ws = new WebSocket("ws://localhost:8080/ws");
            ws.onopen = function(event) {
                console.log("WebSocket connection established.");
            };
            ws.onmessage = function(event) {
                console.log("Message received:", event.data);
                // Handle incoming messages as needed
            };
            ws.onerror = function(event) {
                console.error("WebSocket error:", event);
            };
            ws.onclose = function(event) {
                console.log("WebSocket connection closed.");
            };
        });
    </script>
</body>
</html>
"""

@app.get("/")
async def root():
    return HTMLResponse(html)

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while messages:
        message = messages.pop(0)
        await websocket.send_text(message)
            

if __name__ == "__main__":
    uvicorn.run(app, host="localhost", port=8080)

However, I’ve encountered an issue: once the web client connects to the Python server, the while True loop in the WebSocket handler seems to be blocking, preventing the execution of other code blocks. This results in the blocking of MQTT message reception and the inability to process and send them to the web client.

I’m looking for a solution that allows the Python server to handle both MQTT message reception and communication with the web client reactively, avoiding polling and ensuring a continuous flow of data between the components.

I would appreciate any suggestions on how to solve this problem or more efficient alternatives for implementing communication between the MQTT broker, Python server, and web client.

Thanks in advance for your help!

LEAVE A COMMENT