تفکر MapReduce، کمی کار با Spark

جلسه ششم: 1400/05/05

چند دستور لینوکسی:
- با دستور History، تاریخچه اجرای دستورات در سیستم عامل نمایش داده می شود.
Application Master  
درخواست اجرای Job برای Resource Manager (اینجا YARN) ارسال می شود.  Resource Manager به ازای هر Job یک Application Master ریجیستر می کند. این  Application Master  روی یکی از DN ها تشکیل شده و وظیفه گزارش گیری از Container ها و اعلام وضعیت به Resource Manager را برعهده دارد. در خاتمه کار جاب، Application Master از روی Resource Manager ؛ unregister می شود.
تفکر MapReduce با ذکر مثال
انتخابات ریاست جمهوری در 30 استان برگزاری می شود؛ در این انتخابات 4 کاندید حضور دارند. در تفکر MR، ابتدا شمارش آرا برای هریک از کاندیداهاء در هریک از 30 استان انجام می شود. حال 4 استان که هریک از آنها برای یک کاندید درنظر گرفته می شوند وظیفه جمعبندی مجموع آراء یک کاندید را داشته باشند. به این ترتیب هریک از 30 استان نتایج جمعبندی شده ی کاندید شماره 1 را برای استان A ارسال می کنند و الی آخر. در این روش استانهای A، B، C و D به ترتیب نتایج 4 کاندیدا را دراختیار دارند. در این مثال 4 استان منتخب برای جمع آوری اطلاعات معادل Container ها هستند. 
Shuffle : به معنی ارسال کلیدهای یکسان به Reducer های یکسان است. در جمعبندی ها Aggrigation Functions از Shuffle زیاد استفاده می شود. Shuffle بدلیل آنکه نیاز است تا نتایج بین Container ها جابجا شوند هزینه Network دارد. 
علت کندی Hadoop MapReduce: یکی از دلایل کندی آن است که نتایج اولیه ابتدا به صورت محلی برروی HDFS نوشته می شود و پس از اتمام کار محلی از طریق Shuffle به Container مقصد ارسال می کند. راه حل هایی مانند  Spark MapReduce نوشتن برروی هارد را حذف کرده اند و درنتیجه سرعت بالاتر رفته است.
راه اندازی Spark:
-برروی node-master در مسیر opt/ قرار گرفته و فایل Spark ی که از قبل در این مسیر کپی کرده بودیم را باز می کنیم.
tar -xzf spark-2.4.0-bin-hadoop2.7.tgz
- در مسیر conf از spark نام فایل spark-defaults.conf.template را به spark-defaults.conf تغییر نام دهید.
- به جهت آنکه از yarn برای مدیریت منابع استفاده می کنیم لازم است تا در انتهای فایل کانفیگ بالا دستور زیر را وارد کنید. سایر دستورات را نیز وارد کنید.
spark.master yarn
spark.executer.memory 1g
spark.driver.memory 1g
- در مسیر conf از spark نام فایل spark-env.sh.template را به spark-env.shتغییر نام دهید و مقادیر زیر را در آن تنظیم کنید
/opt/hadoop/etc/hadoop
اجرای Python:
- در لینوکس کافی است تا دستور python را در خط فرمان اجرا کنید
-تاقبل از اینکه home  را برای پایتوناسپارک تعریف کنیم از دستور زیر برای نوشتن و اجرای کدهای پایتون در اسپارک استفاده خواهیم کرد. قبل از اجرای این دستور باید yarn و hdfs را استارت کرده باشید.
/opt/spark/bin/pyspark
- اجرای چند دستور در pyspark
-دستور زیر دیتای موجود در فایل های cvs را در متغییر data ریخته و پس از آن نمایش می دهد
data_csv = spark.read.format("csv").load("/data/retail/*.csv")
data_csv.show()
- دستور زیر سطر اول را به عنوان header معرفی می کند
data_csv = spark.read.format("csv").option("header","True").load("/data/retail/*.csv")
- دستور زیر خروجی را در فایلی قرار می دهد. توجه: parquet یکی از فرمت های پراستفاده ی column based در hdfs است
data_csv.write.format("json").save("/data/result/retail_json")
data_csv.write.format("parquet").save("/data/result/retail_parquet")
-دستور زیر دیتای موجود در فایل به فرمت parquet را در متغییر data میریزد و در سطر بعد هم 2 رکورد از آن را نمایش می دهد
data_parquet= spark.read.format("parquet").load("/data/result/retail_parquet")
data_parquet.show(2)
-اضافه کردن شرط و شمارش؛ در این وضعیت به غیر از Map عملیات Reduce هم فعال می شود. 
data_parquet.where("stockcode='85123A'").count()
data_json.where("stockcode='85123A'").count()
-نکته قابل تامل آنست که سرعت عملیات count در حالتیکه از parquet استفاده شد به طرز محسوسی از csv بهتر بود.
-حال نوبت به این رسیده که اجرای فرمان بالا را با استفاده از select امتحان کنیم
data_parquet.createOrReplaceTempView("retail_par")
data_csv.createOrReplaceTempView("retail_csv")
spark.sql(" select count(*) from retail_par where stockcode='85123A'").show()
- برای جلوگیری از تعداد فایل کوچک زیاد میتوان از عبارت repartition در هنگام نوشتن بهره گرفت
data_csv.repartition(1).write.format("parquet").save("/data/result/retail_parquet")
نظرات 0 + ارسال نظر
امکان ثبت نظر جدید برای این مطلب وجود ندارد.