تکمیل Sqoop و شروع کار با nifi

جلسه هشتم: 1400/05/12

چند دستور لینوکسی

-

شکستن اطلاعات  (انتقال اطلاعات از بانک اطلاعاتی به hdfs)

- روال Sqoop در هنگام استفاده از split-by  و مبنا قراردادن یک فیلد برای انتقال اطلاعات به هادوپ، به این گونه است که min و max فیلد مورد نظر را شناسایی و سپس آن را به شکل پیش فرض 4 تکه می کند. حال درصورتیکه پراکندگی رکوردهای اطلاعات در طیف مورد نظر یکنواخت نباشد، یکی از 4 تکه بسیار بزرگ و چه بسا دیگری خیلی کوچک شود.
- با دستور زیر تنها اطلاعات ستونهای خاصی به هادوپ منتقل می شود
sqoop import --connect "jdbc:mysql://192.168.72.10/db1" \
 --username root --password P@33wordMysql --table address2  --target-dir "/user/hadoop/address2_addrcol" --columns addr
نکته: با استفاده از گزینه export در فرمان sqoop می توان اطلاعات را از hdfs با دیتابیس منتقل کرد
- انتقال اطلاعات چندین جدول اطلاعاتی به شکل همزمان به hdfs مقدور است. حال اگر یکی از جداول فاقد کلید باشد sqoop در انتقال با مشکل روبرو شده و عملیات متوقف می شود. با فرمان زیر می توان جداولی را از دستور استثناء کرده و برای انتقال کنار گذاشت.

sqoop import-all-tables --connect "jdbc:mysql://192.168.56.12/sampledb" 
--username root --password P@33wordMysql \
--warehouse-dir "/user/hadoop/address_mul"
--exclude-tables tbl1,tbl2 
- uber mode با مقدار false در sqoop به چه معناست؟ 
همانگونه که همواره به دنبال راهی برای دوری از فایل های کوچک در dfs هستیم، خوب است که مشابه آن فکری برای job های کوچک داشته باشیم. سیستم برای هر جاب فارغ ازاندازه آن باید اقداماتی مانند تخصیص application master و ........ انجام دهد. خلاصه آنکه اجرای job های کوچک سربار دارد. در این شرایط خوب است که job و application master برروی یک container قرار داشته باشند. در این صورت لازم است تا  با تنظیماتی مشخص کنیم  تا درصورتیکه job ی در محدوده ی تعدادی مشخصی تکه باشد، سیستم آنها را در یک container و در کنار application master مربوط به آن اجرا کند. (در فایل mapred-site.xml)
- تا اینجا کل اطلاعات یک جدول به hdfs منتقل شد. در ادامه روشی ارائه می شود تا تنها تغییرات به hdfs منتقل شوند.
روش انتقال تغییرات به hdfs از طریق sqoop
- 2 روش کلی وجود دارد؛ براساس آخرین مقدار (براساس فیلدی عددی یا تاریخی فقط شدنی است)؛ براساس آخرین زمان
- در دستور زیر اعلام شده که آخرین انتقال داده مربوط به num=6 بوده و هرچه بزرگتر از آن بود به dfs منتقل شود
sqoop import --connect "jdbc:mysql://192.168.56.12/sampledb" \
 --username root --password P@33wordMysql --table address2  --target-dir "/user/hadoop/address2" \
 --incremental append --check-column num --last-value 6
- در سناریوی زیر تغییرات براساس ستون تاریخ شناسایی و به hdfs منتقل می شود
ابتدا در بانک جدول ساخته شده و تعدادی رکورد ایجاد می شود
CREATE TABLE contacts2 (
  id INT PRIMARY KEY auto_increment,
  name VARCHAR(30),
  email VARCHAR(30),
  last_update_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP);
insert into contacts2(name,email) values('name1','email1');
insert into contacts2(name,email) values('name2','email2');
insert into contacts2(name,email) values('name3','email3');
اکنون با فرمان زیر اطلاعات جدول به dfs منتقل می شود
sqoop import --connect "jdbc:mysql://192.168.56.12/sampledb" \
 --username root --password P@33wordMysql --table contacts2 --target-dir "/data/lakes/contacts_2" 
سپس تغییر زیر را در جدول انجام می دهیم
update contacts2 set email ='eeeeeeeeeee'  where id=2  ;
commit;
اکنون با اجرای فرمان زیر تنها تغییرات به مقصد منتقل خواهد شد
sqoop import --connect "jdbc:mysql://192.168.56.12/sampledb" \
 --username root --password P@33wordMysql --table contacts2  \
 --target-dir "/data/lakes/contacts_2" \
 --incremental lastmodified --check-column last_update_date --append --last-value "2021-08-08 23:35:00"
 یک دستور جالب: yarn top را در یک session اجرا کنید، به محض اجرای دستوراتی که از yarn استفاده می کنند(مثل sqoop) ، یکسری اطلاعات در مورد منابع مورد استفاده از yarn خودکار نمایش داده می شود.
