47,000件のbatch putを16秒で処理
以前に 大量のエンティティを処理するデザインパターン - GeekFactory を紹介しましたが、シングルスレッドのバッチ処理なのでスループットが頭打ちになる問題がありました。コンカレントに処理する方法を思いついたので実装してみました。
シングルスレッドではこんな流れでした。
- S3QueryResultListでn件のエンティティを取得する。
- エンティティをバッチ処理する。
- t秒以内であれば上記を繰り返す。
- 次のタスクにカーソルを渡す。
ここで、エンティティを取得するタスク(Splitter)とエンティティをバッチ処理するタスク(Mapper)を分けてみます。
- Splitterタスク
- S3QueryResultListでn件のエンティティを取得する。
- エンティティをmemcacheに入れて*1、Mapperタスクに渡す。
- t秒以内であれば上記を繰り返す。
- Mapperタスク
- memcacheからエンティティを取得する。消失していたら再クエリを投げる。
- エンティティをバッチ処理する。
- 終わり。
SplitterタスクとMapperタスクは並列に実行されるため、Mapperの内容に関係なく安定したスループットを確保できます。Producer-Consumerですね。
実験
nullプロパティを削除する(missingにする)ため、getしたエンティティをそのままputします。対象は47,110件のエンティティです。
以下の条件を設定しました。
- Splitterタスクは、500件ごとにMapperタスクを生成する。
- Splitterタスクは、10秒ごとに新しいタスクに生まれ変わる。
- Mapperタスクは、Datastore.put()で500件のエンティティを更新する。
結果として、約16秒で処理されました。
管理コンソールを確認したところ、同時に3インスタンスが動いていました。今回は単純なbatch putを試しましたが、もっと複雑な処理を行うとタスクの粒度が大きくなるのでスケールアウトするかもしれません。
11-18 09:12AM 43.078 enqueueFirstMapperTask: Queued the first mapper task
11-18 09:12AM 43.085 enqueueSplitterTask: Queued the splitter task of offset 500
11-18 09:12AM 59.277 runMapper: The mapper task finished. Processed entities: 47110
11-18 09:12AM 59.422 runMapper: The mapper task finished. Processed entities: 47000
ソースコードはこちらからどうぞ。URLが間違っていたので修正しました(11/19 13:17)。
- QueryTask.java(フレームワーク)
- M20101118CountController.java(実験で使ったMapper)
*1:エンティティはSerializableである必要があります。