エンジニア

Kinesis Video StreamsとRekognitionを使ってその人の名前をラズパイに喋らせてみる

2/7(金) 現在、開発合宿中の中嶋です。

今回、ハンズラボでは2/6-7の1泊2日で開発合宿をおこなっております。

私のチームは、「普段業務で使わないけど興味ある技術を使おう」をテーマに、
Kinesis Video StreamsとAmazon Rekognition、Amazon Pollyを使った、リアルタイム動画から人を認識させ、その人の名前をラズパイで喋らせることをやってみました。

ブログを書くまでが合宿だ!と、うちのチームでは言っていたので、ちゃんと書こうと思います。

前提

うちのチームメンバー全員、MacOS Catalinaで開発をしています

AWS構成図

ざっくりな構成図は以下のとおりです。

事前準備として、認識させたい画像をS3にアップロードし、
それをコレクションに追加します。

その後、Lambdaが読み取りやすいように、FaceIdと名前の辞書をDynamoDBに登録しておきます(ここはいま手動でやっている)

WebカメラからKinesis Video Streamsへ動画をストリーミングする

C++ プロデューサーライブラリの使用 – Amazon Kinesis Video Streams

WebカメラからKinesis Video Streamsへ映像をストリーミングするためには、上記のライブラリを使用します。
(他にはJava、Android版のプロデューサーライブラリがあるようです)

C++のプロデューサーライブラリにある、MacOS用のInstallマニュアルを順にやっていくと、ストリーミングができるようになります。

https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp/blob/master/install-instructions-macos.md

こちらのマニュアルでやったこととしては、

  • ./install-scriptを使用してビルドせず、Homebrewで必要なものをインストールした後、./min-install-scriptでSDKをビルドした(ここの部分)
  • AWS関連と、GST_PLUGIN_PATHLD_LIBRARY_PATHのPATHを通した
  • ここのコマンドを叩いて、ストリーミング開始(今の所 raw video 用のコマンドのみ上手くいっている。 h264 encoded は上手くいかなかった)

またハマったポイントとして、

ここの部分で、各種PATHを通しますが、GST_PLUGIN_PATHをマニュアル通りにやっていくと、後々のテスト用のコマンドでkvssinkが見つからないとエラーが起きます。
(リポジトリの方の例では、AWS access_key、secret_access_keyまで指定していますが、export していれば不要でした)

$ gst-launch-1.0 -v avfvideosrc ! videoconvert ! vtenc_h264_hw allow-frame-reordering=FALSE realtime=TRUE max-keyframe-interval=45 ! kvssink name=sink stream-name="<STREAM NAME>"  osxaudiosrc ! audioconvert ! avenc_aac ! queue ! sink.
WARNING: erroneous pipeline: エレメント "kvssink" がありません

これは、gstreamerのパスの場所では、kvssinkがいなかったので、マニュアルとは別のパスを通す必要がありました。

$ find ./ -name *kvssink*
.//kinesis-video-native-build/CMakeFiles/gstkvssink.dir
.//kinesis-video-native-build/CMakeFiles/gstkvssink.dir/Users/nakajimaatsushi/Projects/amazon-kinesis-video-streams-producer-sdk-cpp/kinesis-video-gstreamer-plugin/src/gstkvssink.cpp.o
.//kinesis-video-native-build/libgstkvssink.so
.//kinesis-video-gstreamer-plugin/src/gstkvssink.cpp
.//kinesis-video-gstreamer-plugin/src/gstkvssink.h
$ export GST_PLUGIN_PATH=<your folder>/amazon-kinesis-video-streams-producer-sdk-cpp/kinesis-video-native-build:$GST_PLUGIN_PATH

これで、再度gst-launch-1.0を叩けば、動画がKinesis Videoへストリーミングされます。

Kinesis Video StreamsからRekognitionに映像渡して、解析データをLambdaで処理する

以下の記事を参考に(というかほぼパクリ)やってみました。

Amazon Kinesis Video Streamsの映像をAmazon Rekognition Videoで解析する #reinvent | Developers.IO

Kinesis Data Streamsまでデータが流れることは確認できたので、
それをトリガーとしたLambdaを作成します。

以下はserverless.ymlの一部抜粋です。

functions:
  kinesis_records_read:
    handler: handler.main
    event:
      - stream:
        type: kinesis
        arn: <Data Stream ARN>
        batchSize: 100
        startingPosition: LATEST
        maximumRetryAttempts: 0
        enabled: false

以下はLambda用のコードです

