Apache KafkaをMacでコンテナを使わずにPythonで動かしてみる
- 2022.08.24
- IT
kafkaをローカル環境で、Pythonを使って試して見たかったので、その記録をメモします。
ゆくゆくはコンテナ化します。
目次
PythonのKafkaライブラリについて
候補としては2つ見つけました。
2番のライブラリは、最終更新が1年以上前の非アクティブなリポジトリでしたが、導入が簡単そうだったので、今回は2番を使いました。
1番も時間があれば使ってみようとは思います。
ローカル環境について
1 2 3 4 5 6 7 8 9 10 |
[22:43:05(^_^)~] sw_vers ProductName: macOS ProductVersion: 12.5.1 BuildVersion: 21G83 [22:44:01(^_^)~] sysctl machdep.cpu.brand_string machdep.cpu.brand_string: Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz [22:44:04(^_^)~] python -V Python 3.8.9 [22:44:07(^_^)~] pip -V pip 20.2.3 |
環境の構築について
まずは、2つのターミナルを開いて、下記コマンドでZookeeperとKafka Serverを起動します。
/usr/local/bin/zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
/usr/local/bin/kafka-server-start /usr/local/etc/kafka/server.properties
Pythonのコードについて
サンプルコードをもとに、作っていきます。
送る側のプログラムについて
まずは、送る側のプログラムを作り、実行します。
送る方法は公式ドキュメントを見ると色々あったのですが、簡単な2つを試してみました。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
from kafka import KafkaProducer from kafka.errors import KafkaError import time producer = KafkaProducer(bootstrap_servers='localhost:9092') # 送り方1: time.sleepでSleepをいれないと、うまく送信ができない for i in range(100): send_msg="python"+str(i) producer.send('kafka-test-topic', send_msg.encode()) time.sleep(0.01) # 送り方2 # Threadを使っているようだが、無限ループのように使って良いのか分からない i=0 while True: send_msg="python"+str(i) producer.send('kafka-test-topic', send_msg.encode()) future = producer.send('kafka-test-topic', b'raw_bytes') try: record_metadata = future.get(timeout=10) # print(future) # <kafka.producer.future.FutureRecordMetadata object at 0x10243e190> except KafkaError: # Decide what to do if produce request failed... log.exception() pass time.sleep(1) i+=1 |
ちょっと、中身に深く入らずに書いたのですが、実際の実装を時間がある時にもう少し深堀りしたいなとおもっています。
受け手側のプログラムについて
こちらは簡単です。
下記をテキトウなPythonファイルに書いて実行するだけです。
1 2 3 4 5 6 |
from kafka import KafkaConsumer consumer = KafkaConsumer('kafka-test-topic') for msg in consumer: print (msg) # ConsumerRecord(topic='kafka-test-topic', partition=0, offset=242104, timestamp=1661315921962, timestamp_type=0, key=None, value=b'raw_bytes', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=9, serialized_header_size=-1) print (msg.value) |
以上です。