Using the Kafka 0.9 API, you can configure a Spark application to query HPE Ezmeral Data Fabric Event Store for new messages at a given interval. This information applies to Spark 2.2.1 and later.
Copy the Kafka client JAR into the Spark jars directory:

cp /opt/mapr/lib/kafka-clients-<version>.jar $SPARK_HOME/jars
If you build your application with a dependency manager such as Maven or sbt, use the following coordinates:

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-9_2.11
version = <spark_version>-mapr-<mapr_eco_version>
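In an sbt build, for example, the coordinates above could be declared as follows (the version string is the placeholder from above, to be filled in with your Spark and ecosystem versions):

libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-9_2.11" % "<spark_version>-mapr-<mapr_eco_version>"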
The Kafka 0.9 integration classes are in the org.apache.spark.streaming.kafka09 package; the JAR is installed in /opt/mapr/spark/spark-<spark_version>/jars/. The following code snippet imports the required classes and builds the Kafka parameter map:

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.streaming.kafka09.{ConsumerStrategies, KafkaUtils, LocationStrategies}

val kafkaParams = Map[String, String](
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG ->
    "org.apache.kafka.common.serialization.StringDeserializer",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> offsetReset)

Set the consumer polling timeout with spark.streaming.kafka.consumer.poll.ms in the Spark configuration. If you do not configure spark.streaming.kafka.consumer.poll.ms, the spark.network.timeout property is used. If spark.network.timeout is empty, the default is 120 seconds. In the following snippet, pollTimeout holds that value:

val sparkConf = new SparkConf()
  .setAppName("v09DirectKafkaWordCount")
  .set("spark.streaming.kafka.consumer.poll.ms", pollTimeout)
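The fallback order described above (spark.streaming.kafka.consumer.poll.ms first, then spark.network.timeout, then a 120-second default) can be sketched in plain Scala. Note that resolvePollTimeout is a hypothetical helper written for illustration, not a Spark API, and it simplifies by assuming poll.ms is given in milliseconds and network.timeout in seconds:

```scala
// Hypothetical sketch of the timeout fallback order; not part of Spark.
// Assumes poll.ms is in milliseconds and spark.network.timeout is in seconds.
def resolvePollTimeout(conf: Map[String, String]): Long =
  conf.get("spark.streaming.kafka.consumer.poll.ms")
    .map(_.toLong)                                               // explicit poll timeout wins
    .orElse(conf.get("spark.network.timeout").map(_.toLong * 1000L)) // fall back to network timeout
    .getOrElse(120L * 1000L)                                     // final default: 120 seconds

println(resolvePollTimeout(Map.empty)) // no properties set: the 120-second default applies
```

This mirrors why setting spark.streaming.kafka.consumer.poll.ms explicitly, as in the SparkConf snippet above, gives the most predictable polling behavior.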
The KafkaUtils.createDirectStream method creates an input stream to read HPE Ezmeral Data Fabric Event Store messages. The ConsumerStrategies.Subscribe method creates the consumerStrategy, which limits the set of topics the stream subscribes to; this set is derived from the topics parameter passed into the program. Using LocationStrategies.PreferConsistent distributes partitions evenly across the available executors.
val consumerStrategy = ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams)
val messages = KafkaUtils.createDirectStream[String, String](
ssc, LocationStrategies.PreferConsistent, consumerStrategy)
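The topicsSet passed to ConsumerStrategies.Subscribe is typically derived from a comma-separated topics argument given to the program. A minimal sketch, where toTopicsSet is a hypothetical helper (not a Spark or Kafka API):

```scala
// Hypothetical helper: turn a comma-separated topics argument into the Set
// expected by ConsumerStrategies.Subscribe. Trims whitespace and drops blanks.
def toTopicsSet(topics: String): Set[String] =
  topics.split(",").map(_.trim).filter(_.nonEmpty).toSet

// Event Store topic names commonly take the form /stream:topic.
println(toTopicsSet("/stream:topic-a, /stream:topic-b"))
```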