リモート XBee IO データを MQTT Broker に送信、XBee-MQTT Gateway (その1)

●概要

今回の記事では、リモートから送信したXBee IO サンプリングデータを MQTT ブローカに自動送信するシステムを構築します。

XBee(Series1) や XBee-ZB(Series2) デバイスには A/D や DIO の値を定期的に送信するIOサンプリング機能が予め備わっています。このシステムでは XBee から送信された IO サンプリングデータを JSON 形式に変換して、MQTT ブローカに送信するシステムを構築します。

リモートに設置した複数の XBee(Series1) や XBee-ZB(Series2) デバイスから IO サンプリングパケットは、DeviceServer に接続したXBee デバイスで受信します。

Series1 の場合にはスター接続の中央に配置されたコントローラデバイスで、Series2 の場合には Coordinator デバイスが DeviceServer に接続されています。DeviceServer で受信したIO サンプリングパケットはイベントハンドラ中で解析して、JSON 文字列に変換した後 MQTT ブローカに Publish メッセージとして配信します。

複数のリモート XBee デバイスからの IO サンプリングデータを、 MQTT ブローカの購読側で簡単に選別できるように、トピック名をデバイス名(NodeIdentifier) を含んだ毎にユニークな文字列 “/<”xbee” | “zb”>/<NodeIdentifier>/”io” にして送信します。またメッセージ内容は JSON 形式で下記のフォーマットで送信します

{“ad0″:<ad0_value>, .. ,”ad1″:<ad1_value>, … ,”dio0″:<dio0_value>,”dio1″:<dio1_value>,… }

JSON 形式にすることで、IO サンプリングデータを MQTT メッセージ購読側のアプリで利用しやすくなります。また、XBee デバイスからは複数の IO 値が1つのサンプリングパケットにまとめて送信されますので、MQTT ブローカに送信するときも IO ポート毎にトピックを分解しないほうが元のデータを正しく表現することができると思います。

●MQTT ブローカを準備

センサデータを配信する MQTT ブローカをセットアップします。今回は RaspberryPi にインストールした mosquitto を使用します。

セットアップの方法は此方の記事を参照にしてください。

これ以外にも LAN やインターネット上に設置された MQTT ブローカを使用できます。送信するセンサデータは JSON 文字列で数十バイト程度なので、もし利用する MQTT ブローカでペイロードサイズの制限があったとしても引っかかることはないと思います。

ただ、XBee  IO サンプリングデータの送信頻度を百ミリ秒未満にする場合にはMQTT ブローカ間とのネットワーク遅延などを考慮してください。

●リモート側XBee デバイスを設置

ここではリモート側の XBee デバイスに XBee(Series1) を使用していますが、XBee-ZB(Series2)を使用することもできます。光センサ(フォトダイオード) を XBee デバイスの AD0 に接続しています。

フォトダイオードは 浜松ホトニクス S9648 を 負荷抵抗 10KΩで使用しています。またフィルタ用に 0.1μ コンデンサを並列に接続しています。XBee の Vcc と GND の他に、Vref 端子を忘れないように配線してください(Series1 の場合)。

●DeviceServer 側のエンドポイントを作成

DeviceServer 側に MQTT ブローカとの接続を管理するためのエンドポイントを作成します。今回のアプリケーションでは、エンドポイントを XBee から受信した IO サンプリングデータを MQTT ブローカに送信(Publish) する時のみに使用しています。

DeviceServer のサーバー設定プログラムを起動します。

“次へ” ボタンを押して DeviceServer の設定を行います。

MQTT タブを選択して、MQTT クライアント機能の設定とエンドポイントの作成を行います。ここで “MQTT 機能を有効にする” にチェックを付けます。KeepAliveTimer はデフォルトの 60秒のままにしておいて下さい。ここで設定した間隔で、DeviceServer 側から MQTT ブローカに対して PINGREQ メッセージを定期的に送信します。また、エンドポイントのソケット接続でエラーを検出したときには再接続処理が自動で行われます。

“追加” ボタンを押して、新規のエンドポイントを作成します。

エンドポイントのタイトルを上記のように設定します。Broker のホスト名と TCP/IP ソケット・ポート番号をここで入力します。その他の設定は空白のままで構いません。

このエンドポイントは XBee のIO サンプリングデータを MQTT ブローカに送信するためだけに使用する予定なので、”起動時に購読するトピック” 項目などは空白のままで大丈夫です。

“OK” を押してエンドポイントを作成した後、サーバー設定プログラムの “次へ” ボタンを押すとDeviceServer が再起動します。同時に、エンドポイントに設定した MQTT ブローカへの接続が自動的に開始されます。

●XBee のイベントハンドラスクリプト設定

