Kinesis FirehoseがS3に出力したファイルを1レコードずつ読み込む
{"ticker_symbol":"TGH","sector":"FINANCIAL","change":-0.6,"price":65.43}{"ticker_symbol":"JYB","sector":"HEALTHCARE","change":-2.27,"price":42.95}{"ticker_symbol":"WAS","sector":"RETAIL","change":0.53,"price":13.04}{"ticker_symbol":"NGC","sector":"HEALTHCARE","change":0.32,"price":5.41}
こういったJsonストリームデータをPythonで処理する時ってどうしたらいいんだろうと調べていたらjsonライブラリにJSONDecoderというのがあるのを見つけました。
下記サンプルです。
import json import boto3 s3 = boto3.resource('s3') bucket = s3.Bucket('backet name') obj = bucket.Object('key name') obj = obj.get() json_stream = obj['Body'].read() decoder = json.JSONDecoder() while len(json_stream) > 0: record, index = decoder.raw_decode(json_stream) json_stream = json_stream[index:] print record
これを実行すると下記のように出力されました〜。
{"ticker_symbol":"TGH","sector":"FINANCIAL","change":-0.6,"price":65.43} {"ticker_symbol":"JYB","sector":"HEALTHCARE","change":-2.27,"price":42.95} {"ticker_symbol":"WAS","sector":"RETAIL","change":0.53,"price":13.04} {"ticker_symbol":"NGC","sector":"HEALTHCARE","change":0.32,"price":5.41}
そうです、この記事・・・単にストリームデータの読み込み方についての記事です!KinesisFirehoseは完全に引きです!
でもストリームデータを扱うことは個人的には今後も増えそうなので今回調べておいてよかったです!
ではでは〜。