วันพฤหัสบดีที่ 20 ตุลาคม พ.ศ. 2559

Apache Spark Streaming Realtime Solr Indexer

ผลการค้นหารูปภาพสำหรับ solr

     ในบทความนนี้จะนำเสนอการทำ Spark Streaming Real-time Solr Indexer ก่อนอื่นเลยต้องเกริ่นกันก่อนว่า Apache Solr ไว้ใช้ทำ Real-time Search คล้ายๆกับ Google ขอไม่ลงรายละเอียดนะครับ โดยจะ Stream ข้อมูลผ่านทาง Apache Kafka ซึ่งเป็น Messaging Queue เหมาะกับการทำงานเป็น Parallel โดยเขียนตัวอย่างเล็กๆได้ด้านล่างครับ

import com.lucidworks.spark.util.SolrSupport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument

import kafka.producer._
import kafka.serializer.StringDecoder

import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf

object SparkStreamingRealtimeSolrIndexer {
  def main(args: Array[String]) {

    val zkQuorum = "myzookeeper:2181/solr"

    val brokers_list = "kafka01:9092,kafka02:9092,kafka03:9092"
    val input_topic = "solr_test"

    val solrCollection = "mycollection"
    val SCconf = new SparkConf().setAppName("SparkStreamingRealtimeSolrIndexer")
      .setMaster("local")
    val sc = new SparkContext(SCconf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val kafkaParams = Map("metadata.broker.list" -> brokers_list)
    val topics = Set(input_topic)

    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    val lines = stream.map(_._2).map(_.split(","))

    val docs = lines.map { doc =>
      val docx = SolrSupport.autoMapToSolrInputDoc(doc(0), doc, null)
      docx.setField("field_1", doc(1))
      docx.setField("field_2", doc(2))
      docx
    }
    SolrSupport.indexDStreamOfDocs(zkQuorum, solrCollection, 10, docs);

    ssc.start()
    ssc.awaitTermination()
  }
}


ไม่มีความคิดเห็น:

แสดงความคิดเห็น