XBee デバイスから IO サンプリングデータを受信したときに、DeviceServer 側で実行されるイベントハンドラスクリプトを設定します。XBee(Series1) の場合には XBEE_IO_DATA.lua が自動起動される Lua スクリプトです。

このスクリプトを修正して、MQTT ブローカに JSON 形式で IO データを送信するようにします。スクリプトの内容は以下になります。最新の DeviceServer インストールキットでインストールすると、下記の内容が既にコメントで囲まれた状態で記述されていますので簡単に設定できます。

file_id = "XBEE_IO_DATA"

--[[

******************************************************************************
* イベントハンドラスクリプト実行時間について                                 *
******************************************************************************

一つのスクリプトの実行は長くても数秒以内で必ず終了するようにしてください。
処理に時間がかかると、イベント処理の終了を待つサーバー側でタイムアウトが発生します。

また、同時実行可能なスクリプトの数に制限があるため、他のスクリプトの実行開始が
待たされる原因にもなります。

頻繁には発生しないイベントで、処理時間がかかるスクリプトを実行したい場合は
スクリプトを別に作成して、このイベントハンドラ中から script_fork_exec() を使用して
別スレッドで実行することを検討してください。

******************************************************************************

XBEE_IO_DATA スクリプト起動時に渡される追加パラメータ

---------------------------------------------------------------------------------
キー値			値		            									値の例
---------------------------------------------------------------------------------

APIType			フレームデータ中のAPI Type(16進数2桁)					83

SourceAddress	フレームデータ中のSourceAddress
				16bit アドレスの場合(16進数4桁)							0A01
				64bit アドレスの場合(16進数16桁)						0013A200404AC397

SerialNumber	XBee デバイスの SerialNumber
				DeviceServer に保持されたマスターファイルを使用して、
				SourceAddress から変換した値が設定される。				0013A200404AC397

NodeIdentifier	XBee デバイスの NodeIdentifier。
				DeviceServer に保持されたマスターファイルを使用して、
				SourceAddress から変換した値が設定される。				Device1
				マスターにNodeIdentifier未登録の場合は"" が設定される

RSSI			フレームデータ中のRSSI(16進数2桁)						45

Options			フレームデータ中Options(16進数2桁)						00

SAMPLE_COUNT	I/O データのサンプル数									1

SAMPLE_DIO		I/O データ中のサンプル対象となったDIOビット番号リスト
				(10進数、カンマ区切り)									0,1,4

SAMPLE_ADC		I/O データ中のサンプル対象となったADCビット番号リスト
				(10進数、カンマ区切り)									2,3

SAMPLE_<Sample#>_<"DIO"|"ADC">_<Bit#>

				I/O サンプルデータ値。
				DIO の場合は、High で "1"、Low で "0"。					1
				ADC の場合は 10進数。									1023

				<Sample#> には 最大、SAMPLE_COUNT まで 1から順番に
				インクリメントされた値が入る。

				<"DIO"|"ADC">は、I/O サンプルデータが ADC
				もしくは、DIO のどちらであるかを示す。

				<Bit#>は、サンプルデータのビット番号。10進数。

		例:"SAMPLE_1_ADC_0" は、第一サンプルデータ中の #0番ポートのADC変換値を示す。

]]

--[[
log_msg("start..",file_id)
for key,val in pairs(g_params) do
	log_msg(string.format("g_params[%s] = %s",key,val),file_id)
end
]]

--[[
********************************************************************

XBee->MQTT Gateway 機能の実装例

リモート XBee から送信された I/O パケットデータを MQTT に登録します。
トピック名: "/xbee/<NodeIdentifier>/io"
メッセージ文字列:
 {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }

XBee の IO 設定で有効になっている A/D, DIO ポート値のみが上記のフォーマット
で格納されます。

送信先 MQTT ブローカは、サーバー設定プログラムを使用して
エンドポイントを登録して下さい。下記スクリプト中の end_point 変数に
エンドポイントのタイトル文字列または ClientID を設定します。

XBee デバイスの "Sample Before TX" の設定値を1以上にした場合には
最後にサンプリングしたデータのみを MQTT に送信しています

********************************************************************
]]

local end_point = "センサーデータ登録"
local qos = 1
local topic = "/xbee/" .. g_params["NodeIdentifier"] .. "/io"

local ad_list = csv_to_tbl(g_params["SAMPLE_ADC"])
local dio_list = csv_to_tbl(g_params["SAMPLE_DIO"])
local json_str = '{'
local first = true
for i,v in ipairs(ad_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"ad%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_ADC_" .. v])
end
for i,v in ipairs(dio_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"dio%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_DIO_" .. v])
end
json_str = json_str .. '}'
mqtt_publish(end_point,topic,json_str,qos)

