背景:MQTT Brokerを使ったゲームを作る必要あり
課題:無料で使えるPublic MQTT Brokerは遅延が大きかったり、Quotaが掛かっていて、性能上に不足である。一方、AWS IoTは有料であり、試行錯誤していると無料枠を超えて有料になるかもしれない。
取り組み:ローカル環境にMQTT Brokerを構築して、遅延ほぼ0で、Quotaもかかっていない条件下で試作する。完成したらAWS IoT等を使う
結論:ローカルに構築したmosquitto MQTT Brokerの場合、pub/subの遅延は4.45msec、1秒間に処理できるメッセージ数は、6896通*1
詳細:
先日デスクトップPCをUbuntu専用機にしたので、その上でDocker+MQTTで構築する。専用コンテナが提供されているので、docker-composeで起動してみる。
file: docker-compose.yml
version: '3'
services:
mosquitto:
image: eclipse-mosquitto:latest
container_name: mosquitto
ports:
- "1883:1883" # MQTTポート
volumes:
- ./config:/mosquitto/config
- ./data:/mosquitto/data
- ./log:/mosquitto/log
restart: unless-stoppedfile: mosquitto.conf
listener 1883 protocol mqtt persistence true persistence_location /mosquitto/data/ log_dest file /mosquitto/log/mosquitto.log log_dest stdout allow_anonymous true
docker compose up -dで起動する
docker compose up -d
最初ThingBoardが走っていて1883がぶつかったので、ThingBoardを止めて再実行、起動された
~/tech/docker/mosquitto$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 7c29776863af eclipse-mosquitto:latest "/docker-entrypoint.…" About a minute ago Up 12 seconds 0.0.0.0:1883->1883/tcp, [::]:1883->1883/tcp mosquitto
単純に自分でメッセージを発行して、メッセージを受信するプログラムで、応答時間を計測した。何度か計測して集計したところ、中央値(Median)は、4.45msecであった。ローカルだから超早い。*2
秒間どれぐらい処理できるのか測ってみた。 2000通処理するのに、0.29秒かかった。1秒に処理できるメッセージ数は、6896通(あくまでも理想値だが)
Elapsed time 2000 publish: 0.29008930000054534 seconds
ご参考に計測したプログラムは以下(NotePC上で走らせています)(処理遅延やメッセージ消失を考慮していない乱暴なコードですがまぁオーダを知るぐらいには使えるだろうと)
#!/usr/bin/python3
import paho.mqtt.client as mqtt
import time
BROKER = '192.168.10.125'
start_time = 0
def on_connect(client, userdata, flag, rc):
print("Connected with result code " + str(rc))
client.subscribe("mqtt/echo")
def on_disconnect(client, userdata, rc):
if rc != 0:
print("Unexpected disconnection.")
import pdb
time_of_1 = None
time_of_2000 = None
def on_message(client, userdata, msg):
global time_of_1
global time_of_2000
payload = str(msg.payload)
print("Received message '" + payload + "' on topic '" + msg.topic)
end_time = time.perf_counter()
elapsed_time = end_time - start_time
print(f"Elapsed time: {elapsed_time} seconds")
if msg.payload == b'hello 2':
print('--- Start !! ---')
time_of_1 = time.perf_counter()
if msg.payload == b'hello 2000':
print('--- end !! ---')
time_of_2000 = time.perf_counter()
client = mqtt.Client()
if 'ID' in globals():
if ID:
client.username_pw_set(ID, PWD)
client.on_connect = on_connect
client.on_disconnect = on_disconnect
client.on_message = on_message
client.connect(BROKER, 1883, 60)
client.loop_start()
time.sleep(2)
print(BROKER)
for i in range(2001):
start_time = time.perf_counter()
client.publish('mqtt/echo', f'hello {i}')
#time.sleep(0.001) #wait 2msec
time.sleep(2)
print(time_of_1, time_of_2000)
elapsed_time = time_of_2000 - time_of_1
print(f"Elapsed time 2000 publish: {elapsed_time} seconds")
client.loop_stop()
client.disconnect()ローカル環境のmosquitto MQTT Brokerであれば、ゲームの処理性能上全く問題はない。あと調べたいのは、マイコンでMQTTメッセージ発行した場合、秒間どれぐらいメッセージ発行できるのか?というところ。
ESP32 S3において、1000通のメッセージを送受信するのにどれぐらい時間がかかるかを調べた。システムクロックは240MHzに設定した。結果は以下
start to end (us): 14222878 start to end (ms): 14222.88 start to end (s): 14.22288
1000通処理するのに、14.2秒なので、1秒の処理数は、70通であった。
この性能だと、、仮にGame端末が10台だとすると、秒あたり7メッセージまで処理できる。秒10通で送られる場合、Game端末は7台まで接続可能か。
ご参考に、グサグサのプログラムですが、ESP32用MicroPythonのMQTT送受信テストプログラムは以下
import time
import random
import json
import math
from umqtt.simple import MQTTClient
MQTT_BROKER = "192.168.10.125"
CLIENT_ID = 'RPi-pico-001'
USER_ID = "pico000"
PASSWD = "zzz"
# MQTT over TCP
PORT = 1883
TOPIC = "rpi-pico-001/sensor"
MSG = '{"data" : 123}'
def connect():
print('Connected to MQTT Broker')
client = MQTTClient(CLIENT_ID, MQTT_BROKER, PORT, USER_ID, PASSWD)
client.connect()
return client
def reconnect():
print('Failed to connect to MQTT broker, Reconnecting...')
time.sleep(5)
client.reconnect()
send_time = None
last_message = None
def on_message(topic, msg):
global send_time
global last_message
global end_time
receive_time = time.ticks_us()
print('on_message', topic, msg)
print('send back(ms):', time.ticks_diff(receive_time,send_time))
print('now:', time.localtime()[5])
if msg == b'{"data": 1000}':
last_message = True
end_time = time.ticks_us()
try:
client = connect()
except OSError as e:
reconnect()
import machine
machine.freq(240_000_000)
client.set_callback(on_message)
client.subscribe(TOPIC)
start_time = None
end_time = None
def test_loop():
global start_time
count = 0
send_time = 0
value = 123
start_time = time.ticks_us()
for i in range(1001):
client.check_msg()
msg = {"data" : i}
print(f'send message {msg} on topic: {TOPIC}')
send_time = time.ticks_us()
client.publish(TOPIC, json.dumps(msg), qos=0)
test_loop()
# loop until last message (max loop 2000)
for i in range(2000):
if last_message:
break
print(i)
client.check_msg()
print('start to end (us):', time.ticks_diff(end_time, start_time))
print('start to end (ms):', time.ticks_diff(end_time, start_time)/1000)
print('start to end (s):', time.ticks_diff(end_time, start_time)/(1000*1000))