ادامه Nifi، توزیع شوندگی zookeeper و شروع کار با Kafka

جلسه نهم: 1400/05/17

چند دستور لینوکسی

- با دستور tail سیستم انتهای فایل را نمایش داده و افزودن پارامتر f  به معنی follow بوده و درصورتیکه به فایل مذکور اضافه شد، آخرین مقادیر انتهایی فایل را نیز نمایش خواهد داد. برای آزمایش 
tail -f /opt/nifi/logs/nifi-app.log
- دستور ll فهرستی از فایل های موجود شامل زیر شاخه ها را نمایش می دهد.
اجرای یک سناریوی دیگر در Nifi و چند نکته
- امکان تبدیل مستقیم از فرمت csv یا json به parquet نیست. بلکه باید ابتدا به فرمت avro  تبدیل و پس از آن از avro به parquet تبدیل کرد. با استفاده از پراسس های زیر:
- ConvertCSVToAvro
- ConvertJSONToAvro
- ConvertAvroToParquet
دستور ConvertJsonToSQL هم دستور insert می سازد.
اگر قصد رفع و رجوع کردن موقتی failure ها را داریم می توانیم از پراسس PutMail استفاده کنیم.
برای جداسازی فایلها براساس فرمتشان؛ مثلا csv از json از پراسس RouteOnAttribute استفاده کنید. به آن property مانند is_json اضافه نموده و در بخش properties مقدار زیر را وارد نمایید، برای فرمت csv هم به ترتیب مشابه انجام شود.
${filename:endsWith(".json")}
نکته: هنگام تبدیل json به avro باید مشابه زیر schema ی برای داده ها تعیین کرد
{
    "type" : "record",
    "name" : "flight",
    "doc" : "sample avro for flight",
    "fields" : [{"name" : "ORIGIN_COUNTRY_NAME", 
                "type" : "string"},
                {"name" : "DEST_COUNTRY_NAME", 
                "type" : "int"},
                {"name" : "United States", 
                "type" : "string"}]
}
- نکته: در تنظیم پراسس PutHDFS باید در عنصر اطلاعاتی HadoopConfigurationResource، آدرس فایل های core-site.xml و hdfs-site.xml ارائه شوند. درصورتیکه hadoop برروی سرور دیگری نصب است لازم است که تا یک نسخه از این فایل ها برروی سرور nifi کپی شده و مسیر آن ارائه شود.
- نکته: فایل flow ساخته شده در nifi در مسیر nifi/conf با پسوند xml ذخیره می شود.
راه اندازی Kafka
- برای شروع ابتدا باید ZooKeeper را راه اندازی نماییم؛ چه برای Kafka single node یا توزیع شده. Zookeeper در راه اندازی Kafka نقش Coordinator دارد. 
- Quorum یا اکثریت آراء برای 5 نود میشه 3؛ برای 3 نود میشه 2 ، برای 6 نود میشه 4، ..... . برای 3 نود حدنصاب 2 به این معناست که حداکثر درصورت ازدست رفتن 1 نود تحمل دارد. 
- برای cluster کردن ZooKeeper ابتدا محتویات درون فولدر data را تخلیه، سپس کل فولدر ZooKeeper را برروی تمامی نودهای که برای cluster کردن انتخاب نموده ایم کپی کنید. 
-اکنون فایل zoo.cfg از مسیر /opt/zookeeper/conf/ را ویرایش کنید.
*پارامتر tickTime مقیاس سایر مقادیر را مشخص می کند. واحد آن میلی ثانیه است. مثلا اگر مقدار آن 2000 باشد (معادل 2 ثانیه) به معنی آن است که سایر پارامترها باید در ضریب 2 ضرب شوند. 
سایر مقادیر را به طور مثال با مقادیر زیر تنظیم کنید. 
tickTime=1000
initLimit=20
syncLimit=20
dataDir=/opt/zookeeper/data/
clientPort=2181
server.1=node-master:2888:3888
server.2=node1:2888:3888
server.3=node2:2888:3888
- به محض اینکه نود leader حضور نداشته باشد، اولین نود شروع به رای گیری نموده و در نهایت leader  جدید انتخاب می شود این معماری masterless بوده و از سرور از ابتدا مشخص نشده است. از پورت 2888 برای ارتباط بین سرورها استفاده می شود و از پورت 3888 برای انتخاب leader استفاده می شود (!). 
- فایل zoo.cfg  جدید را باید برروی همه سرورها کپس کرد.
- تااینجا سرورها از id خود بیخبر هستند؛ به منظور این آگاهی فایلی با نام myid در فولدر data ی zookeeper ایجاد کرده و صرفا یک عدد مثل 1، 2 و .... در آن ثبت و ذخیره نمایید.
- اکنون نوبت استارت کردن zookeeper است که به ترتیب زیر انجام می شود. zookeeper باید برروی همه سرورها اجرا شود.
/opt/zookeeper/bin/zkServer.sh  start
- برای مشاهده وضعیت جاری هر نود و اینکه کدام leader و کدامیک follower است باید از پارامتر status در فرمان بالا استفاده کنید.
- نام سرویس zookeeper در سیستم عامل QuroumPeerMain است که می توان با استفاده از دستور jps  آن را دید.
- برای تست معماری برپا شده، به node1 رفته و فرمان زیر را اجرا کنید.
/opt/zookeeper/bin/zkCli.sh  -server node2:2181
- با اجرای فرمان بالا وارد محیط zookeeper شده و می توان از دستورات zookeeper استفاده کرد. zookeeper ساختاری درختواره دارد. 
با دستور / ls می توان ریشه ای این درخت با عنوان zookeeper را مشاهده کرد. برای ساخت زیرمجموعه کافیست از فرمان زیر استفاده کرد. در دستور زیر مقدار test نیز در این znode قرارداده شده است.
create /ha_hadoop test
create /ha_hadoop/yarn_ha test
create /ha_hadoop/hdfs_ha test
create /kafka test
kafka چیست؟ یک پلتفرم stream storage یا message log storage است. از ویژگی مهم لاگ به توان به موارد 1-نگهداری موقت  2-تولید با نرخ بالا 3- append only (منظور آن است که هر عملیاتی مثل insert،  update و delete به شکل یک event در انتهای لاگ اضافه خواهد شد)  اشاره کرد. خلاصه اینکه تغییرات داده ها در kafka قرار می گیرد و بعد با استفاده از ابزار پردازشی کارهای بعدی مثل پردازش یا ذخیره سازی (مثلا در HDFS) انجام می شود. نکته قابل توجه درمورد سازوکار log آنست که می توان داده های مربوط به مقطع زمانی مشخصی به بعد را دریافت کرد. 
در این موضوع کسی که message را آماده می کند producer و آنکه آن را مورد استفاده قرار می دهد consumer است. 

نظرات 0 + ارسال نظر
امکان ثبت نظر جدید برای این مطلب وجود ندارد.