上記スクリプト中の “end_point” 変数に設定している名前が、先にサーバー設定プログラムで作成したエンドポイントのタイトル名と一致していることを確認してください。

XBEE_IO_DATA イベントハンドラスクリプトでは、リモートの XBee(Series1) デバイスから送信された IO サンプリングパケットは自動的に解析されて、既にスクリプトパラメータに格納されtた状態で起動されます。

そのため、このスクリプトパラメータを利用してMQTT に送信する JSON 文字列を作成します。XBee デバイスの ADC, DIO ポートが有効になっていると、各々のポート値がスクリプトパラメータに設定されていますので、これを連結して JSON 形式に変換します。変換するときの詳しいフォーマットはスクリプト中のコメントを参照してください。

JSON 形式になったIO サンプリングデータは mqtt_publish() ライブラリ関数を使用して、MQTT ブローカに送信(Publish) します。このとき、トピック名には “/xbee/<NodeIdentifier>/io” を使用することで、購読側でデバイスを選択し易くしています。

続いて、XBee-ZB(Series2) のイベントハンドラスクリプト(ZB_IO_DATA.lua) も同様に設定します。

file_id = "ZB_IO_DATA"

--[[

******************************************************************************
* イベントハンドラスクリプト実行時間について                                 *
******************************************************************************

一つのスクリプトの実行は長くても数秒以内で必ず終了するようにしてください。
処理に時間がかかると、イベント処理の終了を待つサーバー側でタイムアウトが発生します。

また、同時実行可能なスクリプトの数に制限があるため、他のスクリプトの実行開始が
待たされる原因にもなります。

頻繁には発生しないイベントで、処理時間がかかるスクリプトを実行したい場合は
スクリプトを別に作成して、このイベントハンドラ中から script_fork_exec() を使用して
別スレッドで実行することを検討してください。

******************************************************************************

ZB_IO_DATA スクリプト起動時に渡される追加パラメータ

---------------------------------------------------------------------------------
キー値			値		            									値の例
---------------------------------------------------------------------------------
FrameType		フレームデータ中のFrame Type
				(16進数2桁)												92

SourceAddress	フレームデータ中のSourceAddress
				64bit アドレス(16進数16桁)								0013A200404AC397

NetworkAddress	フレームデータ中の SourceNetworkAddress
				16bit アドレス(16進数4桁)								D565

NodeIdentifier	XBee デバイスの NodeIdentifier。
				DeviceServerのマスターファイルを検索して設定される。	Node1
				マスターにNodeIdentifier未登録の場合は"" が設定される

DeviceType		XBee デバイスの Device Type
				DeviceServerのマスターファイルを検索して設定される。	01
				マスターにDeviceType未登録の場合は"" が設定される
				8bit値(16進数2桁)
				00: coordinator
                01: router
                02: end device

DeviceTypeID	XBee デバイスの Device Type Identifier
				DeviceServerのマスターファイルを検索して設定される。
				マスターにDeviceTypeID未登録の場合は"" が設定される
				32bit値(16進数8桁)										00030000

ReceiveOptions	フレームデータ中 ReceiveOptions							01
				8bit値(16進数2桁)

SAMPLE_COUNT	I/O データのサンプル数									1

SAMPLE_DIO		I/O データ中のサンプル対象となったDIOビット番号リスト
				(10進数、カンマ区切り)									0,1,4

SAMPLE_ADC		I/O データ中のサンプル対象となったADCビット番号リスト
				(10進数、カンマ区切り)									2,3

SAMPLE_<Sample#>_<"DIO"|"ADC">_<Bit#>

				I/O サンプルデータ値。
				DIO の場合は、High で "1"、Low で "0"。					1
				ADC の場合は 10進数。									1023

				<Sample#> には 最大、SAMPLE_COUNT まで 1から順番に
				インクリメントされた値が入る。

				<"DIO"|"ADC">は、I/O サンプルデータが ADC
				もしくは、DIO のどちらであるかを示す。

				<Bit#>は、サンプルデータのビット番号。10進数。

		例:"SAMPLE_1_ADC_0" は、第一サンプルデータ中の #0番ポートのADC変換値を示す。

]]

--[[
log_msg("start..",file_id)
for key,val in orderedPairs(g_params) do
	log_msg(string.format("g_params[%s] = %s",key,val),file_id)
end
]]

--[[
********************************************************************

XBee->MQTT Gateway 機能の実装例

リモート XBee から送信された I/O パケットデータを MQTT に登録します。
トピック名: "/zb/<NodeIdentifier>/io"
メッセージ文字列:
 {"ad0":<ad0_value>, .. ,"ad1":<ad1_value>, ... ,"dio0":<dio0_value>,"dio1":<dio1_value>,... }

XBee の IO 設定で有効になっている A/D, DIO ポート値のみが上記のフォーマット
で格納されます。

送信先 MQTT ブローカは、サーバー設定プログラムを使用して
エンドポイントを登録して下さい。下記スクリプト中の end_point 変数に
エンドポイントのタイトル文字列または ClientID を設定します。

********************************************************************
]]

