10. Lambda関数解説¶
本ハンズオンで利用したLambda関数の解説をします。
10.1. Lambdaハンドラー、イベントデータ¶
Kinesis FirehoseでLambda関数を呼び出しますが、そのときに実行されるのがハンドラーです。
event – Lambda はこのパラメーターを使用してイベントデータをハンドラーに渡します。Kinesis Firehoseから受け取ったデータは、event[‘records’]に格納されています。
context – AWS Lambda はこのパラメーターを使用してランタイム情報をハンドラーに提供します。
def lambda_handler(event, context):
10.2. 目的のセンサーからのデータだけを処理する¶
ゲートウェイは、受信したBeaconをAWS IoTにパブリッシュします。Lambda関数では、登録したセンサーのMACアドレスののみを処理するようにしています。 下記のmy_temphumid_sensor_macで、今回使用する温湿度センサーのMACアドレスを登録し、配列 acceptable_sensorsに含まれているMACアドレスだけを処理します。マグネットセンサー、3軸加速度センサーも登録されていますが、このハンズオンでは使用しません。
my_temphumid_sensor_mac = "FFEAD7E638B8"
my_magnet_sensor_mac = "C02C61CB558F"
my_3d_accel_sensor_mac = "D9890D33C5E1"
acceptable_sensors = [my_temphumid_sensor_mac , my_magnet_sensor_mac, my_3d_accel_sensor_mac]
下記の処理で、受信したセンサーデータのMACアドレス(src_mac)が配列 acceptable_sensorsに含まれているが確認しています。
if( src_mac not in acceptable_sensors ):
print( "skip this sensor data; src_mac[" + src_mac + "]" )
continue
10.3. CSVデータの取得¶
Kinesis Firehoseから渡される加工前のデータは、下記のCSV形式のデータです。
$GPRP,C02C61CB558F,AC83F3A041D2,-44,02010612FF590080BC440104FFFFFFFFFFFFFFFFFFFF,1493638145
Kinesis は、データをBase64にエンコードして保持するので、デコードして変数 playload に格納します。
”,” 区切りなので、”,”をでリミッターとして分割し、配列 listに格納します。
payload = base64.b64decode(record['data'])
list = payload.split(",")
10.4. 必要な要素の抜き出し¶
list[]に格納された各要素を取り出します。特に ble_payload は、Beaconの生データで、更に加工が必要です。
src_mac = list[1]
dst_mac = list[2]
rssi = int(list[3])
ble_payload = list[4]
unixtime = int(list[5])
10.5. タイムスタンプ¶
ゲートウェイは、AWS IoTにセンサーデータを送る際、Unix time(基準時刻 (1970/01/01(木) 00:00:00 UTC) からの経過秒数)を付加します。Pyhonのモジュールで”2017-05-11T08:34:43”のフォーマットに変換しています。
timestamp = datetime.datetime.fromtimestamp(unixtime)
timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%S")
10.6. ble_payloadから温度、湿度を抜き出す¶
温度はble_payloadの24〜27バイト目、湿度はble_payloadの28〜31バイト目に存在します。この部分を抜き出し、エンディアン変換しています。
if( src_mac == my_temphumid_sensor_mac ):
tempreature = int(ble_payload[24:28], 16)
tempreature = int(struct.unpack(">H",struct.pack("<H",tempreature))[0])/100.00
humidity = int(ble_payload[28:32], 16)
humidity = struct.unpack(">H",struct.pack("<H",humidity))[0]
10.7. Python辞書型のオブジェクトに変換¶
加工したデータを一度、Python辞書型のオブジェクトに変換します。後ほどJSON形式に変換します。
sensor_data_field = {
'@timestamp': timestamp,
'unixtime': unixtime,
'src_mac': src_mac,
'dst_mac': dst_mac,
'rssi': rssi,
'ble_payload':ble_payload,
'vbat': vbat,
'model': ble_payload[10:18],
}
10.8. Kinesis Firehoseへ戻すデータの作成¶
Kinesis Firehoseにデータを戻す場合、recordIdとresult付加する必要があります。sensor_data_fieldは、Base64で再度エンコードしています。最後にreturn { }で加工したデータをKinesis Firehoseに返しています。
output_record = {
'recordId': record['recordId'],
'result': 'Ok',
'data': base64.b64encode(json.dumps(sensor_data_field))
}
output.append(output_record)
return {'records': output}
10.9. その他¶
このlambda関数は、レンジャーシステムズの他のセンサー(3軸加速度、マグネットセンサー)にも対応しています。my_magnet_sensor_mac、my_3d_accel_sensor_mac に対象のセンサーを登録すればフォーマット変換します。