前から作りたいと思っていた、MQTTで制御できるベルを作ってみる*1 。使うものは、IoT 1 Clickボタン、ESP32、サーボ、ベル。これらをAWS IoT Coreを使って接続して、IoT 1 Clickを押すとベルが鳴るというもの。
以下はIoT 1 Click(IoT Enterprise)ボタンで、AWS IoT 1-Clickサービスにメッセージを送ることができる。送信されるメッセージの内容は後半ご参照ください。
以下は100均で買った卓上ベル。上のボタンを押すとベルの内側のハンマーがベルを叩いて一回鳴る。結構音が大きい。
AWS IoT Coreを使って接続するので、IoT 1 Clickボタンでなくても、MQTTにPublishできるコマンドからベルを鳴らすのも可能
アーキテクチャ は以下
1 Clickボタンの受け側がIoT Core風のAWS IoT 1-CLickなので、そのままESP32にPublishしたらよさそうなものですが、1 Clickボタンからのイベントを受けてるIoT 1-Clickの仕様がよくわからないので、教科書通りにLamdaを介してIoT CoreにPublishしています。(AWS 側のポリシィとして、IoT 1 Clickのバックエンド側(AWS 内でのサーバ群)は、IoT Coreとしては使わせない設計と思われる。似ているからといって、そもそもMQTT/IoT Core使ってるかどうかも不明ですし)
デバイス セットアップ:
スマフォにアプリ(AWS IoT 1-Click)を入れて、AWS IoT 1Clickデバイス をセットアップする
AWS IoT 1-Click on the App Store
やることは、スマフォからBLE経由でAWS IoT 1 Clickに接続して、1Clickのファーム更新とWiFi 設定をする。
アプリからデバイス 登録もできるのか?と思ったが、登録は行えないようだ*2
IoT 1 Clickの設定:
AWS コンソール(アジアパシフィック)から、1-Clickのデバイス ID(DSN: Gnnnn)を打ち込んで登録した。
https://ap-northeast-1.console.aws.amazon.com/iot1click/home?region=ap-northeast-1
アカウントに正しく紐づけできると、AWS IoT 1-Clickのコンソール画面から、Manage>Devicesでたどると、登録したAWS IoT 1 Clickが表示される。デフォルトではDisableなのでEnableに変更する。
1-Clickをクリックすることで何か仕事をさせたい場合は、Projectを設定してProjectに連携内容を定義する。
Manual等しっかり読んでいないので多少ええ加減だが、ProjectはTemplateとPlaceの2種類あって、Templateだけ定義しただけでは、デバイス と処理が紐づかず、Placeで定義することで、デバイス と処理(Lambdaのキック等)が紐づく。
DeviceのEventログには以下のような記録があり、デバイス からはこういった情報が送られるのだろう。。MQTT?HTTP??プロトコル の詳細は分からないが。
{
"device": {
"type": "button",
"deviceId": "G030xxxxxxxxxJPG7",
"attributes": {},
"deviceArn": "arn:aws:iot1click:us-west-2:365701690774:devices/G030xxxxxxxJPG7"
},
"stdEvent": {
"clickType": "SINGLE",
"reportedTime": "2022-07-17T06:22:56.366Z",
"certificateId": "11272168e9a6b8fedc55f051e788eb161fc90a5370df0c888f0a92c5fcab017a",
"remainingLife": 97.95000000000023,
"testMode": true
}
} AWS Lambdaの設定(関数定義)
Lambdaには、素の関数を定義して、どのような値が渡されるのか引数を確認 (function名:aws _iot_one_click)
import json
def lambda_handler(event, context):
# TODO implement
print(event)
print(context)
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
} 引数eventに渡される値は以下であった
>||
{
'deviceInfo': {
'deviceId': 'G030xxxxxxJPG7',
'type': 'button',
'remainingLife': 97.8,
'attributes': {
'projectRegion': 'ap-northeast-1',
'projectName': 'ringBell',
'placementName': 'UPK_2F',
'deviceTemplateName': 'G030xxxxxxxxJPG7'
}
},
'deviceEvent': {
'buttonClicked': {
'clickType': 'SINGLE',
'reportedTime': '2022-07-17T06:59:21.544Z'
}
},
'placementInfo': {
'projectName': 'ringBell',
'placementName': 'UPK_2F',
'attributes': {
'UPK Room': '2F'
},
'devices': {
'G030xxxxxxxxJPG7': 'G030xxxxxxxJPG7'
}
}
} デバイス 情報に、プロジェクト情報や場所情報をくっつけてLambdaを呼んでくれているようだ。だから、どの家のどの部屋にあるボタンか?が分かると。
今は1 Clickボタン一つしかないので、デバイス の識別はせずに、Clickイベントがあれば、BELL鳴らすためのmessageをIoT CoreのTopicにPublishする。Publishはboto3を使って行えるようだ。これまでAWS IoT Coreを試作してきたが、場当たり的でTopic/messageの設計がええ加減だったので多少整備する。ベストかどうか全くわからないが、、とりあえず以下
Topic:
reg/<place>/<device_name> ....制御等の要求
rep/<place>/<device_name> ....センサ等のレポート メッセージ:
{
msg_type : request | report ,
request: <request_string> ,
report : <data>
} ベルを鳴らしたい場合は、、
Topic: req/upk/esp32_01
Message:
{
msg_type: request
request: ring_bell
} (String型のダブルクオート省略しています)
AWS IoT Coreに投げるため、Lambdaは以下のように定義
import json
import boto3
TOPIC = "req/upk/esp32_01"
def lambda_handler(event, context):
# TODO implement
print("called IoT 1 Click")
print(event)
iot = boto3.client('iot-data')
payload = {
"msg_type" : "request",
"request" : "ring_bell"
}
iot.publish(topic=TOPIC, payload=json.dumps(payload))
return {
'statusCode': 200,
'body': json.dumps('Hello from Lambda!')
} このままだと権限違反でIoT CoreにPublishできなので、LambdaのメソッドにIoT Core Full Access を付けました
(本当はFullAccess許可ではなく、Publishのみにすべきですが、、まぁ個人のテストなので)
ESP32側はおおよそ以下のソースでAWS IoT Coreと接続できる
from umqtt.simple import MQTTClient
import json
ENDPOINT = b'axxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
ID = 'esp32_01'
TOPIC = "req/upk/esp32_01"
KEYFILE = '/certs/private.pem.key'
CERTFILE = "/certs/certificate.pem.crt"
def _cb(topic, msg):
print("-------- call back ----------")
print(f"topic:{topic}")
print(f"msg:{msg}")
print("-----------------------------")
with open(KEYFILE, 'r') as f:
key = f.read()
with open(CERTFILE, 'r') as f:
cert = f.read()
# SSL certificates.
SSL_PARAMS = {'key': key, 'cert': cert, 'server_side': False}
client = MQTTClient(client_id=ID, server=ENDPOINT, port=8883, ssl=True, ssl_params=SSL_PARAMS)
client.set_callback(_cb)
client.connect()
client.subscribe(topic=TOPIC)
while True:
print("---loop---------------")
client.wait_msg() AWS IoT Coreの設定:
AWS IoT Coreでやるべきは、、Policy設定、デバイス の登録と証明書発行である。面倒だったら全部許可のPermissionを紐づけた証明書を発行すれば権限違反でエラーになることはない。動作確認した後で権限を最小にすると。。
後から権限を狭める前提で、、、試作段階では以下のように全許可のPolicyを作成
Policy_name: all_permit_policy
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "*",
"Resource": "*"
}
]
} デバイス 登録の際に、Key(秘密鍵 、公開鍵)とCert(Client証明書)が作成されるので、これをDLしてES32に入れる(秘密鍵 とClient証明書が必要)。上記のソースをREPLに貼り付けて実行
IoT 1 Clickのボタンを押すと、ESP32のMicroPythonのREPLに以下が表示された。これで、ESP32までは疎通できた。
---loop---------------
-------- call back ----------
topic:b'req/upk/esp32_01'
msg:b'{"msg_type": "request", "request": "ring_bell"}'
----------------------------- あとは、、サーボ制御を作って、サーボのアームを回してベルを押下させる処理を追加する。また、、一定時間経過後、アームを上げる必要がある。一定時間後ってのがまた面倒だが、、メインループがあるので、時間比較して、アームを戻す処理を追加するか。
MicroPythonの公式ドキュメントにPWMによるサーボ制御サンプルがあるので、これを参照
7. パルス幅変調 — MicroPython 1.19.1 ドキュメント
なお、、サーボのケーブルのうち、赤:+、茶:GND、黄色:信号、らしいので、赤は5V、茶は0Vに接続、黄色は、GPIO12に接続
コードは以下(公式ドキュメントで紹介された角度の最大・最小値とは少しずれており、34-110の幅で制御する)
#
# Remote Bell system w/ ESP32 + AWS IoT Core (20220717)
#
from umqtt.simple import MQTTClient
import machine
import utime
SERVO_PIN = 12
ARM_MIN = 34
ARM_MAX = 110
ENDPOINT = b'axxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
ID = 'esp32_01'
TOPIC = "req/upk/esp32_01"
KEYFILE = '/certs/6bb5.priv'
CERTFILE = "/certs/6bb5.crt"
servo = machine.PWM(machine.Pin(12), freq=50)
servo.duty(ARM_MIN)
def _cb(topic, msg):
print("-------- call back ----------")
print(f"topic:{topic}")
print(f"msg:{msg}")
print("-----------------------------")
servo.duty(ARM_MAX)
utime.sleep(0.3)
servo.duty(ARM_MIN)
with open(KEYFILE, 'r') as f:
key = f.read()
with open(CERTFILE, 'r') as f:
cert = f.read()
# SSL certificates.
SSL_PARAMS = {'key': key, 'cert': cert, 'server_side': False}
client = MQTTClient(client_id=ID, server=ENDPOINT, port=8883, ssl=True, ssl_params=SSL_PARAMS)
client.set_callback(_cb)
client.connect()
client.subscribe(topic=TOPIC)
while True:
print("---loop---------------")
client.wait_msg()
AWS IoT 1 clickボタンを押すとIoT Core経由でESP32に接続されたサーボが動くところまでは確認できた。あとは、アームとベルの位置を調整して、ベルのボタンを押せるようにすれば、鳴るはず。。機構の調整が面倒だ。
カムとかの機構を作るのはやったことなく、工具や部材もないので、アームに金属類をぶら下げて、アームをぐるっと回して金属部で叩くようにした。収まり感が非常に悪いが、鳴ることは鳴る。以下は出来上がったシステム全景(収まり感わるし)
ビデオ撮りました。応答速度等ご参考に。手間かかった*3 割には取れ高少ないです。。
VIDEO www.youtube.com
もっさり遅いので、どこで時間かかってるのか調べるつもりです。もっさり感に加え、サーボのギアの音が結構でかくて、ベル鳴らすまでもないという感じです。
■追記
試作段階では動くことを優先してIoT CoreのPolicyを全許可にしていた。動作確認できたので、少し権限を絞る。今は以下のPolicyを利用
接続許可デバイス 名:esp32_01
サブスクライブ許可トピック:req/upk/esp32_01
受信許可トピック:req/upk/esp32_01
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": "arn:aws:iot:ap-northeast-1:36xxxxxxx74:client/esp32_01"
},
{
"Effect": "Allow",
"Action": [
"iot:Subscribe"
],
"Resource": "arn:aws:iot:ap-northeast-1:36xxxxxxxx74:topicfilter/req/upk/esp32_01"
},
{
"Effect": "Allow",
"Action": [
"iot:Receive"
],
"Resource": "arn:aws:iot:ap-northeast-1:36xxxxxxx74:topic/req/upk/esp32_01"
}
]
} ■参考URL
mqtt simple2のサンプル
micropython-umqtt.simple2/example_sub.py at master · fizista/micropython-umqtt.simple2 · GitHub