chakokuのブログ(rev4)

テック・コミック・DTM・・・ごくまれにチャリ

ローカル環境にMQTT Brokerを構築して性能を調べる

背景:MQTT Brokerを使ったゲームを作る必要あり
課題:無料で使えるPublic MQTT Brokerは遅延が大きかったり、Quotaが掛かっていて、性能上に不足である。一方、AWS IoTは有料であり、試行錯誤していると無料枠を超えて有料になるかもしれない。
取り組み:ローカル環境にMQTT Brokerを構築して、遅延ほぼ0で、Quotaもかかっていない条件下で試作する。完成したらAWS IoT等を使う
結論:ローカルに構築したmosquitto MQTT Brokerの場合、pub/subの遅延は4.45msec、1秒間に処理できるメッセージ数は、6896通*1
詳細:

先日デスクトップPCをUbuntu専用機にしたので、その上でDocker+MQTTで構築する。専用コンテナが提供されているので、docker-composeで起動してみる。
file: docker-compose.yml

version: '3'
services:
  mosquitto:
    image: eclipse-mosquitto:latest
    container_name: mosquitto
    ports:
      - "1883:1883"  # MQTTポート
    volumes:
      - ./config:/mosquitto/config
      - ./data:/mosquitto/data
      - ./log:/mosquitto/log
    restart: unless-stopped

file: mosquitto.conf

listener 1883
protocol mqtt

persistence true
persistence_location /mosquitto/data/

log_dest file /mosquitto/log/mosquitto.log
log_dest stdout

allow_anonymous true

docker compose up -dで起動する

docker compose up -d

最初ThingBoardが走っていて1883がぶつかったので、ThingBoardを止めて再実行、起動された

~/tech/docker/mosquitto$ docker ps
CONTAINER ID   IMAGE                      COMMAND                  CREATED              STATUS          PORTS                                         NAMES
7c29776863af   eclipse-mosquitto:latest   "/docker-entrypoint.…"   About a minute ago   Up 12 seconds   0.0.0.0:1883->1883/tcp, [::]:1883->1883/tcp   mosquitto

単純に自分でメッセージを発行して、メッセージを受信するプログラムで、応答時間を計測した。何度か計測して集計したところ、中央値(Median)は、4.45msecであった。ローカルだから超早い。*2

秒間どれぐらい処理できるのか測ってみた。 2000通処理するのに、0.29秒かかった。1秒に処理できるメッセージ数は、6896通(あくまでも理想値だが)

Elapsed time 2000 publish: 0.29008930000054534 seconds

ご参考に計測したプログラムは以下(NotePC上で走らせています)(処理遅延やメッセージ消失を考慮していない乱暴なコードですがまぁオーダを知るぐらいには使えるだろうと)

#!/usr/bin/python3

import paho.mqtt.client as mqtt
import time

BROKER = '192.168.10.125'

start_time = 0

def on_connect(client, userdata, flag, rc):
  print("Connected with result code " + str(rc))
  client.subscribe("mqtt/echo")

def on_disconnect(client, userdata, rc):
  if  rc != 0:
    print("Unexpected disconnection.")

import pdb

time_of_1 = None
time_of_2000 = None
def on_message(client, userdata, msg):
  global time_of_1
  global time_of_2000
  payload = str(msg.payload)
  print("Received message '" + payload + "' on topic '" + msg.topic)
  end_time = time.perf_counter()
  elapsed_time = end_time - start_time
  print(f"Elapsed time: {elapsed_time} seconds")
  if msg.payload == b'hello 2':
     print('--- Start !! ---')
     time_of_1 = time.perf_counter()
  if msg.payload == b'hello 2000':
     print('--- end !! ---')
     time_of_2000 = time.perf_counter()

client = mqtt.Client()
if 'ID' in globals():
    if ID:
        client.username_pw_set(ID, PWD)

client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message

client.connect(BROKER, 1883, 60)
client.loop_start()
time.sleep(2)

print(BROKER)
for i in range(2001):
    start_time = time.perf_counter()
    client.publish('mqtt/echo', f'hello {i}')
    #time.sleep(0.001)  #wait 2msec

time.sleep(2)
print(time_of_1, time_of_2000)
elapsed_time = time_of_2000 - time_of_1
print(f"Elapsed time 2000 publish: {elapsed_time} seconds")

client.loop_stop()
client.disconnect()

ローカル環境のmosquitto MQTT Brokerであれば、ゲームの処理性能上全く問題はない。あと調べたいのは、マイコンでMQTTメッセージ発行した場合、秒間どれぐらいメッセージ発行できるのか?というところ。

ESP32 S3において、1000通のメッセージを送受信するのにどれぐらい時間がかかるかを調べた。システムクロックは240MHzに設定した。結果は以下

start to end (us): 14222878
start to end (ms): 14222.88
start to end (s): 14.22288

1000通処理するのに、14.2秒なので、1秒の処理数は、70通であった。
この性能だと、、仮にGame端末が10台だとすると、秒あたり7メッセージまで処理できる。秒10通で送られる場合、Game端末は7台まで接続可能か。
ご参考に、グサグサのプログラムですが、ESP32用MicroPythonのMQTT送受信テストプログラムは以下

import time
import random
import json
import math

from umqtt.simple import MQTTClient

MQTT_BROKER = "192.168.10.125"
CLIENT_ID = 'RPi-pico-001'
USER_ID = "pico000"
PASSWD = "zzz"

# MQTT over TCP
PORT = 1883


TOPIC = "rpi-pico-001/sensor"
MSG = '{"data" : 123}'

def connect():
    print('Connected to MQTT Broker')
    client = MQTTClient(CLIENT_ID, MQTT_BROKER, PORT, USER_ID, PASSWD)
    client.connect()
    return client

def reconnect():
    print('Failed to connect to MQTT broker, Reconnecting...')
    time.sleep(5)
    client.reconnect()

send_time = None
last_message = None
def on_message(topic, msg):
    global send_time
    global last_message
    global end_time
    receive_time = time.ticks_us()
    print('on_message', topic, msg)
    print('send back(ms):', time.ticks_diff(receive_time,send_time))
    print('now:', time.localtime()[5])
    if msg == b'{"data": 1000}':
        last_message = True
        end_time =  time.ticks_us()

try:
    client = connect()
except OSError as e:
    reconnect()


import machine
machine.freq(240_000_000)


client.set_callback(on_message)
client.subscribe(TOPIC)

start_time = None
end_time = None
def test_loop():
    global start_time
    count = 0
    send_time = 0
    value = 123
    start_time = time.ticks_us()
    for i in range(1001):
        client.check_msg()
        msg = {"data" : i} 
        print(f'send message {msg} on topic: {TOPIC}')
        send_time = time.ticks_us()
        client.publish(TOPIC, json.dumps(msg), qos=0)
    
test_loop()    

#  loop  until last message   (max loop 2000)
for i in range(2000):
    if last_message:
        break
    print(i)
    client.check_msg()

print('start to end (us):', time.ticks_diff(end_time, start_time))
print('start to end (ms):', time.ticks_diff(end_time, start_time)/1000)
print('start to end (s):', time.ticks_diff(end_time, start_time)/(1000*1000))

*1:PC側のプログラムがボトルネックになっている可能性あります

*2:インターネット上のMQTT Broker サービスは応答の早いサーバで160mec(EMQX Public MQTT Broker)であった。AWS IoTはもっと早いかも(有料だから調べていないない)