import base64
import json
import time
import boto3
from datetime import datetime
client = boto3.client('dynamodb')
iot = boto3.client('iot-data')
table_name = <Table Name>
def main(event, context):
    print(datetime.now().isoformat())
    records_count = len(event['Records'])
    print(f'kinesis recourds count: {records_count}')
    # dynamodb読み込み
    res_prossess_current_face_id = client.get_item(
      TableName=table_name,
      Key={
      'FaceId' : {
        'S': 'ProcessCurrentFaceId'
        },
      },
     )
    before_face_id=''
    if ('Item' in res_prossess_current_face_id):
      before_face_id = res_prossess_current_face_id['Item']['UserName']['S']
    for record in event['Records']:
        print(record)
        b64_data = record['kinesis']['data']
        data = base64.b64decode(b64_data).decode('utf-8')
        json_data = json.loads(data)
        face_search_response_list = json_data['FaceSearchResponse']
        if len(face_search_response_list) == 0:
          continue
        matched_faces_list = face_search_response_list[0]['MatchedFaces']
        if len(matched_faces_list) != 1:
          continue
        # 一人しか認識しない仕様
        face_id = matched_faces_list[0]['Face']['FaceId']
        # 前回と同じuser_nameはループを抜ける
        if before_face_id == face_id:
          continue
        before_face_id = face_id
        # Dynamodb書き込みをする
        # DataStreamの量が膨大な為、連続で同じ人の名前を取得してしまう対策
        update_item_res = client.update_item(
          TableName=table_name,
          Key={
            'FaceId': {
              'S': 'ProcessCurrentFaceId'
            },
          },
          UpdateExpression='set UserName = :FaceId',
          ExpressionAttributeValues = {
            ':FaceId': {
              'S': face_id
            },
          },
        )
        print('**************************update_item_res: ', update_item_res)
        # DynamoDBからUserNameを取得
        get_item_res = client.get_item(
          TableName=table_name,
          Key={
            'FaceId': {
              'S': face_id
            },
          },
        )
        if (not ('Item' in get_item_res)):
          print('======get_item_res[Item] not found======')
          continue
        user_name = get_item_res['Item']['UserName']['S']
        payload = {
          'user_name': user_name,
          'face_id': face_id
        }
        try:
          # メッセージをPublish
          iot.publish(
            topic='raspberry_pi/pub',
            qos=0,
            payload=json.dumps(payload, ensure_ascii=False)
          )
          print('user_name: ', user_name, 'face_id: ', face_id)
          return 'Succeeeded.'
        except Exception as e:
          print(e)
          return 'Failed.'

やっていることはシンプルで、
Kinesis Data Streamsから来たデータを受け取り、Recordごとに入っているデータをBase64でデコードしています。

コレクションに追加された顔に一致しているものがあった場合、FaceIdを取得することができるため、このFaceIdが誰であるかを、DynamoDBへ問い合わせしています。

その結果、誰がカメラに映ったのかが判別できるため、その名前をAWS IoTのTopicへPublishします。

AWS IoTのTopicの購読結果を元に、Amazon Pollyに投げて音声データを生成し、ラズパイに喋らせる

こちらは以下の記事を参考に、Topicに対してLambdaが判別した名前をPublishし、それをラズパイが購読できるようになりました。

Raspberry PiでAWS IoT Coreと接続し、GPIO制御をしてみた – Qiita

SensorTag のデータを Amazon Polly で読み上げる – Tech Blog by Akanuma Hiroaki

以下はTopicを購読して受け取った名前をPollyに投げて生成したmp3を再生するコードです。

# coding: UTF-8
import json
import boto3
import os
import time
from boto3 import Session
from boto3 import resource
from contextlib import closing
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
from mutagen.mp3 import MP3 as mp3
import pygame
# AWS認証情報の設定
REGION         = 'ap-northeast-1'
session = Session(aws_access_key_id=os.environ['AWS_ACCESS_KEY_ID'],
                  aws_secret_access_key=os.environ['AWS_SECRET_ACCESS_KEY'],
                  region_name=REGION)
# AWS IoT
CLIENT_ID = "*****" # ここは適当な値でOK
ENDPOINT = "*****" # AWS IoT のカスタムエンドポイント
PORT = 8883
ROOT_CA = "./cert/AmazonRootCA1.pem"
PRIVATE_KEY = "./cert/private.pem.key"
CERTIFICATE = "./cert/certificate.pem.crt.txt"
TOPIC = "raspberry_pi/pub"
# Polly
polly   = session.client('polly')
def synthesize_speech(msg):
    response = polly.synthesize_speech(
        Text=msg,
        OutputFormat='mp3',
        VoiceId='Mizuki'
    )
    return response['AudioStream']
def subscribe_iot_topic():
    # https://s3.amazonaws.com/aws-iot-device-sdk-python-docs/sphinx/html/index.html
    client = AWSIoTMQTTClient(CLIENT_ID)
    client.configureEndpoint(ENDPOINT, PORT)
    client.configureCredentials(ROOT_CA, PRIVATE_KEY, CERTIFICATE)
    client.configureAutoReconnectBackoffTime(1, 32, 20)
    client.configureOfflinePublishQueueing(-1)
    client.configureDrainingFrequency(2)
    client.configureConnectDisconnectTimeout(10)
    client.configureMQTTOperationTimeout(5)
    client.connect()
    client.subscribe(TOPIC, 1, subscribe_callback)
