こんにちは。AWS CLI が好きな福島です。
はじめに
CloudWatch Logs から Data Firehose 経由で S3 にログを保存する際に、 効率的に動的パーティショニングを行う方法を考えたため、ご紹介いたします。
今回は、EC2 の OS ログを出力する想定の内容になっておりますが、 それ以外のログでも同一構成であれば、参考になるかと存じます。
※ EC2 の OS ログを出力する想定でブログを記載しておりますが、CloudWatch Agent の設定については記載しません。
- はじめに
- 前提
- 参考:S3に保存されるファイルの中身
- 実装する動的パーティショニングについて
- ポイント
- 設定の流れ
- ①S3 作成
- ②Data Firehose 作成
- ③CloudWatch Logs 用の IAM ロール作成
- ④CloudWatch Logs のロググループの作成
- ⑤CloudWatch Logs のサブスクリプションフィルターの作成
- ⑥動作確認
- 補足:Athenaから確認
- 終わりに
前提
まず、CloudWatch Logs → Data Firehose → S3 の構成において、 特に考慮しなかった場合、CloudWatch Logs の全てのログストリームが S3 の同一のパーティションに保存され、 場合によっては、1ファイルに複数のログストリームの内容が保存されてしまいます。
これではログを確認したい際に、適切にパーティション分けされていないため、確認しづらい構成になってしまいます。 また、Athena で S3 のログを確認する際に、不要なコスト(ログの読み取り)が発生するかつログの確認に時間がかかってしまいます。
そのため今回は Data Firehose の動的パーティショニングにより以下のように設定します。詳細は後述します。
参考:S3に保存されるファイルの中身
参考ですが、特に考慮しない場合のS3に保存されるファイルの中身は以下の通りです。 1つのファイルに複数のログストリームの内容が保存されていることが分かります。
{ "messageType": "DATA_MESSAGE", "owner": "111111111111", "logGroup": "test-log-group", "logStream": "messages_i-056b95c8c44904fa1", "subscriptionFilters": [ "test" ], "logEvents": [ { "id": "38841131237568735052148959136084518525948404427449499648", "timestamp": 1741696561787, "message": "Mar 11 12:36:01 ip-10-88-0-25 systemd[1]: Starting refresh-policy-routes@ens5.service - Refresh policy routes for ens5..." }, { "id": "38841131237568735052148959136084518525948404427449499649", "timestamp": 1741696561787, "message": "Mar 11 12:36:01 ip-10-88-0-25 ec2net[1888]: Starting configuration refresh for ens5" }, ] } { "messageType": "DATA_MESSAGE", "owner": "111111111111", "logGroup": "test-log-group", "logStream": "messages_i-0c888976f25e2eac9", "subscriptionFilters": [ "test" ], "logEvents": [ { "id": "38841131558298052497416381175042897869571333098702241792", "timestamp": 1741696576169, "message": "Mar 11 12:36:15 ip-10-88-0-118 systemd[1]: Starting refresh-policy-routes@ens5.service - Refresh policy routes for ens5..." }, { "id": "38841131558298052497416381175042897869571333098702241793", "timestamp": 1741696576169, "message": "Mar 11 12:36:15 ip-10-88-0-118 ec2net[2621]: Starting configuration refresh for ens5" } ] }
実装する動的パーティショニングについて
今回、実装する動的パーティショニングは、以下の通りです。
s3://test-os-logs/【アカウントID】/【ログファイル】/YYYY/MM/DD/HH/【インスタンスID】
- 第1階層:アカウントID(マルチアカウント構成を考慮)
- 第2階層:ログファイル(messages, secure など)
- 第3-6階層:日付
- 第7階層:インスタンスID
上位階層ほどデータセットを大きく区分できる項目(アカウントID、ログファイル)を配置し、 下位に向かって徐々に細かい粒度(時間、インスタンスID)となるように考えてみましたが、 階層は実際の運用を考慮して設計すると良いかと思います。
ポイント
- S3のパスを動的にパーティションするために、Data Firehose の動的パーティショニングの機能を活用する
- Firehose で利用できるカスタムプレフィックスは以下の通り
- firehose
- timestamp
- partitionKeyFromQuery
- partitionKeyFromLambda
- 詳細:Amazon S3 オブジェクトのカスタムプレフィックスを理解する - Amazon Data Firehose
- 動的パーティショニングの有効化/無効化は、Data Firehose 作成時にのみ可能
- CloudWatch Logs から Data Firehose へ転送されるログは、base64 でエンコードされ、gzip 形式で圧縮される
サブスクリプションフィルターを介してサービスに送信されるログは、base64 でエンコードされ、gzip 形式で圧縮されます。
- ロググループレベルのサブスクリプションフィルター - Amazon CloudWatch Logs
- 上記仕様になっているため、動的パーティショニング機能を使うにあたり、Data Firehose の「Amazon CloudWatch Logs からソースレコードを解凍する」をオンにする必要がある
- オンにしないと、
Only UTF-8 encoded data is supported for dynamic partitioning records.
のエラーが出ます
- オンにしないと、
- 動的パーティショニングを使うにあたり、データレコードを解凍するため、Data Firehose で再度データレコードの圧縮を行い、S3 に保存する容量を抑える
- CloudWatch Logs のロググループごとにサブスクリプションフィルターの設定が必要なため、ロググループは1つだけ作成する
- ログストリーム名を
ログファイル_インスタンスID
のように設定し、_
を区切り文字として、ログファイルとインスタンスIDを取得し、パーティションに利用する- ログストリーム名は、EC2 に導入する CloudWatch Agent の設定である程度自由に変更できるため、好みの設定をするのもありです
設定の流れ
- ①S3 作成
- ②Data Firehose 作成
- ③CloudWatch Logs のロググループの作成
- ④CloudWatch Logs のサブスクリプションフィルターの作成
- ⑤動作確認
①S3 作成
S3を作成します。バケットポリシーなどの設定は特に不要です。
aws s3api create-bucket \ --bucket central-logs-$(aws sts get-caller-identity --query Account --output text) \ --create-bucket-configuration LocationConstraint=ap-northeast-1
上記コマンドで作成した S3 のバケット名は、「central-logs-アカウントID」になります。
②Data Firehose 作成
- Data Firehose のページに移動し、「Firehose ストリームを作成」を押下
- ソース を「Direct Put」、送信先に「Amazon S3」を指定
- Firehose ストリーム名を「test-data-firehose」(任意)に設定
- 「解凍をオンにする」をチェック
※ 「レコード形式を転換」機能を活用することで、JSON よりも効率的にクエリが可能なApache Parquet または Apache ORC 形式に変換も可能です
- 「S3 バケット」に作成したバケットの指定および「動的パーティショニング」の「有効」をチェック
- 「JSONのインライン解析」の「有効」をチェック
- 「動的パーティショングキー」には以下を設定
キー名 | JQ式 |
---|---|
owner | .owner |
logType | .logStream | split("_") | .[0] |
instanceId | .logStream | split("_") | .[1] |
設定したJQ式は、送られてくるログに対して実行されますが、 CloudWatch Logsの場合、以下のようなログ形式になっています。
ポイントにも記載した通り、logStream は EC2 の CloudWatch Agent の設定である程度制御できますが、
今回は、ログファイル_インスタンスID
のように設定し、_
を区切り文字として、ログファイルとインスタンスIDを取得します
{ "messageType": "DATA_MESSAGE", "owner": "111111111111", ★ "logGroup": "test-log-group", "logStream": "messages_i-0c888976f25e2eac9", ★ "subscriptionFilters": [ "test" ], "logEvents": [ { "id": "38841131558298052497416381175042897869571333098702241792", "timestamp": 1741696576169, "message": "Mar 11 12:36:15 ip-10-88-0-118 systemd[1]: Starting refresh-policy-routes@ens5.service - Refresh policy routes for ens5..." }, { "id": "38841131558298052497416381175042897869571333098702241793", "timestamp": 1741696576169, "message": "Mar 11 12:36:15 ip-10-88-0-118 ec2net[2621]: Starting configuration refresh for ens5" } ] }
- 以下の設定を行う
項目 | 設定値 |
---|---|
S3 バケットプレフィックス | !{partitionKeyFromQuery:owner}/!{partitionKeyFromQuery:logType}/!{timestamp:yyyy/MM/dd/HH}/!{partitionKeyFromQuery:instanceId}/ |
S3 バケットエラー出力プレフィックス | errors |
S3 バケットと S3 エラー出力プレフィックスタイムゾーン | Asia/Tokyo |
- 「データレコードの圧縮」の「GZIP」をチェックおよび「ファイル拡張子フォーマット」に「.json.gz」を指定
③CloudWatch Logs 用の IAM ロール作成
- IAM のページへ移動し、「ロールを作成」を押下
- カスタム信頼ポリシーを指定し、以下のポリシーを設定
{ "Version": "2012-10-17", "Statement": { "Effect": "Allow", "Principal": { "Service": "logs.ap-northeast-1.amazonaws.com" }, "Action": "sts:AssumeRole" } }
- ポリシーは付与せず、「次へ」を押下
- IAM ロール名に「logs-destination-role」(任意)を指定し、「ロールを作成」を押下
- 作成したロールの「許可を追加」>「インラインポリシーを作成」を押下
- 以下のポリシーを設定
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "firehose:ListDeliveryStreams", "Resource": "*" }, { "Effect": "Allow", "Action": [ "firehose:DescribeDeliveryStream", "firehose:PutRecord", "firehose:PutRecordBatch" ], "Resource": "arn:aws:firehose:ap-northeast-1:【アカウントID】:deliverystream/test-data-firehose" } ] }
- ポリシー名に「logs-destination-policy」(任意)を指定し、「ポリシーの作成」を押下
④CloudWatch Logs のロググループの作成
今回はテスト用のロググループを作成しますが、既に転送したいロググループが存在する場合、本手順はスキップします。
- CloudWatch Logs のページへ遷移し、「ロググループを作成」を押下
- ロググループ名に「test-log-group」(任意)を指定し、「作成」を押下
⑤CloudWatch Logs のサブスクリプションフィルターの作成
- 対象のロググループの「サブスクリプションフィルター」タブの「作成」を押下
- 「現在のアカウント」を指定し、「Amazon Data Firehose ストリーム」に作成した Data Firehose を設定
- 「アクセス許可を付与する」で「logs-destination-role」を設定
- サブスクリプションフィルター名に「test-subscription-filter」(任意)を設定
- 「ストリーミングを開始」を押下
- サブスクリプションフィルターの設定完了!
⑥動作確認
- ログストリームの作成およびテストイベントの出力
for log_type in "messages" "secure" do for instance_id in "i-aaaa" "i-bbbb" do aws logs create-log-stream \ --log-group-name test-log-group \ --log-stream-name ${log_type}_${instance_id} aws logs put-log-events \ --log-group-name test-log-group \ --log-stream-name ${log_type}_${instance_id}\ --log-events timestamp=$(date +%s)000,message="Test Log ${log_type}_${instance_id}" done done
上記を実行すると、以下の4つのログストリームが作成されます。
また、それぞれのログストリームに以下のようなテストメッセージを出力しています。
以下のコマンドでS3に出力されたオブジェクトのパスを確認し、想定通りにパーティションが切られていることを確認します。
aws s3 ls central-logs-$(aws sts get-caller-identity --query Account --output text) --recursive
- 想定パーティション
s3://【S3バケット名】/【アカウントID】/【ログファイル】/YYYY/MM/DD/HH/【インスタンスID】
実行結果例)
正しく設定できていれば、以下のように表示されるはずです!
$ aws s3 ls central-logs-$(aws sts get-caller-identity --query Account --output text) --recursive 2025-03-12 08:32:36 341 222222222222/messages/2025/03/12/08/i-aaaa/test-data-firehose-2-1-2025-03-12-08-25-02-12d105b9-ff7c-312c-8a41-82897da5e8fd.json.gz 2025-03-12 08:32:36 311 222222222222/secure/2025/03/12/08/i-aaaa/test-data-firehose-2-1-2025-03-12-08-25-59-8ac222aa-4403-3c1c-ba7f-862ae161f107.json.gz $
補足:Athenaから確認
以下のような SQL でテーブルを作成できます。 実際には、分析内容によって最適なテーブルの作成を検討しましょう。
CREATE EXTERNAL TABLE `dynamic_log`( `messagetype` string COMMENT 'from deserializer', `owner` string COMMENT 'from deserializer', `loggroup` string COMMENT 'from deserializer', `logstream` string COMMENT 'from deserializer', `subscriptionfilters` array<string> COMMENT 'from deserializer', `logevents` array<struct<id:string,timestamp:bigint,message:string>> COMMENT 'from deserializer') ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='logEvents,logGroup,logStream,messageType,owner,subscriptionFilters') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://central-logs-【アカウントID】/【アカウントID】/messages'
テーブルの中身を確認します。
SELECT * FROM "oslog"."dynamic_log" limit 10;
終わりに
今回は、CloudWatch Logs のログを Data Firehose の動的パーティショニングを活用し、適切に S3 に保存する方法をご紹介しました。 どなたかのお役に立てれば幸いです。