chakokuのブログ(rev4)

日々のごった煮ブログです

AWS IoT Coreを使ってみる (デバイスからMQTTでPub/Sub)

AWS Iot Coreを調査中。まずはラズパイをセンサーとしてPublishしてみる。
AWS IoTでは、MQTT通信する際にはClient証明書で認証するようである。だから、デバイスごとにClient証明書の発行が必要。AWSのIoTコンソール画面を使って、端末(RaspberryPi)用のClient証明書を発行する。端末に付与される権限(各デバイスがどんなPub/Subをやっていいのか?の許可)はClient証明書に紐づいて管理される。だから、、与えたい権限をAWS IoT>Policiesの画面を用いてPolicyとして設定して、Client証明書に紐づける。サーバ側の準備はこれでいいのだが、、ラズパイからAWSに向けてMQTTで通信するためにMQTTライブラリが必要。AWSが推奨するライブラリを使うのが無難なので、AWS IoT SDKのライブラリをPIPコマンドでインストールする。以下は自分が実行した例。 直接pipコマンドを入力してライブラリインストールするとpython2のPIPが走ったりするので、python3からモジュールとしてpipを呼び出しています。以下操作でPython3用のAWS Iot SDKがラズパイ上のUbuntu入る。

sudo python3 -m pip install awsiotsdk

Python版のMQTTで記事を投稿(Publish)する短いサンプル(引用元は、脚注:引用元記事をご参照のこと)

#!/usr/bin/python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time as t
import json

# Define ENDPOINT, CLIENT_ID, PATH_TO_CERT, PATH_TO_KEY, PATH_TO_ROOT, MESSAGE, TOPIC, and RANGE
ENDPOINT = "a3XXXXXXXXf7t-ats.iot.ap-northeast-1.amazonaws.com"
CLIENT_ID = "basicPubSub"

PATH_TO_ROOT = "ca/root-CA.crt"
PATH_TO_CERT = "ca/upkpi.cert.pem"
PATH_TO_KEY = "ca/upkpi.private.key"
MESSAGE = "Hello World"
TOPIC = "sensor/room01"
RANGE = 20

# Spin up resources
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=ENDPOINT,
            cert_filepath=PATH_TO_CERT,
            pri_key_filepath=PATH_TO_KEY,
            client_bootstrap=client_bootstrap,
            ca_filepath=PATH_TO_ROOT,
            client_id=CLIENT_ID,
            clean_session=False,
            keep_alive_secs=6
            )
print("Connecting to {} with client ID '{}'...".format(
        ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
# Publish message to server desired number of times.
print('Begin Publish')
i=0
while True:
    i=i+1
    data = "{} [{}]".format(MESSAGE, i+1)
    message = {"message" : data}
    mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
    print("Published: '" + json.dumps(message) + "' to the topic: " + TOPIC)
    t.sleep(3)
print('Publish End')
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()

Subscribeするためのサンプル

#!/usr/bin/python3

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

import argparse
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import sys
import threading
import time
from uuid import uuid4


endpoint = "a3XXXXXXXXf7t-ats.iot.ap-northeast-1.amazonaws.com"

root_ca = "ca/root-CA.crt"
cert = "ca/upkpi.cert.pem"
key = "ca/upkpi.private.key"
topic = "sensor/room01"
client_id = "basicPubSub"
verbosity=io.LogLevel.NoLogs.name
count = 10


io.init_logging(getattr(io.LogLevel, verbosity), 'stderr')

received_count = 0
received_all_event = threading.Event()

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
        resubscribe_results = resubscribe_future.result()
        print("Resubscribe results: {}".format(resubscribe_results))

        for topic, qos in resubscribe_results['topics']:
            if qos is None:
                sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1

if __name__ == '__main__':
    # Spin up resources
    event_loop_group = io.EventLoopGroup(1)
    host_resolver = io.DefaultHostResolver(event_loop_group)
    client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
            endpoint=endpoint,
            cert_filepath=cert,
            pri_key_filepath=key,
            client_bootstrap=client_bootstrap,
            ca_filepath=root_ca,
            on_connection_interrupted=on_connection_interrupted,
            on_connection_resumed=on_connection_resumed,
            client_id=client_id,
            clean_session=False,
            keep_alive_secs=6)

    print("Connecting to {} with client ID '{}'...".format(
        endpoint, client_id))

    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    # Subscribe
    print("Subscribing to topic '{}'...".format(topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))

    while True:
        time.sleep(1)
        print("{} message(s) received.".format(received_count))

    # Disconnect
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print("Disconnected!")

分からない事:
自宅のネット環境に接続した、ラズパイとノートPCに、それぞれ、Pub役とSub役を実装して走らせると、エラーになる。片方ずつなら問題ない。なぜなのか?? client IDが basicPubSubを使ってるが、Pub/Subで重なっているから??それとも、エンドポイントへのMQTTの接続は、1つのグローバルIPからは1セッションしか接続できない?? わからん。。。

■追記
1系統を光回線でつないで、もう一つを携帯のテザリングでつないでネットワークを分けた。この状態で、異なるネットワークから、PubとSubを同時にやるとやはりエラーになった。となると、、client IDがbasicPubSubで同じだからエラーなのか??と。。しかし、client IDは自分で付けたのではないから変えられるのかどうか。。

■追記
client IDがどうやって決まっているのかわかっていなかった。ポリシィを再度読み直すと、client/basicPubSubとなっていて、ポリシィファイルのAllow Connectの段落で、許可IDを規定した(ここはAWS IoT クイックツアー風に乗っかって自動で作ってもらった所だから仕組みが分かっていなかった)。対応策として、、接続する側のデバイスには固有のclient idを持たせる必要がある。固有のIDに対応できるようポリシィを書き換えて、接続端末分のIDを追記するか、ワイルドカードで設定できるように書き換える必要がある。

    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:365701690774:client/sdk-java",
        "arn:aws:iot:ap-northeast-1:365701690774:client/basicPubSub",
        "arn:aws:iot:ap-northeast-1:365701690774:client/sdk-nodejs-*"
      ]
    }