def subscribe_callback(client, userdata, message):
    # message.payloadはbyte
    # message.payload.decode('utf-8')でstr
    print(message.payload.decode('utf-8'))
    payload = json.loads(message.payload.decode('utf-8'))
    face_owner = payload.get('user_name', '')
    face_id = payload.get('face_id', 'polly')
    if face_owner != '':
        print(face_owner)
        speech_file_path = f'speech/{face_id}.mp3'
        if os.path.exists(speech_file_path):
            play_speech(speech_file_path)
        else:
            synthesis_speech_and_play(face_owner, face_id)
def synthesis_speech_and_play(face_owner, face_id):
    script = f'{face_owner}さん、おはようございます。今日も1日張り切っていきましょう。'
    audio_stream = synthesize_speech(script)
    filepath = f"speech/{face_id}.mp3"
    print("synthesied")
    with closing(audio_stream) as stream:
      data = stream.read()
      fw = open(filepath, "wb")
      fw.write(data)
      fw.close()
    play_speech(filepath)
# 音源再生は、↓以下の記事を参考にさせて頂きました
# https://qiita.com/kekeho/items/a0b93695d8a8ac6f1028
def play_speech(speech_file_path):
    if pygame.mixer.get_init() == None:
        pygame.mixer.init()
    if pygame.mixer.get_busy() == False:
        pygame.mixer.music.load(speech_file_path) #音源を読み込み
        mp3_length = mp3(speech_file_path).info.length #音源の長さ取得
        pygame.mixer.music.play(1) #再生開始。1の部分を変えるとn回再生(その場合は次の行の秒数も×nすること)
        time.sleep(mp3_length + 0.25) #再生開始後、音源の長さだけ待つ(0.25待つのは誤差解消)
        pygame.mixer.music.stop() #音源の長さ待ったら再生停止
if __name__ == "__main__":
    subscribe_iot_topic()
    start = time.time()
    while True:
        elapsed_time = time.time() - start
        if round(elapsed_time % 5) == 0:
            print(f"subscribing: {elapsed_time:.1f}秒経過")
            time.sleep(1)
        time.sleep(0.1)

subscribe_iot_topic()の中でclient.subscribeし、その際に受け取ったFaceIdに紐づくmp3データがあれば、そのまま再生。なければ、Pollyに渡してmp3を生成してもらってから再生をします。

ここでのハマった・悩んだポイントとして、

  • AWS IoTで証明書を発行すると、「このモノの証明書」「パブリックキー」「プライベートキー」の3つが生成されてややこしい
  • Pythonで音声を読み上げる方法であるpygameのインストール時での失敗
  • ラズパイ(thing)の方へポリシーをアタッチするフローが参考記事と若干変わっていて、アタッチ忘れをして全くsubscribeができなかった

デモっぽいやつ

ラズパイ側で購読開始した状態をお見せします。

データが来た時だけ、判別した人の名前が表示され、その名前を元に、音声が再生されます。

合宿を終えて(執筆時ではまだ終わってません)

今回の合宿では普段の業務で関わらないメンバーたちと一緒に、まったく使ったことのないサービスを使用して、なんとか形にすることができました。

1泊2日でありましたが、自分たちの触ってみたい技術を、時間の許す限り触りまくるのは刺激的な経験でしたし、自然とモブプロ形式でお互いの得意分野を教え合うのは良い動きが出来たなと感じました。

あとは純粋に普段一緒に働くメンバーの意外な一面を見れるのも、面白かったです。

今回作ったものは人を認識させるだけでしたが、物体も認識させるようにできると、今後応用ができそうだなって密かに思っております。

それでは以上となります。
湯河原のおんやど恵からお送りしました!

参考記事

Kinesis DataStreamをトリガーに起動するLambdaは、「1秒毎に起動するとは限らない」という話 | Developers.IO

Amazon Kinesis Video Streamsの映像をAmazon Rekognition Videoで解析する #reinvent | Developers.IO

Amazon Kinesis Video Streamsを使ってストリーム映像をAWS上に流してみる【MacBook Pro】– ClassmethodサーバーレスAdvent Calendar 2017 #serverless #adventcalendar #reinvent | Developers.IO

https://github.com/awslabs/amazon-kinesis-video-streams-producer-sdk-cpp/blob/master/install-instructions-macos.md

リファレンス: Kinesis 顔認識レコード – Amazon Rekognition

Raspberry PiでAWS IoT Coreと接続し、GPIO制御をしてみた – Qiita

SensorTag のデータを Amazon Polly で読み上げる – Tech Blog by Akanuma Hiroaki

PythonでMP3音源を再生する – Qiita

raspberry pi上のpython3にpygameを入れる際のエラーを解決 – Qiita

一覧に戻る