全てはスマートメータのための泥沼なのだが、、AWS IoTを介して、RPiとESP32をPub/Subで接続する。
メッセージの発行はRPi側で、ESP32は購読のみ。
RPi側のソース
#!/usr/bin/python import paho.mqtt.client as mqtt import json import ssl host = 'a3bxxxxxkf7t-ats.iot.ap-northeast-1.amazonaws.com' port = 8883 cacert = './cert/AmazonRootCA1.pem' clientCert = './cert/16f31xxxxxxxxxxxxxxx324f-certificate.pem.crt' clientKey = './cert/16f31xxxxxxxxxxxxxxxx324f-private.pem.key' id = "sensor01" def _connect(client, userdata, flags, respons_code): print("connected") def _publish(client, userdata, mid): client.disconnect() client = mqtt.Client(client_id= id, protocol=mqtt.MQTTv311) client.tls_set(cacert, certfile = clientCert, keyfile = clientKey, tls_version = ssl.PROTOCOL_TLSv1_2) client.tls_insecure_set(True) client.on_connect = _connect client.on_publish = _publish client.connect(host, port=port, keepalive=6) topic = 'topic_1' msg = "hello from RPi" import time while True: time.sleep(5) print("-----------------------------------------") print(f"publish msg:{msg}") client.publish(topic, json.dumps({"msg" : msg}), qos=1)
EPS32側のソース(バグがあるようでMQTTの接続が切れる)
from umqtt.simple import MQTTClient import json ENDPOINT = b'axxxxxxxx7t-ats.iot.ap-northeast-1.amazonaws.com' ID='basicPubSub' def _cb(topic, msg): print("call back") print(topic) print(msg) keyfile = '/certs/private.pem.key' with open(keyfile, 'r') as f: key = f.read() certfile = "/certs/certificate.pem.crt" with open(certfile, 'r') as f: cert = f.read() # SSL certificates. SSL_PARAMS = {'key': key, 'cert': cert, 'server_side': False} client = MQTTClient(client_id=ID, server=ENDPOINT, port=8883, ssl=True, ssl_params=SSL_PARAMS) client.set_callback(_cb) gc.collect() client.connect() client.subscribe(topic="topic_1") while True: client.wait_msg() print("------------------")
気休めだけど、connect()の実行前に、GCを走らせたら安定している(今は)。GCした場合、しなかった場合でもう少しどう変わるか確認が必要
接続が切れるのは、Pahoで実装したRPi側のPublishしている方だった。RPiだからリソースも十分なはずなのに。。
以下はどうにかこうにか動いている画面(上がRPi、下がESP32)
Publishするだけなら、MQTTでなくても、AWS IoTが提供しているHTTPS/POSTでいいのだが。。
■追記
日が変わって製作を再開したが、昨日は出なかったエラーが出る。ESP32側なのでかなり厄介(代替え手段がほとんどない、デバッグ手段がない)。ソースを確認するか。。
Traceback (most recent call last): File "<stdin>", line 31, in <module> File "umqtt/simple.py", line 184, in wait_msg OSError: -1 >>>
micropython-lib/simple.py at master · micropython/micropython-lib · GitHub
def wait_msg(self): res = self.sock.read(1) self.sock.setblocking(True) if res is None: return None if res == b"": raise OSError(-1)
エラーが出ている原因は、sock.read(1)で""が返却されるため。""が返るのはどんな時なのか。。sock.read(1)の正しい動きは、データか、NULLが返却される仕様なんだろう。""が返るのはsessionがcloseされているからかもしれない。
CloudWatchのログを確認すると、ClientIDの重複エラーが発生している。AWSサーバ(MQTT ブローカ)側から通信を遮断したと考えられる。
{ "timestamp": "2022-04-23 07:36:21.348", "logLevel": "ERROR", "traceId": "016a2392-d772-e030-456b-399da25017a9", "accountId": "365701690774", "status": "Failure", "eventType": "Disconnect", "protocol": "MQTT", "clientId": "basicPubSub", "principalId": "16f31f1aaffde60f2ac508f99b9604b3aaf601c6531534fc99919d52aa83324f", "sourceIp": "121.118.240.85", "sourcePort": 57235, "reason": "DUPLICATE_CLIENT_ID", "details": "A new connection was established with the same client ID", "disconnectReason": "DUPLICATE_CLIENTID" }
IDを変えれば解決するはず。昨日はどうだったのか?? 昨日は異なるIDを使っている。昨日と今日とでは、一部ソースが変わってしまっていて、ID重複を起こしてしまったようだ。何も変えていないつもりが変わっているという。。