KCLで理解するマルチストリーム処理の仕組み
はじめに
前回の記事では、KCL(Kinesis Client
Library)v3系を使ったマルチストリーム処理の実装について紹介しました。
しかし実装メインの内容だったので、「そもそもKCLって何?」「マルチストリーム処理って何がいいの?」といった基本的な部分は駆け足で進んでしまいました。
今回はその辺りをじっくり解説していきます。
Kinesis Data Streamsとは
KCLを理解する前に、まずはKinesis Data Streamsの基本をおさえておきましょう。
Kinesis Data Streamsは、大量のデータをリアルタイムで処理するためのAWSのマネージドサービスです。データはストリーム
という単位で管理され、その中をレコードが流れていきます。
Kinesis Data Streamsの基本的な構成は、Producer(データ生成側)、Stream(データ保存)、Consumer(データ処理側)
の3つの要素で成り立っています。
Kinesis Data Streamsの基本用語
用語 | 説明 | 役割 |
---|---|---|
Producer | データを生成してStreamに送信する側 | Webアプリ、モバイルアプリ、IoTデバイスなど |
Stream | データが流れる論理的な容器 | 複数のシャードで構成される |
Consumer | Streamからデータを取得して処理する側 | 分析アプリ、保存処理、アラート処理など |
シャード | Streamの物理的な処理単位 | 読み書き性能の上限あり |
パーティションキー | データをシャードに振り分けるキー | 同一キーは同じシャードに配置される |
データレコード | Streamに流れる個々のデータ | シーケンス番号、パーティションキー、データBLOBで構成 |
シャードとパーティションキーの仕組み
Kinesis Data Streamsでは、データはシャードという単位で管理されます。各シャードには処理能力の上限があります。
項目 | 上限値 | 備考 |
---|---|---|
読み取り | 5トランザクション/秒、2MB/秒 | Consumer側の制限 |
書き込み | 1000レコード/秒、1MB/秒 | Producer側の制限 |
データを送信する際はパーティションキーを指定し、同じパーティションキーを持つレコードは必ず同じシャードに振り分けられます。これにより、順序性が保たれます。
KCLとは
Kinesisストリーム処理で必要なこと
まず、KCLの価値を理解するために、Kinesisストリーム処理で何をやらないといけないのかを整理します。
Kinesisストリームからデータを継続的に処理するには、以下のことを実装する必要があります。
必要な処理 | 詳細 | なぜ必要? |
---|---|---|
データ取得 | 各シャードからレコードを継続的に取得 | ストリーミングデータの基本 |
処理位置の記録 | どこまで処理したかを永続化 | 障害時の重複・欠損防止 |
シャード変化への対応 | 分割・結合されたシャードの検知・対応 | ストリームのスケーリング対応 |
複数インスタンス間の調整 | どのインスタンスがどのシャードを処理するか | 負荷分散・可用性確保 |
障害処理 | プロセス停止時の処理引き継ぎ | サービス継続性確保 |
KCLを使わない場合の課題
「Kinesisって生のAPIでも使えるのでは?KCLなんて必要なの?」
と思うかもしれませんが、上記の処理を全て生のKinesis APIで実装するのは結構大変です。
// 生のKinesis APIを使った場合(大変な実装例)
val kinesisClient = KinesisClient.builder().build()
// 1. ストリーム情報を取得してシャード一覧を取得
val streamDescription = kinesisClient.describeStream(
DescribeStreamRequest.builder().streamName("user-events").build()
)
streamDescription.streamDescription().shards().forEach { shard ->
// 2. 各シャードのイテレーターを取得
val shardIterator = kinesisClient.getShardIterator(
GetShardIteratorRequest.builder()
.streamName("user-events")
.shardId(shard.shardId())
.shardIteratorType(ShardIteratorType.LATEST)
.build()
).shardIterator()
// 3. 継続的にレコードを取得・処理
var currentIterator = shardIterator
while (currentIterator != null) {
val response = kinesisClient.getRecords(
GetRecordsRequest.builder()
.shardIterator(currentIterator)
.build()
)
// 4. レコード処理
response.records().forEach { record ->
try {
processRecord(record)
// 5. 処理位置を手動で記録(DynamoDBなどに)
saveCheckpoint(shard.shardId(), record.sequenceNumber())
} catch (e: Exception) {
// エラーハンドリングも自前実装
handleProcessingError(record, e)
}
}
currentIterator = response.nextShardIterator()
// 6. シャードの分割・結合をチェック
if (isShardClosed(shard)) {
handleShardClosure(shard)
}
// 7. 他のインスタンスとの協調処理
if (shouldGiveUpShard(shard)) {
transferShardToOtherInstance(shard)
}
Thread.sleep(1000) // ポーリング間隔
}
}
見ての通り、生のAPIだと実装すべきことが山盛りです。主な課題は以下の通りです。
課題 | 詳細 | 具体的な実装が必要な内容 | 影響 |
---|---|---|---|
チェックポイント管理 | どこまで処理したかを手動で記録・管理 | DynamoDBテーブル設計、シーケンス番号保存、障害時復旧ロジック | 障害時のデータ重複・欠損リスク |
シャード管理 | シャードの分割・結合への動的対応 | シャード監視、親子関係追跡、新シャードへの切り替え | スケーラビリティの問題 |
負荷分散 | 複数コンシューマー間でのワーク分散 | リース管理、インスタンス間協調、シャード再配布 | リソース効率の悪化 |
障害処理 | リトライ・エラーハンドリング・フェイルオーバー | エラー分類、リトライ戦略、デッドレター処理 | 可用性の問題 |
順序保証 | シャード内・シャード間での処理順序管理 | 親シャード完了待ち、処理順序制御 | データ整合性の問題 |
KCLを使うことで得られる恩恵
KCLは、これらの面倒な部分を全て引き受けてくれ、開発者がビジネスロジックだけに集中できるようにしてくれます。
// KCLを使った場合
class MyRecordProcessor : ShardRecordProcessor {
override fun processRecords(processRecordsInput: ProcessRecordsInput) {
// ビジネスロジックだけに集中できる
processRecordsInput.records().forEach { record ->
when (record.eventType) {
"USER_LOGIN" -> handleUserLogin(record)
"PURCHASE" -> processPurchase(record)
"PAGE_VIEW" -> trackPageView(record)
else -> logger.warn("Unknown event: ${record.eventType}")
}
}
// チェックポイントも1行で完了
processRecordsInput.checkpointer().checkpoint()
}
}
KCLが自動でやってくれること
機能 | KCLの恩恵 | 開発者のメリット |
---|---|---|
チェックポイント管理 | DynamoDBに自動保存・復旧 | 障害回復を気にしなくてOK |
シャード監視 | 分割・結合を自動検知・対応 | スケーリングが自動 |
負荷分散 | 複数ワーカー間で自動分散 | リソース効率が最適化 |
順序保証 | 親シャード完了後に子シャード処理 | データ整合性が保証 |
障害回復 | 自動リトライ・フェイルオーバー | 高可用性を実現 |
KCLの重要な用語
KCLを理解するには、これらの用語を知っておく必要があります。図と併せて確認してください。
KCLの主要コンポーネント
用語 | 説明 | 役割 |
---|---|---|
Worker | KCLアプリケーションの実行単位 | 通常1プロセス=1Worker、複数のRecordProcessorを管理 |
Scheduler | 各Workerが持つ制御クラス | シャード割り当て・データ配信を制御 |
Record Processor | ビジネスロジック実装の場所 | シャードごとに1つ、実際の処理を担当 |
KCLの管理情報
用語 | 説明 | 保存場所 | 役割 |
---|---|---|---|
Lease | Workerとシャードの割り当て情報 | DynamoDB | 各シャードは1つのWorkerにのみバインド |
Checkpoint | 最後に処理したレコード位置 | DynamoDB | 障害時の復旧ポイント |
リーステーブル | Lease・Checkpoint情報の保存先 | DynamoDB | 分散処理の状態管理 |
マルチストリーム処理について
ここまでで、KCLがいかに便利で強力なライブラリかを理解していただけたと思います。しかし、KCLの真価はここからです。
KCLでは、v2.3以降でマルチストリーム処理がサポートされ、v3系でさらに性能と安定性が向上しました。従来の「1つのアプリケーションで1つのストリーム」という制約を超えて、
1つのアプリケーションで複数のストリームを同時に処理することができるようになりました。これがマルチストリーム処理です。
なぜこの機能が重要なのか、従来の方式と比較しながら見ていきましょう。
従来の単一ストリーム処理の限界
従来、多くのシステムでは「1アプリケーション = 1ストリーム」で設計されていました。
[ユーザーイベント] → [Kinesis Stream A] → [Application A]
[システムログ] → [Kinesis Stream B] → [Application B]
[メトリクス] → [Kinesis Stream C] → [Application C]
しかし、これには以下のような問題があります。
問題 | 詳細 | 影響 |
---|---|---|
リソースの無駄 | 各アプリが独立してリソースを消費 | コスト増、効率悪化 |
運用コスト増 | デプロイ・監視対象が多数に | 運用負荷の増大 |
データ統合困難 | 関連データの処理が分散 | 相関分析が困難 |
マルチストリーム処理のメリット
KCLでは、v2.3以降でマルチストリーム処理がサポートされています。v3系では、この機能がさらに安定し、1つのアプリケーションで複数のストリームを同時に処理できます。
マルチストリーム処理の恩恵
メリット | 詳細 | 効果 |
---|---|---|
リソース効率化 | 1つのアプリで複数ストリームを処理 | インフラコスト削減 |
データ相関分析 | 関連データをリアルタイムで突合 | 高度な分析が可能 |
運用コスト削減 | 管理対象の統合 | 運用負荷軽減 |
処理統合 | 複数ストリームの組み合わせ処理 | 新しい価値創造 |
例えば、ユーザーのログインイベント(ユーザーイベントStream)とエラーログ(システムログStream)を組み合わせて、「ログイン後にエラーが多発しているユーザー」をリアルタイムで検知するといったことが可能になります。
まとめ
今回は、KCLの基本概念とマルチストリーム処理の仕組みについて詳しく解説しました。
本記事のポイント
Kinesis Data Streamsの基本
- Producer、Stream、Consumerの3つの要素で構成
- シャードとパーティションキーによる分散処理
- 順序性とスケーラビリティを両立
KCLの価値
- 生のAPIでは複雑なチェックポイント管理やシャード制御が必要
- KCLがこれらの面倒な処理を全て自動化
- 開発者はビジネスロジックに集中できる
マルチストリーム処理の恩恵
- 従来の「1アプリ=1ストリーム」の制約を解放
- リソース効率化と運用コスト削減を実現
- 複数ストリームのデータ相関分析が可能
KCLを活用する際の次のステップ
この理論的な理解を踏まえて、実際のプロジェクトでKCLを活用する際は以下を検討してみてください。
- 要件整理:どのようなデータストリームを処理する必要があるか
- アーキテクチャ設計:マルチストリーム処理が有効かどうか
- 監視・運用設計:DynamoDBのリーステーブルやメトリクス監視
KCL v3系とマルチストリーム処理は、リアルタイムデータ処理の可能性を大きく広げてくれる強力な技術です。ぜひ、皆さんのプロジェクトでも活用してみてください。
Discussion