10. Lambda関数解説

本ハンズオンで利用したLambda関数の解説をします。

10.1. Lambdaハンドラー、イベントデータ

Kinesis FirehoseでLambda関数を呼び出しますが、そのときに実行されるのがハンドラーです。

event – Lambda はこのパラメーターを使用してイベントデータをハンドラーに渡します。Kinesis Firehoseから受け取ったデータは、event[‘records’]に格納されています。

context – AWS Lambda はこのパラメーターを使用してランタイム情報をハンドラーに提供します。

def lambda_handler(event, context):

10.2. 目的のセンサーからのデータだけを処理する

ゲートウェイは、受信したBeaconをAWS IoTにパブリッシュします。Lambda関数では、登録したセンサーのMACアドレスののみを処理するようにしています。 下記のmy_temphumid_sensor_macで、今回使用する温湿度センサーのMACアドレスを登録し、配列 acceptable_sensorsに含まれているMACアドレスだけを処理します。マグネットセンサー、3軸加速度センサーも登録されていますが、このハンズオンでは使用しません。

my_temphumid_sensor_mac = "FFEAD7E638B8"
my_magnet_sensor_mac    = "C02C61CB558F"
my_3d_accel_sensor_mac  = "D9890D33C5E1"

acceptable_sensors = [my_temphumid_sensor_mac , my_magnet_sensor_mac, my_3d_accel_sensor_mac]

下記の処理で、受信したセンサーデータのMACアドレス(src_mac)が配列 acceptable_sensorsに含まれているが確認しています。

if( src_mac not in acceptable_sensors ):
    print( "skip this sensor data; src_mac[" + src_mac + "]" )
    continue

10.3. CSVデータの取得

Kinesis Firehoseから渡される加工前のデータは、下記のCSV形式のデータです。

$GPRP,C02C61CB558F,AC83F3A041D2,-44,02010612FF590080BC440104FFFFFFFFFFFFFFFFFFFF,1493638145

Kinesis は、データをBase64にエンコードして保持するので、デコードして変数 playload に格納します。

”,” 区切りなので、”,”をでリミッターとして分割し、配列 listに格納します。

payload = base64.b64decode(record['data'])

list = payload.split(",")

10.4. 必要な要素の抜き出し

list[]に格納された各要素を取り出します。特に ble_payload は、Beaconの生データで、更に加工が必要です。

src_mac = list[1]
dst_mac = list[2]
rssi = int(list[3])
ble_payload = list[4]
unixtime = int(list[5])

10.5. タイムスタンプ

ゲートウェイは、AWS IoTにセンサーデータを送る際、Unix time(基準時刻 (1970/01/01(木) 00:00:00 UTC) からの経過秒数)を付加します。Pyhonのモジュールで”2017-05-11T08:34:43”のフォーマットに変換しています。

timestamp = datetime.datetime.fromtimestamp(unixtime)
timestamp = timestamp.strftime("%Y-%m-%dT%H:%M:%S")

10.6. ble_payloadから温度、湿度を抜き出す

温度はble_payloadの24〜27バイト目、湿度はble_payloadの28〜31バイト目に存在します。この部分を抜き出し、エンディアン変換しています。

if( src_mac == my_temphumid_sensor_mac ):
    tempreature = int(ble_payload[24:28], 16)
    tempreature = int(struct.unpack(">H",struct.pack("<H",tempreature))[0])/100.00
    humidity    = int(ble_payload[28:32], 16)
    humidity    = struct.unpack(">H",struct.pack("<H",humidity))[0]

10.7. Python辞書型のオブジェクトに変換

加工したデータを一度、Python辞書型のオブジェクトに変換します。後ほどJSON形式に変換します。

sensor_data_field = {
   '@timestamp':  timestamp,
   'unixtime':   unixtime,
    'src_mac':    src_mac,
    'dst_mac':    dst_mac,
    'rssi':       rssi,
    'ble_payload':ble_payload,
    'vbat':       vbat,
    'model':      ble_payload[10:18],
}

10.8. Kinesis Firehoseへ戻すデータの作成

Kinesis Firehoseにデータを戻す場合、recordIdとresult付加する必要があります。sensor_data_fieldは、Base64で再度エンコードしています。最後にreturn { }で加工したデータをKinesis Firehoseに返しています。

output_record = {
    'recordId': record['recordId'],
    'result': 'Ok',
    'data': base64.b64encode(json.dumps(sensor_data_field))
}
output.append(output_record)
return {'records': output}

10.9. その他

このlambda関数は、レンジャーシステムズの他のセンサー(3軸加速度、マグネットセンサー)にも対応しています。my_magnet_sensor_mac、my_3d_accel_sensor_mac に対象のセンサーを登録すればフォーマット変換します。