全てはスマートメータのための泥沼なのだが、、AWS IoTを介して、RPiとESP32をPub/Subで接続する。
メッセージの発行はRPi側で、ESP32は購読のみ。
RPi側のソース
#!/usr/bin/python
import paho.mqtt.client as mqtt
import json
import ssl
host = 'a3bxxxxxkf7t-ats.iot.ap-northeast-1.amazonaws.com'
port = 8883
cacert = './cert/AmazonRootCA1.pem'
clientCert = './cert/16f31xxxxxxxxxxxxxxx324f-certificate.pem.crt'
clientKey = './cert/16f31xxxxxxxxxxxxxxxx324f-private.pem.key'
id = "sensor01"
def _connect(client, userdata, flags, respons_code):
print("connected")
def _publish(client, userdata, mid):
client.disconnect()
client = mqtt.Client(client_id= id, protocol=mqtt.MQTTv311)
client.tls_set(cacert,
certfile = clientCert,
keyfile = clientKey,
tls_version = ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
client.on_connect = _connect
client.on_publish = _publish
client.connect(host, port=port, keepalive=6)
topic = 'topic_1'
msg = "hello from RPi"
import time
while True:
time.sleep(5)
print("-----------------------------------------")
print(f"publish msg:{msg}")
client.publish(topic, json.dumps({"msg" : msg}), qos=1)
EPS32側のソース(バグがあるようでMQTTの接続が切れる)
from umqtt.simple import MQTTClient
import json
ENDPOINT = b'axxxxxxxx7t-ats.iot.ap-northeast-1.amazonaws.com'
ID='basicPubSub'
def _cb(topic, msg):
print("call back")
print(topic)
print(msg)
keyfile = '/certs/private.pem.key'
with open(keyfile, 'r') as f:
key = f.read()
certfile = "/certs/certificate.pem.crt"
with open(certfile, 'r') as f:
cert = f.read()
# SSL certificates.
SSL_PARAMS = {'key': key, 'cert': cert, 'server_side': False}
client = MQTTClient(client_id=ID, server=ENDPOINT, port=8883, ssl=True, ssl_params=SSL_PARAMS)
client.set_callback(_cb)
gc.collect()
client.connect()
client.subscribe(topic="topic_1")
while True:
client.wait_msg()
print("------------------")
気休めだけど、connect()の実行前に、GCを走らせたら安定している(今は)。GCした場合、しなかった場合でもう少しどう変わるか確認が必要
接続が切れるのは、Pahoで実装したRPi側のPublishしている方だった。RPiだからリソースも十分なはずなのに。。
以下はどうにかこうにか動いている画面(上がRPi、下がESP32)
Publishするだけなら、MQTTでなくても、AWS IoTが提供しているHTTPS/POSTでいいのだが。。
■追記
日が変わって製作を再開したが、昨日は出なかったエラーが出る。ESP32側なのでかなり厄介(代替え手段がほとんどない、デバッグ手段がない)。ソースを確認するか。。
Traceback (most recent call last):
File "<stdin>", line 31, in <module>
File "umqtt/simple.py", line 184, in wait_msg
OSError: -1
>>>
micropython-lib/simple.py at master · micropython/micropython-lib · GitHub
def wait_msg(self):
res = self.sock.read(1)
self.sock.setblocking(True)
if res is None:
return None
if res == b"":
raise OSError(-1)
エラーが出ている原因は、sock.read(1)で""が返却されるため。""が返るのはどんな時なのか。。sock.read(1)の正しい動きは、データか、NULLが返却される仕様なんだろう。""が返るのはsessionがcloseされているからかもしれない。
CloudWatchのログを確認すると、ClientIDの重複エラーが発生している。AWSサーバ(MQTT ブローカ)側から通信を遮断したと考えられる。
{
"timestamp": "2022-04-23 07:36:21.348",
"logLevel": "ERROR",
"traceId": "016a2392-d772-e030-456b-399da25017a9",
"accountId": "365701690774",
"status": "Failure",
"eventType": "Disconnect",
"protocol": "MQTT",
"clientId": "basicPubSub",
"principalId": "16f31f1aaffde60f2ac508f99b9604b3aaf601c6531534fc99919d52aa83324f",
"sourceIp": "121.118.240.85",
"sourcePort": 57235,
"reason": "DUPLICATE_CLIENT_ID",
"details": "A new connection was established with the same client ID",
"disconnectReason": "DUPLICATE_CLIENTID"
}
IDを変えれば解決するはず。昨日はどうだったのか?? 昨日は異なるIDを使っている。昨日と今日とでは、一部ソースが変わってしまっていて、ID重複を起こしてしまったようだ。何も変えていないつもりが変わっているという。。