Describes the CDC open format.
The following shows the mapping between the MapR CDC data types and the JSON open format data types.
{
"map": {
"null": null,
"boolean" : true,
"string": "eureka",
"byte" : {"$numberByte": 127},
"short": {"$numberShort": 32767},
"int": {"$numberInt": 2147483647},
"long": {"$numberLong":9223372036854775807},
"float" : {"$numberFloat":3.4028235E38},
"double" : 1.7976931348623157e308,
"decimal": {"$decimal": "12345678901234567890189012345678901.23456789"},
"date": {"$dateDay": "yyyy-mm-dd"},
"time" : {"$time" : "HH:mm:ss[.sss]"},
"timestamp" : {"$date" : "yyyy-MM-ddTHH:mm:ss.SSSXXX"},
"interval" : {"$interval" : number_of_milliseconds},
"binary" : {"$binary" : "base64_encoded_binary_value"},
"array" : [42, "open sesame", 3.14, {"$dateDay": "2015-01-21"}]
}
}
The following example of a changed data record contains two (2) mutations.
{
"_id":"row1",
"$opType":"$RECORD_UPDATE",
"$opTime":1518654391801,
"$mutations":[
{"$fieldPath":"arrayB",
"$fieldOp":"$SET",
"$fieldValue":[{"$numberInt":100},false,"set a map"]
},
{"$fieldPath":"arrayC",
"$fieldOp":"$SET",
"$fieldValue":[{"$numberInt":200},false,"set a map"]
}
]
}
The following sample code initializes the consumer properties for the open format and consumes the changelog data from the topic.
/**
 * Builds the basic consumer properties for the open format.
 *
 * Both the record key and the record value are configured to be read with
 * Kafka's StringDeserializer.
 *
 * @return a new Properties object holding the key and value deserializer settings
 */
private Properties getOpenFormatListenerProperties() {
    // The same deserializer class is used for keys and values.
    final String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
    return consumerProps;
}
/**
 * Consumes from the changelog topic.
 *
 * Subscribes a consumer, configured through
 * {@code getOpenFormatListenerProperties()}, to the given topic, polls it
 * once, and reads the value of every returned record as a String.
 * The consumer is closed before the method returns.
 *
 * @param topic the changelog topic to subscribe to
 */
public void startConsume(String topic) {
    List<String> topicList = new ArrayList<>();
    topicList.add(topic);

    // try-with-resources closes the consumer even if subscribe/poll throws.
    try (KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(getOpenFormatListenerProperties())) {
        consumer.subscribe(topicList);

        // pollTimeout is defined outside this method.
        ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
        for (ConsumerRecord<String, String> record : consumerRecords) {
            // The changed data record, deserialized as a String; process it here.
            String cdcResult = record.value();
        }
    }
}