بعدا دستور sqoop job --create بررسی شود.
sqoop job --create myjob \
--import --connect "jdbc:mysql://192.168.56.12/sampledb" \
 --username root --password P@33wordMysql --table contacts2 --target-dir "/data/lakes/contacts_3" 
- برای بهبود small file  ها می توان از merge-key استفاده کرد
sqoop import --connect "jdbc:mysql://192.168.56.12/sampledb" \
 --username root --password P@33wordMysql --table contacts2 \
 --target-dir "/data/lakes/contacts_2" \
 --incremental lastmodified --merge-key id --check-column last_update_date --last-value "2021-08-29 02:08:51"
- ارسال خروجی Query به hdfs از طریق sqoop ؛ نکته قابل توجه در این دستور آن است که حتما باید پارامتر CONDITIONS$  در انتهای Query آورده شود تا Sqoop بتواند سایر شرط های خود را (مانند last update و ....) به آن اضافه کند

sqoop import --connect "jdbc:mysql://192.168.56.12/sampledb" \
  --query 'SELECT user.*, addr.* FROM user, addr, where user.id = addr.user_id and $CONDITIONS' \
  --username root --password P@33wordMysql --split-by user.id --target-dir /user/hadoop/joinresults
  - تا اینجا append و change ها شناسایی و به hdfs منتقل شده اند. اگر رکوردی در بانک اطلاعاتی حذف فیزیکی شود sqoop  برای شناسایی و انتقال آن راهی ندارد، مگر آنکه حذف فیزیکی نبوده و به طور مثال با یک flag رکورد حذف شده قابل شناسایی باشد؛ در این صورت همانند change تغییرات به hdfs ولی با تگ حذف ارسال می شود.
نحوه نگهداری Data In Columns در معماری توزیع شده:فرض کنید جدول با 3 ستون اطلاعاتی و 3 میلیون رکورد اطلاعاتی در اختیار داریم. هنگام ذخیره سازی براساس data in columns، سیستم براساس row-group کارکرده و ابتدا 3 دسته یک میلیونی از اطلاعات درنظر می گیرد. یک میلیون اول را برروی نود 1 و یک میلیون دوم را ....... حال در هر نود، ابتدا اطلاعات ستون اول و بعد ستون دوم و ........ را نگهداری می نماید.
مقایسه انواع فرمت فایل در هنگام کار با sqoop
- فرمان  sqoop به شکل پیش فرض اطلاعات را با فرمت RAW  به hdfs منتقل می کند. مانند csv، و از dfs  قابل خواندن است. اندازه ی فایل ایجاد شده نسبتا بزرگ بوده و صرفه جویی در فضا ندارد.
- برای فرمت Sequnece به انتهای فرمان  as-sequencefile-- را اضافه کنید. سرعت write به dfs نسبت به سایر فرمتها بالا است. باینری ذخیره می شود و حجم فایل نیز در مقایسه با سایر فرمت ها بالاتر است.
- برای فرمت Parquet به انتهای فرما  as-parquetfile-- را اضافه کنید.سرعت write نسبت به Sequence file پایینتر است (حدود raw). درصورت وجود اطلاعات تکراری حجم داده ی ایجاد شده در hdfs به مراتب کوچکتر است. این نوع column based است. دراین فرمت schema ی فایل ایجاد شده در کنار فایل های ساخته شده در dfs وجود دارد. درصورتیکه با small file در این وضعیت روبرو هستید خوب است تا از   m 1 در دستور sqoop  برای کاهش تعداد فایل استفاده کنید. دراینصورت سرعت نوشتن در dfs هم به مراتب بهبود خواهد یافت.
راه دیگر برای انتقال اطلاعات به dfs استفاده از bulk insert است. این روش به مراتب سرعت بهتری نسبت به روش های قبل خواهد داشت.
nifi چیست؟ ابزار طراحی data flow به صورت گرافیکی است. مستقل از yarn بوده و می توان از منابع اطلاعاتی مختلف اطلاعات را واکشی نماید (مشابه ssis) . هریک از اجزاء برای خواندن اطلاعات در طراحی را یک process می گوییم. برای استفاده از آن کافیست تا فایل nifi-1.9.2-bin.tar.gz را مسیر/opt/ قرارداده و به nifi تغییر نام دهیم. در ادامه با فرمان   opt/nifi/ bin/nifi.sh start/   استارت کرده و از طریق پورت 8080 به آن دست یافت. از process های متداول می توان به موارد زیر اشاره کرد:
- getFile: فایل را از مبدا cut می کند
- putFile: قراردادن فایل در مسیرجدید
- logAttribute: برای لاگ کردن
- getHTTP: برای فراخوانی از وب سرویس
- putParquet: مثلا csv بخونیم و در hdfs با فرمت Parquet بنویسیم
توجه: برای پاک کردن flow در nifi ابتدا باید relation را حذف کرد تا امکان حذف کردن process مهیا شود. اگر process در حال اجرا باشد حذف نمی شود و ابتدا باید استاپ شود.
- getHDFS: برای فراخوانی از هادوپ
- RouteOnAttribute: فایل را خوانده و براساس فرمت کارهای مختلف بعد از آن اجرا می کنیم