embulkからApache Kafka にpushしてみた。
といっても、QUICKSTARTだけですが。
基本は上記の通りやりました。
kafka_2.13-3.7.0.tgzをダウンロードしてdocker imageで展開して使ってみました。
root@05493a980797:/work/kafka_2.13-3.7.0# bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092 This is my first event This is my second event
QUICKSTART通り、でした。
ここからが本題です。
embulkを使ってkafkaにメッセージをpushしてみました。
randomjでランダムにデータを1000件登録します。
in: type: randomj rows: 1000 threads: 1 primary_key: myid schema: - {name: myid, type: long} - {name: named, type: string} - {name: named_s, type: string, length: 8} - {name: x_flag, type: boolean} - {name: rate, type: double, max_value: 100, min_value: -100} - {name: score, type: long, max_value: 255, min_value: 100} - {name: time, type: timestamp, format: '%Y-%m-%d %H:%M:%S'} - {name: purchase, type: timestamp, format: '%Y/%m/%d'} out: type: kafka topic: "quickstart-events" serialize_format: json brokers: - "localhost:9092"
結果です。(一部省略)
{"myid":908,"named":"2Flfx52dngeMd8qXWd0HL74frAG8MUf4","named_s":"2UZSKL2H","x_flag":false,"rate":21.18840075995608,"score":220,"time":"2024-07-27T03:23:39Z","purchase":"2024-06-25T22:34:45Z"} {"myid":909,"named":"s6sei5apbrMzUv4OSOzOH2ExFLCp2N7B","named_s":"kEIDAKm9","x_flag":true,"rate":16.56382703365203,"score":129,"time":"2024-07-07T03:50:18Z","purchase":"2024-05-08T23:01:53Z"} {"myid":910,"named":"9Pc4x787a4sWpPGqzV6OQZVSqIvzO0rS","named_s":"KwQ2Eznr","x_flag":true,"rate":-73.35854935057456,"score":172,"time":"2024-07-21T13:35:40Z","purchase":"2024-07-23T19:32:20Z"} {"myid":413,"named":"rKAsCV8eLJfMcZZttVLdoRGCIlxppXJb","named_s":"SaZiEkCc","x_flag":false,"rate":-73.23947830054233,"score":170,"time":"2024-06-01T11:10:07Z","purchase":"2024-05-17T22:16:38Z"} {"myid":414,"named":"XDC42GogdTv6lrNCXgrclcqZROAUQHkU","named_s":"3MnMDUjQ","x_flag":false,"rate":76.38507681698047,"score":232,"time":"2024-05-16T19:21:39Z","purchase":"2024-06-20T08:24:36Z"} ...
一瞬でkafkaに登録できました。 randomjじゃなくてデータベースから取ってくれば簡単にpush通知ができそうな気がしてきました。もちろんkafkaじゃなくてredisのpub/subでもできます。 メリットデメリットでどちらを使うかを決めたら良いと思います
in: type: mysql host: localhost user: user password: password database: sample table: test select: id, name, email, body out: type: kafka topic: "quickstart-events" serialize_format: json brokers: - "localhost:9092"
などSQLをinにすれば、kafkaにpushする実装を自前で作る必要が無くなりそうですね。