ในบทความนนี้จะนำเสนอการทำ Spark Streaming Real-time Solr Indexer ก่อนอื่นเลยต้องเกริ่นกันก่อนว่า Apache Solr ไว้ใช้ทำ Real-time Search คล้ายๆกับ Google ขอไม่ลงรายละเอียดนะครับ โดยจะ Stream ข้อมูลผ่านทาง Apache Kafka ซึ่งเป็น Messaging Queue เหมาะกับการทำงานเป็น Parallel โดยเขียนตัวอย่างเล็กๆได้ด้านล่างครับ
import com.lucidworks.spark.util.SolrSupport;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.SolrDocument
import kafka.producer._
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
object SparkStreamingRealtimeSolrIndexer {
def main(args: Array[String]) {
val zkQuorum = "myzookeeper:2181/solr"
val brokers_list = "kafka01:9092,kafka02:9092,kafka03:9092"
val input_topic = "solr_test"
val solrCollection = "mycollection"
val SCconf = new SparkConf().setAppName("SparkStreamingRealtimeSolrIndexer")
.setMaster("local")
val sc = new SparkContext(SCconf)
val ssc = new StreamingContext(sc, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> brokers_list)
val topics = Set(input_topic)
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
val lines = stream.map(_._2).map(_.split(","))
val docs = lines.map { doc =>
val docx = SolrSupport.autoMapToSolrInputDoc(doc(0), doc, null)
docx.setField("field_1", doc(1))
docx.setField("field_2", doc(2))
docx
}
SolrSupport.indexDStreamOfDocs(zkQuorum, solrCollection, 10, docs);
ssc.start()
ssc.awaitTermination()
}
}