local end_point = "センサーデータ登録"
local qos = 1
local topic = "/zb/" .. g_params["NodeIdentifier"] .. "/io"

local ad_list = csv_to_tbl(g_params["SAMPLE_ADC"])
local dio_list = csv_to_tbl(g_params["SAMPLE_DIO"])
local json_str = '{'
local first = true
for i,v in ipairs(ad_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"ad%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_ADC_" .. v])
end
for i,v in ipairs(dio_list) do
	if first then
		first = false
	else
		json_str = json_str .. ","
	end
	json_str = json_str .. string.format('"dio%s":%s',v,
				g_params["SAMPLE_" .. g_params["SAMPLE_COUNT"] .. "_DIO_" .. v])
end
json_str = json_str .. '}'
mqtt_publish(end_point,topic,json_str,qos)

スクリプトの内容は XBee(Series1) とほぼ同じですが、MQTT ブローカに送信するときのトピック名を “/zb/<NodeIdentifier>/io” に変更しています。

イベントハンドラスクリプトが完成したら、今回の XBee-MQTT Gateway の機能はほぼ実現していることになります。

●リモート側XBee デバイスの IO 設定

次に、光センサを接続したリモート側 XBee デバイスの IO 設定を変更して、IO サンプリングデータが送信されるようにします。

DeviceServer のクライアントを起動してログインします。

ログインしたら、”XBee” ツールボタンを押してPAN 内の XBee デバイス一覧を表示させます。

光センサを接続しているリモート側XBee (Device5) をダブルクリックするか、または選択した後に “設定変更” ツールボタンを押します。現在の XBee 設定値がリモートからロードされた後、詳細設定画面が表示されます。

ここで DIO0 ポートタブを選択して、ADC にチェックをつけます。これで AD0 が有効になりました。次に “Sample Before TX” を 1にして “Sample Rate” を 200ms に設定します。これによって XBee デバイスが 200ms 毎に IO サンプリングを実行して、その結果を DeviceServer に接続した XBee に送信するようになります。

その後 DeviceServer 側では、先ほど設定したイベントハンドラスクリプト中から MQTT ブローカに IO データが JSON 形式で送信されます。

複数の XBee デバイスからの IO サンプリングデータを、同時に MQTT ブローカに送信することもできます。その場合にはそれぞれの XBee デバイスの IO 設定を同様に変更してください。

“OK” を押すと同時に、リモート側 XBee デバイスの IO サンプリングが開始されて MQTT ブローカに送信されるようになります。

●RaspberryPi のコンソール上で、MQTT メッセージを購読して IO データを確認

ここで、MQTT ブローカに送信されている IO サンプリングを見てみましょう。RaspberryPi にログインして MQTT ブローカからメッセージを購読します。

mosquitto_sub コマンドを使用して、MQTT ブローカからメッセージを受信している様子です。XBee デバイスで有効になっている全ての IO 値が JSON 形式で送信されています。光センサに手をかざすと ad0 の値が変化するのを確認できます。

●動作例の動画

XBee-MQTT gateway が動作しているときの動画を載せましたのでご覧ください。(音量注意)

今回は、XBee データを MQTT に変換する例を紹介しましたが、同様のスクリプトを用意することで DeviceServer で取得した様々なセンサデータを MQTT に送信することが可能になります。

また、今回のシステムのセンサデータとは逆方向の Gateway も簡単に作成できます。DeviceServer 側に作成したエンドポイントで MQTT のブローカから特定のトピックを購読して、そのデータを元にリモート側の XBee のポートを変更させることもできます。この時には、MQTT ブローカからトピック・メッセージを受信するときに自動実行される MQTT_PUBLISH イベントハンドラに処理を記述します。

ここで紹介した DeviceServer の機能は、ABS-9000 DeviceServer インストールキットをダウンロードして、直ぐに使用することができます。イベントハンドラで使用したスクリプトファイルも全て格納されていますので是非お試しください。デモライセンスが添付されていますので直ちに使用可能です。こちらから最新のインストールキットやセットアップマニュアル、ユーザーマニュアルをダウンロードできます。

次の記事 “XBee-MQTT Gateway (その2)” では、今回作成したシステムで MQTT ブローカに配信されているセンサデータを WebSocket 経由で受信して、グラフ表示するWebアプリを紹介する予定です。

それではまた。

 

コメントは受け付けていません。