Using the Kafka 0.9 API, you can configure a Spark application to produce MapR
Streams messages.
- Add the following dependency:
groupId = org.apache.spark
artifactId = spark-streaming-kafka-producer_2.11
version = <spark_version>-mapr-<mapr_eco_version>
Note: If you want to use the Streaming Producer Examples, you must add the appropriate Spark Streaming Kafka producer JAR from the MapR Maven repository to the Spark classpath (/opt/mapr/spark/spark-<spark_version>/jars/).
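For example, in an sbt build the dependency might be declared as follows. This is a sketch, not the definitive configuration: the repository URL is an assumption, and the version placeholders must be replaced with the Spark and MapR ecosystem versions for your environment.
// Hypothetical sbt sketch; substitute real values for the placeholders.
resolvers += "mapr-releases" at "http://repository.mapr.com/maven/"
libraryDependencies += "org.apache.spark" %
  "spark-streaming-kafka-producer_2.11" % "<spark_version>-mapr-<mapr_eco_version>"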
- When you write the Spark program, import and use classes from org.apache.spark.streaming.kafka.producer._ and org.apache.spark.streaming.dstream._. The import of org.apache.spark.streaming.kafka.producer._ adds the following method to DStream:
sendToKafka(topic: String, conf: ProducerConf)
In the code below, calling sendToKafka sends numMessages messages to the set of topics specified by the topics parameter.
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{ConstantInputDStream, DStream}
import org.apache.spark.streaming.kafka.producer._
// Item stands in for the message payload; here it is assumed to be a simple case class.
case class Item(id: Int, value: Int)
// Configure the producer with the broker list and explicit key/value serializers.
val producerConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
  .withKeySerializer("org.apache.kafka.common.serialization.ByteArraySerializer")
  .withValueSerializer("org.apache.kafka.common.serialization.StringSerializer")
// Build a constant stream of numMessages messages and send each batch to the topics.
val items = (0 until numMessages.toInt).map(i => Item(i, i).toString)
val defaultRDD: RDD[String] = ssc.sparkContext.parallelize(items)
val dStream: DStream[String] = new ConstantInputDStream[String](ssc, defaultRDD)
dStream.foreachRDD(_.sendToKafka(topics, producerConf))
dStream.count().print()
The org.apache.kafka.common.serialization.ByteArraySerializer and org.apache.kafka.common.serialization.StringSerializer properties are used by default, so unless you want to use other serializers, the withKeySerializer and withValueSerializer calls are not necessary.
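For example, a minimal sketch that relies on the default serializers, assuming the same kafkaBrokers, topics, and dStream values as in the example above:
// With no serializer overrides, the default key/value serializers are used.
val defaultConf = new ProducerConf(bootstrapServers = kafkaBrokers.split(",").toList)
dStream.foreachRDD(_.sendToKafka(topics, defaultConf))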