Amazon Kinesis Data Firehoseに、ついにDynamic Partitioningが実装されました!
Introducing Dynamic Partitioning in Amazon Kinesis Data Firehose
というわけで早速試していこうと思います。
FirehoseにおけるS3転送時の問題点
今回実装されたDynamic Partitioningの話に入る前に、そもそもFirehoseを利用する上で困っていた仕様を簡単におさらいします。
FirehoseではStreamに溜まったデータをS3に転送する際、Firehoseが処理を行った時間で自動的にディレクトリ構造を構成し、その配下にファイルを出力していました。
この仕様だと、S3に保存したデータをAthena等で分析する際、実際のデータ(各レコード内のタイムスタンプなど)と、S3上のディレクトリ構造であるパーティションに差異が生まれるため、それを考慮したクエリを発行するか、別途パーティション内のデータを整頓する処理が必要になっていました。
また、何らかのIDなど、時間以外でのパーティション構成を採ることができない点も問題でした。
そもそもこの仕様があるため、StreamにFirehoseを使わないアーキテクチャを採用するケースもあったように思います。
それが今回のDynamic Partitioningの導入により、FirehoseのStreamに流入したレコードの、任意のデータをパーティションのキーとして設定できるようになりました。
これにより、従来であれば別途パーティションのために行っていた整頓処理などが、Firehose単体で完結できるようになります。
Dynamic Partitioning
パーティションキー設定の方法
現在のところDynamic Partitioningでパーティションキーを指定する方法として、2つの方法が提供されているようです。
1 inline parse
jq を使用してレコードからパーティションキーを抽出します
2 AWS Lambda
既存のデータ変換Lambdaの仕組みを流用します
簡易な要件はinline parseでおおむね対応できそうです。
パーティションキーに複雑な要件が必要であったり、既にデータ変換Lambdaを利用している場合、またinline parseの処理による料金計算(詳細は後述)を嫌う場合などは、Lambdaを用いてパーティションキーを設定する方法が良さそうです。
ではそれぞれのパーティションキー設定を順に試してみます。
手順
今回は以下のデータ構造のサンプルデータを用います(Dynamic Partitioningの公式DocやFirehoseの作成画面で提示されているもの)。
個人的に日時データはISO8601の形式で扱うことが多いので、そのサンプルデータを末尾に付与した版です。
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T13:24:10Z" }
(キー名に統一感がなくちぐはぐですがご容赦ください😫)
構成はIoT Coreを用いた以下の構成とします。
S3バケット作成
まず任意の名前でバケットを用意します。
以下、作成手順で特に言及のない部分はデフォルト値を使用します。
Name: firehose-dynamic-partition
Firehose作成
次にFirehoseを作成していきます。
Kinesisのコンソールを開き > 配信ストリームを作成
- Source:
Direct PUT
- Destination:
S3
- Delivery stream name: (任意)
- S3 bucket:
firehose-dynamic-partition
(先ほど作成したS3バケット) - Dynamic partitioning:
Enabled
- New line delimiter:
Enabled
- Inline parsing for JSON:
Enabled
- Dynamic partitioning keys を以下の画像のように設定します。
createdAt
の省略されている部分は.createdAt | fromdateiso8601 | strftime("%Y-%m-%d %H:%M:%S")
です
- 入力後、
Add dynamic partitioning key
ボタンを押すと、S3 bucket prefix
の欄に設定したパーティションキーの設定が出力されます。- S3 bucket prefix:
!{partitionKeyFromQuery:customer_id}/!{partitionKeyFromQuery:device}/!{partitionKeyFromQuery:year}/!{partitionKeyFromQuery:month}/!{partitionKeyFromQuery:day}/!{partitionKeyFromQuery:createdAt}/
- S3 bucket prefix:
- New line delimiter:
- Buffer interval:
60
※作成画面の公式の jq
の例だと strftime
はシングルクォーテーションで表記されていましたが、実行時jqのパースエラーになったためダブルクォーテーションにしています。
※createdAt
のパーティションに特に意味はありません。ISO8601形式の例示目的です。
IoT Core rule作成
IoT Coreのコンソールを開き > ACT > ルール > 作成
- Name:
firehose_dynamic_partition
(任意) - ルールクエリステートメント:
SELECT * FROM 'iot/firehose'
- アクションの追加
- Amazon Kinesis Firehose ストリームにメッセージを送信する
- ストリーム名: (先ほど作成したFirehose stream)
- Separator: 区切り文字なし
- ロールの作成:任意の名前をつける
- アクションの追加
- Amazon Kinesis Firehose ストリームにメッセージを送信する
- ルールの作成
動作確認
以上の作業でinline parseの構成はできたので、動作確認をしていきます。
IoT Core > テスト > MQTTテストクライアント
を押下し、テストクライアントページを開き、【トピックに公開する】タブを選択する。
トピック名: iot/firehost
下記のメッセージペイロードを入力し、発行ボタンを押します。
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T13:24:10Z" }
試しに何度か発行します。
FirehoseのInterval設定を60秒にしたので、約1分後にS3内に指定したパーティション(ディレクトリ構造)でファイルが出力されていることが確認できます。
Lambda版の動的パーティション
続いて、Lambdaでのパーティションキー設定を試していきます。
先程作成したFirehoseを編集します。
Firehose > 作成したStream > Configuration > Transform and convert records > Edit
- Data transformation:
Enabled
- Create function
- General Kinesis Data Firehose Processing
- 関数名:
firehose-dynamic-partition-lambda
(任意) - 基本的な Lambda アクセス権限で新しいロールを作成 を選択して関数の作成
- Create function
Lambda Functionが作成されたら、以下のコードを貼り付けてデプロイします。
console.log('Loading function'); exports.handler = async (event, context) => { /* 各レコードにpartitionKeysのmetadataを設定する */ const records = event.records.map((record) => { const payload = JSON.parse( Buffer.from(record.data, 'base64').toString('utf-8') ); console.log('Decoded payload:', payload); const d = new Date(payload.event_timestamp * 1000); // epoch time(sec)をmsでinitialize const partitionKeys = { customer_id: payload.customer_id, device: payload.type.device, year: d.getUTCFullYear(), month: ('00' + (d.getUTCMonth() + 1)).slice(-2), // getUTCMonthは0-11を返すため day: ('00' + d.getUTCDate()).slice(-2), createdAt: payload.createdAt.replace('T', ' ').replace('Z', ''), // 2021-09-01T13:24:10Z -> 2021-09-01 13:24:10 }; return { recordId: record.recordId, data: record.data, result: 'Ok', metadata: { partitionKeys }, }; }); console.log(`Processing completed. Successful records ${records.length}.`); return { records }; };
パーティションキーの設定がinline parseと同様になるように多少処理を実装していますが、基本的には、StreamのEventとして渡ってきた records
に対して、 metadata
として partitionKeys
の値を追記した records
を作成し、returnしてあげることでLambdaでのパーティションキー設定は完了です。
Firehoseの編集画面に戻り、Browseから先程作成したLambdaを選択します。
その他の設定はデフォルトのまま Save changesを押下します。
Lambdaの変換処理でpartitionKeyを設定するようにしたため、jqを使用したinline parseの設定をオフにします。
Firehose > 作成したStream > Configuration > Destination settings > Edit
Inline parsing for JSON: Disabled
※Dynamic Partitioningを有効にした場合、データ変換用のLambdaを設定していないと、以下のようにinline parseを無効化することができません。
S3 bucket prefixを以下のように、Lambdaで設定したPartition Keyを使用するように修正します。
!{partitionKeyFromLambda:customer_id}/!{partitionKeyFromLambda:device}/!{partitionKeyFromLambda:year}/!{partitionKeyFromLambda:month}/!{partitionKeyFromLambda:day}/!{partitionKeyFromLambda:createdAt}/
修正後、Save changesを押下します。
動作確認で実施した手順と同様に、以下のメッセージペイロードをIoT Coreのテストクライアントから送信し、処理が実行されるまでの時間待機します。
(createdAt
の値のみ最初のデータの1時間後にしています)
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, "region": "pdx", "createdAt": "2021-09-01T14:24:10Z" }
無事指定したパーティションにファイルが出力されました!
料金
Dynamic Partitioningの利用には追加の料金がかかるため注意が必要です。
詳細は末尾の公式pricingを参考にして頂くとして、主に以下3つの合計が料金になります。
公式の試算結果を参照するに、あまり大きな額ではないかと思いますが、利用方法によって価格が変わってきますので、自身のユースケースで試算してみるのがよいかと思います。
パーティションキー設定のために別途利用していた料金と比べて、高額になることはあまりないのかなと思いました。
アーキテクチャもシンプルになるし一石二鳥ですね。
注意点
以下目についた注意事項です。
詳しくは公式Docをご参照ください。
- 動的パーティションを有効化できるのは新規のFirehose streamのみ
- 動的パーティションで作成していれば、後からパーティションキーを変更することは可能
- 動的パーティションで作成したFirehose streamの動的パーティションを無効化することもできない
- 同時に処理可能なパーティションの最大は500件
- それを超えたパーティションキーのデータはエラー行き
所感
IoT Analyticsでは今回Firehoseに追加されたDynamic Partitioningに似たことができていたため、Firehoseでは何故できないのだ……と頭を抱えていたのですが、無事機能がリリースされてとても嬉しいです。
ここ2週間くらいFirehose + S3 + Athenaの構成で悩んでいたため、リリースページを見たときは興奮のあまり失神しそうになりましたが、よく考えると最初から用意しておいてよという気持ちも……。
不良が良いことすると、とてもよく見える理論ですかね。
ともかく、今後Firehoseを用いた構成がシンプルかつ利用しやすくなるのでとても良かったです!