エンジニア

2017.03.02

Kinesis FirehoseがS3に出力したファイルを1レコードずつ読み込む

Kinesis FirehoseがS3に出力したファイルを1レコードずつ読み込む


皆々様、お久しぶりのブログ更新です。
Kinesis FirehoseがS3に出力したファイルをPythonでモニャモニャしたいと思っていたところ以下のことでつまづいてしまいました。
1つのファイルに複数レコード出力されている場合、改行(区切り文字)がなく1レコードずつ読み込めない!
改行とか入っていて、1行ずつ読み込むんだろう・・・みたいな想像を勝手にしていたのでちょっとつまずいてしまいました。
AWSのデモデータをKinesisFirehoseに流すと実際のファイルはの中身は下記のようになります。

{"ticker_symbol":"TGH","sector":"FINANCIAL","change":-0.6,"price":65.43}{"ticker_symbol":"JYB","sector":"HEALTHCARE","change":-2.27,"price":42.95}{"ticker_symbol":"WAS","sector":"RETAIL","change":0.53,"price":13.04}{"ticker_symbol":"NGC","sector":"HEALTHCARE","change":0.32,"price":5.41}

こういったJsonストリームデータをPythonで処理する時ってどうしたらいいんだろうと調べていたらjsonライブラリにJSONDecoderというのがあるのを見つけました。
下記サンプルです。

import json
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('backet name')
obj = bucket.Object('key name')
obj = obj.get()
json_stream = obj['Body'].read()
decoder = json.JSONDecoder()
while len(json_stream) > 0:
    record, index = decoder.raw_decode(json_stream)
    json_stream = json_stream[index:]
    print record

これを実行すると下記のように出力されました〜。

{"ticker_symbol":"TGH","sector":"FINANCIAL","change":-0.6,"price":65.43}
{"ticker_symbol":"JYB","sector":"HEALTHCARE","change":-2.27,"price":42.95}
{"ticker_symbol":"WAS","sector":"RETAIL","change":0.53,"price":13.04}
{"ticker_symbol":"NGC","sector":"HEALTHCARE","change":0.32,"price":5.41}

そうです、この記事・・・単にストリームデータの読み込み方についての記事です!KinesisFirehoseは完全に引きです!
でもストリームデータを扱うことは個人的には今後も増えそうなので今回調べておいてよかったです!
ではでは〜。

一覧に戻る