مفاهیم، نصب و راه اندازی kafka

جلسه دهم: 1400/05/19

چنداصطلاح در kafka:
- broker: سروری که kafka برروی آن نصب است
- producer:  کسی که پیام را تولید و به kafka ارسال می کند
- consumer: کسی که پیام ها را از kafka دریافت و مصرف می کند
روش انجام کار: زمانی که producer پیام را ارسال می کند، broker آن را در فضای logical بنام topic قرار می دهد (مثل table در یک rdbms).
نقش پارتیشن: kafka به هر producer پارتیشن خاص خود اختصاص می دهد (افزایش کارایی)
 نصب kafka 
-kafka دارای دو ویرایش یکی از apache و دیگری از confluent است. در اینجا به دلیل کامل تر بودن ویرایش confluent فایل confluent-5.4.0-2.12.tar.gz  را دانلود و پس از کپی در مسیر /opt/ با فرمان tar  -xzf باز کنید. 
-در ادامه مسیر را به kafka تغییر نام داده و در کنار آن فولدری باعنوان kafka_data نیز ایجاد کنید
- در مسیر /opt/kafka/etc/kafka/ فایل server.properties  را بازکرده و پارامترهای زیر را تنظیم کنید
zookeeper.connect=node-master:2181,node1:2181,node2:2181/kafka_znode
log.dirs=/opt/kafka-data
-برای اجرای kafka فرمان زیر را اجرا کنید؛ درصورت قراردادن پارامتر daemon- در دستور زیر، kafka در backgroud اجرا خواهد شد.
 /opt/kafka/bin/kafka-server-start /opt/kafka/etc/kafka/server.properties
نکته: درصورتیکه قصد حذف node در zookeeper را دارید از دستور زیر استفاده کنید، مسیر را به همراه زیر مجموعه های آن  حذف می کند
/opt/zookeeper/bin/zkCli.sh rmr /nodepath
- در ادامه مطابق فرمان زیر یک topic با عنوان person ایجاد کنید؛ 
/opt/kafka/bin/kafka-topics --create --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode --replication-factor 1 --partitions 5 --topic person
- حال درصورتیکه در zookeeper درخت partitions مربوط به topic person را ببنیم، چنین خروجی خواهد داشت:
*دستور زیر در zookeeper اجرا می شود
 /kafka_znode/brokers/topics/person/partitions
* خروجی
[0, 1, 2, 3, 4]
*با مراجعه به مسیر /opt/kafka_data/ نیز مشاهده می شود که فولدرهای زیر توسط kafka  ایجاد شده است:
person-0
person-1
person-2
person-3
person-4
- امکان مشاهده topic ها از طریق kafka به ترتیب زیر مقدور است:
/opt/kafka/bin/kafka-topics --list --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode
- حذف یک topic ها از طریق فرمان زیر انجام می شود.
/opt/kafka/bin/kafka-topics --delete --topic person --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode 

- ایجاد پیام 
/opt/kafka/bin/kafka-console-producer --broker-list node-master:9092 --topic person
-دریافت پیام
/opt/kafka/bin/kafka-console-consumer --bootstrap-server node-master:9092  --topic person --from-beginning
- ایجاد پیام (با cat)؛ ابتدا فایل device.json را در مسیر /home/hadoop/ قرار میدهیم.
cat Device.json | /opt/kafka/bin/kafka-console-producer --broker-list node-master:9092 --topic device_status
 *نکته : درصورتیکه topic وجود نداشته باشد kafka برای آن مسیر را خواهد ساخت (اینجا device_status)
توزیع کردن kafka
- اکنون فولدر kafka را برروی node1 و node2 کپی کنید
- در مسیر /opt/ از 2 نود بالا  فولدر data_kafka را بسازید
- اکنون فایل /opt/kafka/etc/kafka/server.properties از هریک از نودها را باز کرده و پارامتر broker.id=1 و broker.id=2 قرار می دهیم.
- حال با فرمان زیر kafka را برروی نودهای 1 و 2 استارت کنید
 /opt/kafka/bin/kafka-server-start /opt/kafka/etc/kafka/server.properties
برخی نکات در حال توزیع شده kafka:
- اکنون 3 سرورkafka داریم؛  با اجرای دستور زیر برروی هر یک از سرورها 3 partition ساخته شده و داده ها با produce در آنها نگهداری می شود. با توجه به آنکه RF=1 است، از هریک از partition ها فقط یکی خواهیم داشت.
/opt/kafka/bin/kafka-topics --create --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode \
--replication-factor 1 --partitions 9 --topic orders
- بنابراین برای topic orders پارتیشن های زیر را خواهیم داشت:
- orders-0 ؛ orders-3 ؛ orders-6
- orders-1 ؛ orders-4 ؛ orders-7
- orders-2 ؛ orders-5 ؛ orders-8
-  با اجرای دستور زیر برروی هر یک از سرورها 6 partition ساخته می شود.
/opt/kafka/bin/kafka-topics --create --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode \
--replication-factor 2 --partitions 9 --topic clicks
- با اجرای فرمان زیر
/opt/kafka/bin/kafka-topics --describe --zookeeper node-master:2181,node1:2181,node2:2181/kafka_znode \
--topic clicks
* خروجی به شکل زیر است. و مشخص می کند که leader هر پارتیشن برروی کدام سرور است.
Topic: clicks   PartitionCount: 9       ReplicationFactor: 2    Configs:
        Topic: clicks   Partition: 0    Leader: 2       Replicas: 2,0   Isr: 2,0        Offline:
        Topic: clicks   Partition: 1    Leader: 0       Replicas: 0,1   Isr: 0,1        Offline:
        Topic: clicks   Partition: 2    Leader: 1       Replicas: 1,2   Isr: 1,2        Offline:
        Topic: clicks   Partition: 3    Leader: 2       Replicas: 2,1   Isr: 2,1        Offline:
        Topic: clicks   Partition: 4    Leader: 0       Replicas: 0,2   Isr: 0,2        Offline:
        Topic: clicks   Partition: 5    Leader: 1       Replicas: 1,0   Isr: 1,0        Offline:
        Topic: clicks   Partition: 6    Leader: 2       Replicas: 2,0   Isr: 2,0        Offline:
        Topic: clicks   Partition: 7    Leader: 0       Replicas: 0,1   Isr: 0,1        Offline:
        Topic: clicks   Partition: 8    Leader: 1       Replicas: 1,2   Isr: 1,2        Offline:
- با اجرای فرمان زیر مشخص می شود که تاکدام پیام (offset) توسط consumer ها خوانده شده است. (خطای Consumer group 'app1' does not exist. گرفتم)
/opt/kafkac/bin/kafka-consumer-groups --bootstrap-server node-master:9092,node1:9092,node2:9092  --group app1 --describe
- امکان consume کردن از partition مشخص و offset مورد نظر هم وجود دارد
/opt/kafka/bin/kafka-console-consumer --bootstrap-server node-master:9092,node1:9092,node2:9092  --topic orders --partition 5 --offset 1
- درصورتیکه برای ثبت اطلاعات کلید درنظر گرفته شود، زمانیکه kafka  یک کلید را در پارتیشنی قرارداد، اگر برای بار بعد همان کلید دریافت شود، داده جدید در همان پارتیشنی که قبلا به آن تخصیص یافته قرار می گیرد.
kafka-console-producer --topic example-topic --broker-list broker:9092\
  --property parse.key=true\
  --property key.separator=":"

نظرات 0 + ارسال نظر
امکان ثبت نظر جدید برای این مطلب وجود ندارد.