วันพฤหัสบดีที่ 20 ตุลาคม พ.ศ. 2559

Apache Spark Streaming Realtime Solr Indexer

ผลการค้นหารูปภาพสำหรับ solr

     ในบทความนนี้จะนำเสนอการทำ Spark Streaming Real-time Solr Indexer ก่อนอื่นเลยต้องเกริ่นกันก่อนว่า Apache Solr ไว้ใช้ทำ Real-time Search คล้ายๆกับ Google ขอไม่ลงรายละเอียดนะครับ โดยจะ Stream ข้อมูลผ่านทาง Apache Kafka ซึ่งเป็น Messaging Queue เหมาะกับการทำงานเป็น Parallel โดยเขียนตัวอย่างเล็กๆได้ด้านล่างครับ

import com.lucidworks.spark.util.SolrSupport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument

import kafka.producer._
import kafka.serializer.StringDecoder

import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object SparkStreamingRealtimeSolrIndexer {
  def main(args: Array[String]) {

    val zkQuorum = "myzookeeper:2181/solr"

    val brokers_list = "kafka01:9092,kafka02:9092,kafka03:9092"
    val input_topic = "solr_test"

    val solrCollection = "mycollection"
    val SCconf = new SparkConf().setAppName("SparkStreamingRealtimeSolrIndexer")
      .setMaster("local")
    val sc = new SparkContext(SCconf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> brokers_list)
    val topics = Set(input_topic)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val lines = stream.map(_._2).map(_.split(","))

    val docs = lines.map { doc =>
      val docx = SolrSupport.autoMapToSolrInputDoc(doc(0), doc, null)
      docx.setField("field_1", doc(1))
      docx.setField("field_2", doc(2))
      docx
    }
    SolrSupport.indexDStreamOfDocs(zkQuorum, solrCollection, 10, docs);

    ssc.start()
    ssc.awaitTermination()
  }
}


Apache Spark Transform Sas7BDat to Parquet

     พอดีเนื่องจากได้มีโอกาส ได้ลองแปลง Sas7BDat file เป็น parquet file บน Hadoop ผ่านทาง Apache Spark เลยเอามาให้ดูกันครับ เขาว่าเล็กลง 14 เท่า


Ref : http://blog.sasanalysis.com/2015/07/transform-sas-files-to-parquet-through.html

จาก Source ต้นทางใช้ Python แปลงข้อมูล แต่ผมขอให้ Scala ก็แล้วกันครับ

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

import com.github.saurfang.sas.spark._

object SparkSas7BDat {
  def main(args: Array[String]) {

    val sasfile = "worldcts.sas7bdat"
 
    val sparkConf = new SparkConf().setAppName("SparkSas7BDat")
    .setMaster("local")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

    val sas_data = sqlContext.read.format("com.github.saurfang.sas.spark").load(sasfile).cache()
    sas_data.write.parquet("test_parquet")

  }
}

จากการทดสอบ ไฟล์เล็กลงไป 4.75 เท่า
ต้นฉบับขนาด 7.33 MB
ผลลัพธ์ 1.54 MB

Apache Spark Map vs FlatMap

     สำหรับ Apache Spark นั้นมี Action ที่ใช้บ่อยๆอยู่หลายตัว เช่น Map และ FlatMap ในบทความนี้จะอธิบายความแตกต่างของทั้งสองตัว

ผลการค้นหารูปภาพสำหรับ spark map vs flatmap     Map Function นั้นจะ return rdd ใหม่ ที่ถูกนำไปเข้า function ที่เราเขียนขึ้น
เช่น  หากมี  rdd ของ string
val newRDD = myRDD.map{ x => x.split(" ")}
โดย rdd ใหม่จะ return RDD[Array[String]] โดยจำนวน element เท่าเดิม

     FlatMap Function นั้นจะเหมือน Map แต่ จะ return rdd ใหม่ที่มีการเรียงลำดับ ของ single item
เช่น newRDD = myRDD.flatMap{ x => x.split(" ")}
โดย rdd ใหม่จะ return RDD[String] สังเกตได้ว่า rdd ใหม่จะมีการเปลี่ยนแปลงจำนวน element ใน rdd

วันอังคารที่ 18 ตุลาคม พ.ศ. 2559

Apache Spark : First Application

     สำหรับการเขียน Spark Application นั้นเราจะเริ่ม App แรก ด้วย WordCount เนื่องจากเป็นการแสดงถึงการใช้ กระบวณการ MapReduce แบบง่ายๆ และไม่ยากต่อการเข้าใจ

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object WordCount{
  def main(args: Array[String]) {
    val sparkConf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local")

    val sc = new SparkContext(sparkConf)
    val myRDD = sc.textFile("input.txt")
    val wordcount = myRDD.flatMap { x => x.split(" ") }
      .map { x => (x, 1) }
      .reduceByKey(_ + _)
    wordcount.foreach(println)
  }
}

ผลลัพธ์ :

Apache Spark คืออะไร


     Apache Spark คือ Processing Engine ชนิดหนึ่งที่ใช้ในการประมวลผลขนาดใหญ่ๆ โดยจะเน้นการประมวลผลแบบ in-memory ทำให้มีประสิทธิภาพที่รวดเร็ว

     ใน Apache Spark จะมี Library ให้เลือกใช้ตามแต่ชนิดของงาน มีดังนี้
     Spark SQL ใช้สำหรับแปลง dataset ให้อยู่ในรูปแบบของ dataframe ทำให้สามารถใช้ SQL command ในการประมวลผลข้อมูลได้
     Spark Streaming ใช้สำหรับการประมวลผลแบบ Real-Time
     MLlib ใช้สำหรับการประมวลผล Machine Learning
     GraphX ใช้สำหรับการประมวลผล Graph

และยังสามารถ Run งานบน Hadoop, Mesos, Standalone และ On Cloud

     ภาษาที่รองรับ
     Scala(แนะนำ)
     Java
     Python(แนะนำ)
     R

     Resilient Distributed Dataset (RDD) คือ Dataset ที่กระจายจายข้อมูลอยู่บนหลายๆ Node ของ Cluster ซึ่งมีความสามารถที่จะประมวลผลแบบ Parallel อีกทั้งยังมีความยืดหยุ่นเช่น หากมี Node ที่ Down ขณะทำงาน Spark จะไปกระจายงานให้กับ Node อื่นที่ยังมีข้อมูลแบบเดียวกันกับ Node ที่ Down ไปทำให้ยังสามารถประมวลผลต่อได้

     Lazy Execution : สำหรับ Apache Spark จะมีการประมวลผลแบบ Lazy Execution คือจะไม่ทำการประมวลผลจนกว่าจะเจอ คำสั่งที่เป็น Action

     Transformation : สำหรับ Apache Spark จะมีคำสั่งเกี่ยวกับ Transformations หรือจัดข้อมูลให้อยู่ในรูปแบบที่ต้องการเช่น
     map(func)
     filter(func)
     flatMap(func)
     mapPartitions(func)

     Action : สำหรับ Apache Spark จะมีคำสั่งเกี่ยวกับ Action ต่างๆเช่น
     reduce(func)
     collect()
     count()
     first()
     take(n)
     saveAsTextFile(path)