上記学びに従ってPolicyを新たに作成した。MQTT接続するラズパイ、PC用にsensor01,sensor02を用意

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish",
        "iot:Receive"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:365701690774:topic/sensor/room01",
        "arn:aws:iot:ap-northeast-1:365701690774:topic/sensor/room02"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:365701690774:topicfilter/sensor/room01",
        "arn:aws:iot:ap-northeast-1:365701690774:topicfilter/sensor/room02",
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:365701690774:client/sensor01",
        "arn:aws:iot:ap-northeast-1:365701690774:client/sensor02",
        "arn:aws:iot:ap-northeast-1:365701690774:client/sensor03"
      ]
    }
  ]
}

先に作ったサンプルに対して、client-idをsensorNNに、Topicをsensor/room01等に変更する。
このように、各端末ごとにclient-idを個別に割り当ててclient-idを共用しないことで、複数端末からの同時操作が可能になった。

ここまではまぁMQTTなのでなんとかなったけど、、AWS IoTの電文をLambadとつないだり、さらには、Alexaとつないだりするのはワケ分からない。

引用元記事
https://aws.amazon.com/jp/premiumsupport/knowledge-center/iot-core-publish-mqtt-messages-python/

AWSIoTのライブラリは以下にGitHubにSample等もある
GitHub - aws/aws-iot-device-sdk-python-v2: Next generation AWS IoT Client SDK for Python using the AWS Common Runtime

AWS IoT Core の開始方法 - AWS(AWS) IoT コア

■次の取り組み・・Lambdaとつなぐ
AWS IoT Rulesというサービスがあって、ruleに書いておくとLambdaが呼び出されるようだ。。
一旦Lambdaが呼ばれたらあとはまぁいろいろできるかと。。 Lambdaを呼ばずともトピックをそのままKinesis?とかに投げ込むこともできるのかもしれない(Kinesis・・・ストリーミングデータのリアルタイム処理・・・個人では使わないけど。。)。
Creating a rule with an AWS Lambda action - AWS IoT Core

IoT RulesはCLIで設定するらしい。まぁCLIでもやりますけど。。段々と道が険しくなってきた印象。。
Creating an AWS IoT rule - AWS IoT Core
と思っていたら、、Rulesの管理画面があるようだ。IF THENルールはSQL形式で書く必要がある。。これは前からそういう説明だったし。。OK
ナレコム様の記事より
https://recipe.kc-cloud.jp/archives/10566


■追記
IoT Ruleの画面から、新たにルールを作成して、以下のようなSQLライクな条件を設定

SELECT * FROM 'sensor/#'

以下のように書くと、device-idも取得できるらしい(ナレコム様の記事より)

SELECT clientid() as id , * FROM 'sensor/#'

条件として、sensor/#でマッチする記事が投稿されたら、、という条件、Actionを指定できるので、Lambdaの所定の関数を起動という風に関連づけると、Publishで記事が投げ込まれたら指定したLambdaが実行される。PermissionとかCredentialsとか設定した覚えがないのだが、勝手に許可設定をやってくれるのか、ラズパイからMQTTで投げ込まれれると紐づけしたLambdaの関数が呼び出されている。。 IoT Rulesのメッセージ送り先はLambda以外にもAWSの各種サービスが一通りそろっているようだ。だんだんとビジネスロジックが複雑になってくるとAWS内で連動するサービスが増えてきて、テストであまり気にせず記事を投げ込みまくってそのまま止めずに放置しておくと、料金がかさむかも。。
ご参考:IoT Rulesによる設定画面例

Lambdaが呼び出されいるのをCloudWatchで確認(まだダミー関数なので)

■追記

バイスとも接続でき、publishされたメッセージをLambdaにも接続できたので、だいたいこれでいけそうだと安心していたら、、ふと、LambdaからどうやったらデバイスにPublishできるのか??が気になり始めた。最悪はLambdaがデバイスのふりしてAWS IoTのエンドポイントに接続するか。。と思いつつ、ググってみると、その意図に沿った記事が見つかり、botoを使うことが分かった。boto3.client('iot-data')でオブジェクト生成するようだ。この情報も、例の、ナレコム様記事より。Thanks!
URL: https://recipe.kc-cloud.jp/archives/10348
上記記事を書かれたのは3年前だった。3年遅れてやっとAWS IoT Coreをさわり始めているという。。
Boto3のIoTDataPlaneの仕様書
IoTDataPlane — Boto3 Docs 1.16.9 documentation

ゆくゆくは、スマフォからボタンをポチっと操作すると、AWSのMQTT経由で宅内装置がカチっとなったらいいなと考えた時、スマフォ画面がメンドー、よくわからん、、しかし、こじゃれたフレームワークもありそうだし、、、と思っていたら、、AWS IoTではモバイル用のSDKも用意しているようだ。。うーん、オソロシやAWS..こんなに至れり尽くせりでは、絶対に勝てる気がしない。。もちろん戦ってすらいないが。。
AWS IoT Device and Mobile SDKs - AWS IoT Core
詳しくは読んでいないが、、スマフォ用のSDKとは、スマフォからMQTTで通信させるためのライブラリの様であった。。(最初、絵作りを助けてくれる、各種センサー等のウイジット+Web APIの組み合わさったものかと思ってしまった)