This topic includes basic information about how to develop an HPE Ezmeral Data Fabric Event Store C application. Sample applications are provided.
Confirm that your environment meets the following requirements:
The following sections describe how to create a producer and consumer in C, compile the source code, generate executables, and run the applications.
A producer application uses the following APIs:
rd_kafka_conf_new() to create the producer configuration.
rd_kafka_new() to create the producer handle.
rd_kafka_topic_conf_new() to create the topic configuration.
rd_kafka_topic_new() to create a topic handle for the producer.
rd_kafka_produce() to produce messages.
rd_kafka_poll() to poll for callbacks. This is useful to see if there are messages that have yet to be sent to the server.
rd_kafka_topic_destroy() to destroy the topic handle.
rd_kafka_destroy() to destroy the producer handle.
For example, the following source code produces 5 messages to topic /MapR_Streams:MapR-Topic1:
/*
* This file contains the producer function.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <rdkafka.h>
#include <errno.h>
/* msgDeliveryCB: Is the delivery callback.
* The delivery report callback will be called once for each message
* accepted by rd_kafka_produce() with err set to indicate
* the result of the produce request. An application must call rd_kafka_poll()
* at regular intervals to serve queued delivery report callbacks.
*/
static void msgDeliveryCB (rd_kafka_t *rk,
const rd_kafka_message_t *rkmessage, void *opaque) {
if (rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("FAILURE: Message not delivered to partition.\n");
printf("ERROR: %s\n", rd_kafka_err2str(rkmessage->err));
} else {
printf("Produced: %.*s\n",(int)rkmessage->len, (const char*)rkmessage->payload);
}
}
/*
* Method : int producer(int nummsgs_p, const char *fullTopicName)
* Description : This is a simple producer method. In this method the producer
* produces messages to a topic.
*/
int producer(int nummsgs_p, const char *fullTopicName) {
printf("************************ PRODUCER ************************\n");
rd_kafka_t *prodHndle;
rd_kafka_conf_t *prodCfg;
char errstr[1000];
int totalMsgs = nummsgs_p;
printf("Create producer configuration object\n");
/*
* rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
* be passed at the time of producer object creation using rd_kafka_new call.
*/
prodCfg = rd_kafka_conf_new();
if (prodCfg == NULL) {
printf("Failed to create conf\n");
return (EXIT_FAILURE);
}
/* rd_kafka_conf_set_dr_msg_cb(): This API sets the producer callback
* 'msgDeliveryCB' in producer config 'prodCfg'
* The delivery report callback will be called once for each message
* accepted by rd_kafka_produce() with err set to indicate
* the result of the produce request. An application must call rd_kafka_poll()
* at regular intervals to serve queued delivery report callbacks.
*/
rd_kafka_conf_set_dr_msg_cb(prodCfg, msgDeliveryCB);
printf("Create Producer Kafka handle\n");
/*
* rd_kafka_new():Creates a new Kafka handle and starts its operation
* according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
* prodCfg object passed here is freed by this function and must not be used
* or destroyed by the application subsequently. errstr must be a pointer to
* memory of at least size errstr_size where
* `rd_kafka_new()` may write a human readable error message in case the
* creation of a new handle fails. In which case the function returns NULL.
*/
prodHndle = rd_kafka_new(RD_KAFKA_PRODUCER, prodCfg, errstr, sizeof(errstr));
if (prodHndle == NULL) {
printf("Failed to create producer: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* Following code does following:
* 1. Create a topic handle for each producer-topic combination
* 2. Produce 'totalMsgs' # of messages using topic handle created in step 1
* 3. Wait for all messages to be produced and callback to be delivered.
* 4. Move on to next topic and repeat.
*/
int totalTopics = 1;
for (int nTopics = 0; nTopics < totalTopics ; nTopics++) {
printf("Create topic handle\n");
rd_kafka_topic_conf_t *prodTopicCfg;
/*
* rd_kafka_topic_conf_new(): This API Creates topic conf object
* required to create topic handle which then will be used for each
* producer-topic combination
*/
prodTopicCfg = rd_kafka_topic_conf_new();
if (prodTopicCfg == NULL) {
printf("Failed to create new topic conf\n");
return (EXIT_FAILURE);
}
rd_kafka_topic_t *prodTopicHndl;
/*
* rd_kafka_topic_new(): This API Creates topic handle for a given
* producer, topic name and topic config. Topic handles are refcounted
* internally and calling rd_kafka_topic_new()
* again with the same topic name will return the previous topic handle
* without updating the original handle's configuration.
* Applications must eventually call rd_kafka_topic_destroy() for each
* successful call to rd_kafka_topic_new() to clear up resources.
*/
prodTopicHndl = rd_kafka_topic_new(prodHndle, fullTopicName, prodTopicCfg);
if (prodTopicHndl == NULL) {
printf("Failed to create new topic handle\n");
return (EXIT_FAILURE);
}
prodTopicCfg = NULL; /* Now owned by topic */
const char* key ="Key";
printf("Send/Produce message to topic: %s\n", fullTopicName);
for (int i = 0; i < totalMsgs; i++) {
char payload[1000];
if (i == 0)
sprintf(payload, "%s", "Welcome to MapR Streams CAPI");
else
sprintf(payload, "MapR Streams CAPI Message Payload %d", i);
/*
* rd_kafka_produce(): This API produces a single message
* to the cluster. prodTopicHndl must be created using the
* rd_kafka_topic_new() API. This is an asynchronous, non-blocking API.
* RD_KAFKA_PARTITION_UA indicates automatic partitioning using the
* topic's partitioner; alternatively, a fixed partition
* can be provided. The RD_KAFKA_MSG_F_COPY flag indicates that the
* library copies the payload and the application manages its own
* payload memory. If the API fails to send, errno is set
* accordingly and the librdkafka-specific error is available
* through the rd_kafka_last_error() API.
*/
if (rd_kafka_produce(prodTopicHndl,
RD_KAFKA_PARTITION_UA,
RD_KAFKA_MSG_F_COPY,
payload,
strlen(payload),
key,
strlen(key),
NULL) == -1) {
int errNum = errno;
printf("Failed to produce to topic : %s\n", rd_kafka_topic_name(prodTopicHndl));
printf("Error Number: %d ERROR NAME: %s\n"
,errNum, rd_kafka_err2str(rd_kafka_last_error()));
return (errNum);
}
}
printf("Wait for messages to be delivered\n");
/*
* rd_kafka_outq_len(): This API returns the current out queue length.
* The out queue contains messages waiting to be sent to, or acknowledged
* by, the server.
* An application should wait for this queue to reach zero before
* terminating to make sure outstanding requests are fully processed.
*
* rd_kafka_poll(): This API polls the producer handle for events,
* which will cause application provided callbacks to be called.
* An application must call rd_kafka_poll() at regular intervals to
* serve queued delivery report callbacks. In this case
* 'msgDeliveryCB' will get called.
*/
while (rd_kafka_outq_len(prodHndle) > 0)
rd_kafka_poll(prodHndle, 100);
printf("\nDestroy topic handle\n");
/*
* Applications must eventually call rd_kafka_topic_destroy() for each
* successful call to rd_kafka_topic_new() to clear up resources.
*/
rd_kafka_topic_destroy(prodTopicHndl);
}
printf("Destroy producer handle\n");
/*
* rd_kafka_destroy(): This API destroys the producer handle created using
* rd_kafka_new call and frees resources.
*/
rd_kafka_destroy(prodHndle);
return(EXIT_SUCCESS);
}
/* MAIN */
int main(int argc, char *argv[]) {
/* Number of messages the producer will produce */
int nummsgs_p = 5;
/* This is pre created Stream with one topic and one partition*/
const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
int ret_val;
/* Produce Messages */
ret_val = producer(nummsgs_p, fullTopicName);
if (EXIT_SUCCESS != ret_val) {
printf("\nFAIL: producer failed\n");
} else {
printf("\nPASS: %d messages produced and sent to topic partition %s \n", nummsgs_p, fullTopicName);
}
}
A consumer application uses the following APIs:
rd_kafka_conf_new() to create the consumer configuration.
rd_kafka_conf_set() to set the configuration parameters. For this API, you must set "group.id".
rd_kafka_new() to create the consumer handle.
rd_kafka_subscribe() or rd_kafka_assign() to specify which topics to consume.
rd_kafka_consumer_poll() to poll for messages that are ready to be consumed.
rd_kafka_consumer_close() to perform auto commits and prepare to destroy the consumer handle.
rd_kafka_destroy() to destroy the consumer handle.
For example, the following source code consumes 5 messages from topic /MapR_Streams:MapR-Topic1:
/*
* This file contains the consumer function.
*
*/
#include <stdio.h>
#include <stdlib.h>
#include <rdkafka.h>
#include <string.h>
/*
* Method : int consumer(int expected_nummsgs, const char *fullTopicName)
* Description : This is a simple consumer method. In this method the consumer
* consumes messages from a topic.
*/
int consumer(int expected_nummsgs, const char *fullTopicName) {
printf("********* CONSUMER START *********\n");
rd_kafka_t *consHndle;
rd_kafka_conf_t *consCfg;
rd_kafka_topic_conf_t *consTopicCfg;
char errstr[1000];
rd_kafka_resp_err_t errCode;
printf("Create new consumer configuration object\n");
/*
* rd_kafka_conf_new(): This API creates default rd_kafka_conf_t object to
* be passed at the time of consumer object creation using rd_kafka_new call.
*/
consCfg = rd_kafka_conf_new();
if(consCfg == NULL) {
printf("Failed to create consumer conf\n");
return(EXIT_FAILURE);
}
/*
* rd_kafka_conf_set(): This API is used to set config parameters in the
* rd_kafka_conf_t object. group.id Must be set for all the consumers.
* All changes to the consCfg must be done before creating consumer object.
*/
if(RD_KAFKA_CONF_OK != rd_kafka_conf_set(consCfg,
"group.id", "consumerGroup",
errstr, sizeof(errstr))) {
printf("rd_kafka_conf_set() failed with error: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* rd_kafka_topic_conf_new(): This API Creates topic conf object
* required to set the default topic configuration.
*/
printf("Set topic configurations\n");
consTopicCfg = rd_kafka_topic_conf_new();
/* rd_kafka_topic_conf_set(): This API sets a topic config property by name.
* consTopicCfg must have been created with `rd_kafka_topic_conf_new()`.
* The property set in this call is 'auto.offset.reset'. When set to
* 'earliest', rd_kafka_consumer_poll() returns messages from the beginning
* of the topic (for a first-time consumer) or from the last committed
* offset (for a consumer that has committed offsets). When set to 'latest',
* it returns the messages produced after the consumer has started (for a
* first-time consumer) or from the last committed offset.
*/
if (RD_KAFKA_CONF_OK != rd_kafka_topic_conf_set(consTopicCfg, "auto.offset.reset",
"earliest" ,errstr, sizeof(errstr))) {
printf("rd_kafka_topic_conf_set() failed with error: %s\n", errstr);
return (EXIT_FAILURE);
}
/*
* rd_kafka_conf_set_default_topic_conf(): This API sets the default topic
* configuration to use for automatically subscribed topics
* The topic config object is not usable after this call.
*/
rd_kafka_conf_set_default_topic_conf(consCfg, consTopicCfg);
printf("Create consumer Kafka handle\n");
/*
* rd_kafka_new():Creates a new Kafka handle and starts its operation
* according to the specified type (RD_KAFKA_CONSUMER or RD_KAFKA_PRODUCER).
* consCfg object passed here is freed by this function and must not be used
* or destroyed by the application subsequently. errstr must be a pointer to
* memory of at least size errstr_size where
* `rd_kafka_new()` may write a human readable error message in case the
* creation of a new handle fails. In which case the function returns NULL.
*/
consHndle = rd_kafka_new(RD_KAFKA_CONSUMER, consCfg, errstr, sizeof(errstr));
if(consHndle == NULL) {
printf("Failed to create consumer: %s\n", errstr);
return (EXIT_FAILURE);
}
/* rd_kafka_poll_set_consumer() is used to redirect the main queue which is
* serviced using rd_kafka_poll() to the rd_kafka_consumer_poll(). With one api
* 'rd_kafka_consumer_poll()' both callbacks and message are serviced.
* Once queue is forwarded using this API, it is not permitted to call
* rd_kafka_poll to service non message delivery callbacks.
*/
rd_kafka_poll_set_consumer(consHndle);
/* Topic partition list (tp_list) is supplied as an input to the consumer
* subscribe call (rd_kafka_subscribe()). rd_kafka_subscribe() expects the
* partition argument to be set to RD_KAFKA_PARTITION_UA; internally,
* all partitions are assigned to the consumer.
* Note: partition balancing/assignment is done if more consumers are part
* of the same consumer group.
*/
printf("Create topic partition list for topic: %s\n", fullTopicName);
rd_kafka_topic_partition_list_t *tp_list = rd_kafka_topic_partition_list_new(0);
rd_kafka_topic_partition_t* tpObj = rd_kafka_topic_partition_list_add(tp_list,
fullTopicName, RD_KAFKA_PARTITION_UA);
if (NULL == tpObj) {
printf("Could not add the topic partition to the list.\n");
return (EXIT_FAILURE);
}
printf("Subscribe consumer to the topic:\n");
/*
* rd_kafka_subscribe(): This API subscribes given consumer to the topic list
* provided in tp_list, depending upon number of consumers in a consumer group
* partitions will be balanced and assigned to each consumer.
*/
errCode = rd_kafka_subscribe(consHndle, tp_list);
if (errCode != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("Topic partition subscription failed. ERROR: %d\n", errCode);
return(errCode);
}
printf("Destroy topic partition list:\n");
/*
* rd_kafka_topic_partition_list_destroy(): This API is used to free all
* resources used by the list and the list itself.
*/
rd_kafka_topic_partition_list_destroy(tp_list);
printf("\nStart message consumption:\n");
int msg_count = 0;
while(1) {
/*
* rd_kafka_consumer_poll(): This API returns one message or callback at
* a time. An application should make sure to call consumer_poll() at regular
* intervals, even if no messages are expected, to serve any
* queued callbacks waiting to be called. When the application is finished
* with a message it must call rd_kafka_message_destroy() to destroy the
* message.
*/
rd_kafka_message_t *msg = rd_kafka_consumer_poll(consHndle, 1000);
if (msg != NULL) {
if (msg->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
msg_count++;
printf("%d Consumed: %.*s\n", msg_count,(int) msg->len,
(const char*)msg->payload);
if (msg_count == expected_nummsgs){
rd_kafka_message_destroy(msg);
break;
}
}
rd_kafka_message_destroy(msg);
}
}
printf("\nCommit the offsets before closing the consumer\n");
/*
* Commit offsets on broker for the provided list of topic partitions.
* when input is NULL the current partition assignment will be used instead.
* If async is false this operation will block until the offset commit
* is done, returning the resulting success or error code.
* This call is made to be sure that offsets are committed before closing
* consumer.
*/
/* Pass 0 rather than 'false': <stdbool.h> is not included in this file. */
int retVal = rd_kafka_commit(consHndle, NULL, 0 /* async: 0 = block until done */);
if(retVal != RD_KAFKA_RESP_ERR_NO_ERROR) {
printf("rd_kafka_commit() failed\n");
return(EXIT_FAILURE);
}
printf("\nClose and destroy consumer handle\n");
/*
* Consumer shutdown sequence:
* 1. rd_kafka_consumer_close(): This is blocking call. It makes sure to revoke
* assignments, commit offsets, leave consumer group.
* The application still needs to call rd_kafka_destroy() after
* this call finishes to clean up the underlying handle resources.
* 2. rd_kafka_destroy(): This API destroys the consumer handle created using
* rd_kafka_new call and frees resources
*/
rd_kafka_consumer_close(consHndle);
rd_kafka_destroy(consHndle);
return(EXIT_SUCCESS);
}
/* MAIN */
int main(int argc, char *argv[]) {
/* Number of expected messages for the consumer */
int expected_nummsgs = 5;
/* This is pre created Stream with one topic and one partition*/
const char* fullTopicName = "/MapR_Streams:MapR-Topic1";
int ret_val;
/* Consume Messages */
ret_val = consumer(expected_nummsgs, fullTopicName);
if (EXIT_SUCCESS != ret_val) {
printf("\nFAIL: consumer failed\n");
} else {
printf("\nPASS: %d messages consumed from topic %s\n", expected_nummsgs, fullTopicName);
}
}
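The consumer loop above runs until expected_nummsgs messages arrive, so it does not return if the topic holds fewer messages. One possible way to bound the wait, sketched here under assumptions, is to count consecutive polls that return no message and stop after a limit. The names idle_tracker_t, idle_tracker_init, and idle_tracker_should_stop are hypothetical helpers for illustration; they are not part of librdkafka.

```c
#include <stdbool.h>

/* Hypothetical helper (not part of librdkafka): counts consecutive
 * polls that returned no message. */
typedef struct {
    int idle_polls;      /* consecutive polls without a message */
    int max_idle_polls;  /* stop once this many idle polls occur in a row */
} idle_tracker_t;

static void idle_tracker_init(idle_tracker_t *t, int max_idle_polls) {
    t->idle_polls = 0;
    t->max_idle_polls = max_idle_polls;
}

/* Record the outcome of one poll. Returns true when the caller should
 * stop polling because the idle limit was reached. */
static bool idle_tracker_should_stop(idle_tracker_t *t, bool got_message) {
    if (got_message) {
        t->idle_polls = 0;   /* any message resets the idle streak */
        return false;
    }
    t->idle_polls++;
    return t->idle_polls >= t->max_idle_polls;
}
```

In the loop above, this could be called as idle_tracker_should_stop(&tracker, msg != NULL) after each rd_kafka_consumer_poll(consHndle, 1000). With a limit of 10 and the 1000 ms poll timeout, the consumer would give up after roughly 10 seconds without messages.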
The following steps compile the source code and generate executables in the same directory
as the Makefile. For example, in the librdkafka_example directory, the
consumer and producer executables are generated from the
producer.c and consumer.c source files.
CC= g++
HEADERDIR=/opt/mapr/include/librdkafka/
CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
export LD_LIBRARY_PATH=/opt/mapr/lib
LIBDIR= /opt/mapr/lib/
%.o: %.c
	gcc $(CCFLAGS) -c $<
consumer: consumer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
producer: producer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
all: consumer producer
clean:
	/bin/rm -f *.o consumer producer
The following variant of the Makefile also adds the JVM library directory under JAVA_HOME to LD_LIBRARY_PATH:
CC= g++
HEADERDIR=/opt/mapr/include/librdkafka/
CCFLAGS= -Wall -I$(HEADERDIR) -g -std=c99
#Edit JAVA_HOME to be appropriate for your environment
JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64/
export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
LIBDIR= /opt/mapr/lib/
%.o: %.c
	gcc $(CCFLAGS) -c $<
consumer: consumer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
producer: producer.o
	gcc -o $@ $@.o -lrdkafka -L$(LIBDIR) $(CCFLAGS)
all: consumer producer
clean:
	/bin/rm -f *.o consumer producer
On a Mac, the library search path variable is DYLD_LIBRARY_PATH instead of LD_LIBRARY_PATH. In the first Makefile, find this line:
export LD_LIBRARY_PATH=/opt/mapr/lib
Then, replace this line with the following line of code:
export DYLD_LIBRARY_PATH=/opt/mapr/lib
In the Makefile that sets JAVA_HOME, find this line:
export LD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/amd64/server
Then, replace this line with the following line of code:
export DYLD_LIBRARY_PATH=/opt/mapr/lib:$(JAVA_HOME)/jre/lib/server
Run find / -name libjvm* to determine the JAVA_HOME directory on your machine. However, note that the results of this command include the full path to the libjvm file, not just the JAVA_HOME directory. For example, JAVA_HOME may be set to /Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/ on a Mac and JAVA_HOME may be set to /usr/lib/jvm/java-1.7.0-openjdk-1.7.0.79.x86_64/ on Linux.
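Because the find command reports the full path to the libjvm file, the JAVA_HOME prefix still has to be cut out of the result. As a rough illustration only, the following sketch does that for the <JAVA_HOME>/jre/lib/... layout that the Makefile assumes. The function name java_home_from_libjvm is hypothetical, and other JDK layouts would need a different marker.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: given the full path to a libjvm file, copies the
 * JAVA_HOME prefix (everything before "/jre/lib/", plus a trailing '/')
 * into out. Assumes the <JAVA_HOME>/jre/lib/... layout used by the
 * Makefile. Returns 0 on success, -1 if the layout does not match or
 * out is too small. */
static int java_home_from_libjvm(const char *libjvm_path,
                                 char *out, size_t out_size) {
    const char *marker = strstr(libjvm_path, "/jre/lib/");
    if (marker == NULL)
        return -1;
    size_t len = (size_t)(marker - libjvm_path) + 1; /* keep the '/' */
    if (len >= out_size)
        return -1;
    memcpy(out, libjvm_path, len);
    out[len] = '\0';
    return 0;
}
```

For a path that ends in /jre/lib/amd64/server/libjvm.so, the helper keeps everything up to and including the slash before jre, which is the form of the JAVA_HOME values shown above.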
To compile, run the following commands:
make clean
make all
Once you have the application executables, complete the following steps to run the application:
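Create the stream:
maprcli stream create -path /MapR_Streams
You do not need to create the topic after running stream create; the producer will create the topic. By default, autocreate is enabled. For more information, see stream create.
If the librdkafka shared library is not on the library search path, running the application fails with the following error:
error while loading shared libraries: librdkafka.so.1: cannot open shared object file: No such file or directory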
Run the producer:
./producer
************************ PRODUCER ************************
Create producer configuration object
Create Producer Kafka handle
Create topic handle
Send/Produce message to topic: /MapR_Streams:MapR-Topic1
Wait for messages to be delivered
Produced: Welcome to MapR Streams CAPI
Produced: MapR Streams CAPI Message Payload 1
Produced: MapR Streams CAPI Message Payload 2
Produced: MapR Streams CAPI Message Payload 3
Produced: MapR Streams CAPI Message Payload 4
Destroy topic handle
Destroy producer handle
PASS: 5 messages produced and sent to topic partition /MapR_Streams:MapR-Topic1
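The payloads in the output above are built by the sample producer with sprintf() into a fixed 1000-byte buffer. A bounded variant might look like the following sketch, which uses snprintf() and reports truncation instead of writing past the buffer. The helper name format_payload is hypothetical and is not part of the sample or of librdkafka.

```c
#include <stdio.h>
#include <stddef.h>

/* Hypothetical helper: builds the same payload text as the sample
 * producer, but bounded by the buffer size.
 * Returns 0 on success, -1 if buf is too small or formatting failed. */
static int format_payload(char *buf, size_t size, int i) {
    int n;
    if (i == 0)
        n = snprintf(buf, size, "%s", "Welcome to MapR Streams CAPI");
    else
        n = snprintf(buf, size, "MapR Streams CAPI Message Payload %d", i);
    /* snprintf returns the length it needed to write; a value >= size
     * means the output was truncated. */
    if (n < 0 || (size_t)n >= size)
        return -1;
    return 0;
}
```

In the producer loop, the two sprintf() calls could be replaced by a single format_payload(payload, sizeof(payload), i) call, skipping rd_kafka_produce() when it returns -1.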
Run the consumer:
./consumer
********* CONSUMER START *********
Create new consumer configuration object
Set topic configurations
Create consumer Kafka handle
Create topic partition list for topic: /MapR_Streams:MapR-Topic1
Subscribe consumer to the topic:
Destroy topic partition list:
Start message consumption:
1 Consumed: Welcome to MapR Streams CAPI
2 Consumed: MapR Streams CAPI Message Payload 1
3 Consumed: MapR Streams CAPI Message Payload 2
4 Consumed: MapR Streams CAPI Message Payload 3
5 Consumed: MapR Streams CAPI Message Payload 4
Commit the offsets before closing the consumer
Close and destroy consumer handle
PASS: 5 messages consumed from topic /MapR_Streams:MapR-Topic1
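The full topic name used throughout, /MapR_Streams:MapR-Topic1, combines the stream path passed to maprcli stream create with the topic name, separated by a colon. As an illustrative sketch, the following helper splits such a name into its two parts. The function name split_full_topic is hypothetical, and splitting at the last colon is an assumption made for this example.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical helper: splits "/stream/path:topic" at the last ':'
 * (an assumption for this sketch) and copies the stream path and the
 * topic name into caller-provided buffers.
 * Returns 0 on success, -1 on malformed input or too-small buffers. */
static int split_full_topic(const char *full,
                            char *stream, size_t stream_size,
                            char *topic, size_t topic_size) {
    const char *colon = strrchr(full, ':');
    /* Reject a missing colon, an empty stream path, or an empty topic. */
    if (colon == NULL || colon == full || colon[1] == '\0')
        return -1;
    size_t stream_len = (size_t)(colon - full);
    size_t topic_len = strlen(colon + 1);
    if (stream_len >= stream_size || topic_len >= topic_size)
        return -1;
    memcpy(stream, full, stream_len);
    stream[stream_len] = '\0';
    memcpy(topic, colon + 1, topic_len + 1); /* includes the terminator */
    return 0;
}
```

Applied to the name used in the samples, the helper yields /MapR_Streams as the stream path and MapR-Topic1 as the topic.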