chakokuのブログ(rev4)

テック・コミック・DTM・・・ごくまれにチャリ

AWS IoTを介して RPiとESP32を接続

全てはスマートメータのための泥沼なのだが、、AWS IoTを介して、RPiとESP32をPub/Subで接続する。
メッセージの発行はRPi側で、ESP32は購読のみ。
RPi側のソース

#!/usr/bin/python

import paho.mqtt.client as mqtt
import json
import ssl

host = 'a3bxxxxxkf7t-ats.iot.ap-northeast-1.amazonaws.com'
port = 8883
cacert = './cert/AmazonRootCA1.pem'
clientCert = './cert/16f31xxxxxxxxxxxxxxx324f-certificate.pem.crt'
clientKey = './cert/16f31xxxxxxxxxxxxxxxx324f-private.pem.key'

id = "sensor01"

def _connect(client, userdata, flags, respons_code):
    print("connected")

def _publish(client, userdata, mid):
    client.disconnect()
    
client = mqtt.Client(client_id= id, protocol=mqtt.MQTTv311)
client.tls_set(cacert,
        certfile = clientCert,
        keyfile = clientKey,
        tls_version = ssl.PROTOCOL_TLSv1_2)

client.tls_insecure_set(True)
client.on_connect = _connect
client.on_publish = _publish
client.connect(host, port=port, keepalive=6)

topic = 'topic_1'
msg = "hello from RPi"

import time
while True:
   time.sleep(5)
   print("-----------------------------------------")
   print(f"publish msg:{msg}")
   client.publish(topic, json.dumps({"msg" : msg}), qos=1)

EPS32側のソース(バグがあるようでMQTTの接続が切れる)

from umqtt.simple import MQTTClient
import json

ENDPOINT = b'axxxxxxxx7t-ats.iot.ap-northeast-1.amazonaws.com'
ID='basicPubSub'

def _cb(topic, msg):
    print("call back")
    print(topic)
    print(msg)

keyfile = '/certs/private.pem.key'
with open(keyfile, 'r') as f:
    key = f.read()

certfile = "/certs/certificate.pem.crt"
with open(certfile, 'r') as f:
    cert = f.read()

# SSL certificates.
SSL_PARAMS = {'key': key, 'cert': cert, 'server_side': False}
client = MQTTClient(client_id=ID, server=ENDPOINT, port=8883, ssl=True, ssl_params=SSL_PARAMS)

client.set_callback(_cb)
gc.collect()
client.connect()
client.subscribe(topic="topic_1")

while True:
   client.wait_msg()
   print("------------------")

気休めだけど、connect()の実行前に、GCを走らせたら安定している(今は)。GCした場合、しなかった場合でもう少しどう変わるか確認が必要
接続が切れるのは、Pahoで実装したRPi側のPublishしている方だった。RPiだからリソースも十分なはずなのに。。
以下はどうにかこうにか動いている画面(上がRPi、下がESP32)

Publishするだけなら、MQTTでなくても、AWS IoTが提供しているHTTPS/POSTでいいのだが。。

■追記
日が変わって製作を再開したが、昨日は出なかったエラーが出る。ESP32側なのでかなり厄介(代替え手段がほとんどない、デバッグ手段がない)。ソースを確認するか。。

Traceback (most recent call last):
  File "<stdin>", line 31, in <module>
  File "umqtt/simple.py", line 184, in wait_msg
OSError: -1
>>>

micropython-lib/simple.py at master · micropython/micropython-lib · GitHub

    def wait_msg(self):
        res = self.sock.read(1)
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)

エラーが出ている原因は、sock.read(1)で""が返却されるため。""が返るのはどんな時なのか。。sock.read(1)の正しい動きは、データか、NULLが返却される仕様なんだろう。""が返るのはsessionがcloseされているからかもしれない。
CloudWatchのログを確認すると、ClientIDの重複エラーが発生している。AWSサーバ(MQTT ブローカ)側から通信を遮断したと考えられる。

{
    "timestamp": "2022-04-23 07:36:21.348",
    "logLevel": "ERROR",
    "traceId": "016a2392-d772-e030-456b-399da25017a9",
    "accountId": "365701690774",
    "status": "Failure",
    "eventType": "Disconnect",
    "protocol": "MQTT",
    "clientId": "basicPubSub",
    "principalId": "16f31f1aaffde60f2ac508f99b9604b3aaf601c6531534fc99919d52aa83324f",
    "sourceIp": "121.118.240.85",
    "sourcePort": 57235,
    "reason": "DUPLICATE_CLIENT_ID",
    "details": "A new connection was established with the same client ID",
    "disconnectReason": "DUPLICATE_CLIENTID"
}

IDを変えれば解決するはず。昨日はどうだったのか?? 昨日は異なるIDを使っている。昨日と今日とでは、一部ソースが変わってしまっていて、ID重複を起こしてしまったようだ。何も変えていないつもりが変わっているという。。