The ConsumerUncached.java application demonstrates how to connect to the MapR file system and consume the messages in a stream topic.
Before running this application, ensure that you have access to a cluster running the MapR file system. To build and run this application:
1. Set the classpath:
   export CLASSPATH=`hadoop classpath`
2. Compile the application to produce the ConsumerUncached.class file:
   javac -cp .:`mapr classpath` ConsumerUncached.java
3. Run the application:
   java -cp .:`mapr classpath` ConsumerUncached
This application requires the following imports:
org.apache.kafka.clients.consumer.ConsumerRecord
org.apache.kafka.clients.consumer.ConsumerRecords
org.apache.kafka.clients.consumer.KafkaConsumer
org.apache.hadoop.conf.Configuration
com.mapr.fs.MapRFileSystem
com.google.common.io.Resources
java.net.URI
java.io.IOException
java.io.InputStream
java.util.Iterator
java.util.Properties
java.util.Random
java.util.StringTokenizer
java.util.regex.Pattern
The application performs the actions described in the following sections.
The consumer properties are stored in the consumer.props file. This file must be present in the current directory or in the mapr classpath. For example, your consumer.props file could look similar to the following:
#bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# A fast session timeout makes it easier to experiment with failover.
# session.timeout.ms=10000
# These buffer sizes are needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way.
fetch.min.bytes=50000
# receive.buffer.bytes=262144 // fixed size buffer
max.partition.fetch.bytes=2097152
auto.offset.reset=earliest
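For reference, the same configuration can also be built programmatically instead of being loaded from a file. The following is a minimal sketch, assuming the same settings as the consumer.props file above:
// Hypothetical alternative: build the consumer configuration in code
// instead of loading consumer.props from the classpath.
Properties properties = new Properties();
properties.setProperty("group.id", "test");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("fetch.min.bytes", "50000");
properties.setProperty("max.partition.fetch.bytes", "2097152");
properties.setProperty("auto.offset.reset", "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);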
The application loads the consumer properties stored in the consumer.props file and uses them to create the consumer. If group.id is not set, the application generates a random group ID so that each run joins a new consumer group:
public static void main(String[] args) throws IOException, InterruptedException {
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
consumer = new KafkaConsumer<>(properties);
}
}
The application then initializes the MapR file system, passing true as the third parameter to fs.initialize() so that the audit logs generated by the operations for converting fid to file path and volid to volume name are sent to the ExpandAudit.json.log file used by the expandaudit utility, and not to the stream. It then subscribes to the audit log topics at the path /var/mapr/auditstream/auditlogstream:<clustername>, where <clustername> is the name of your cluster:
Configuration conf = new Configuration();
String uri = MAPRFS_URI;
uri = uri + "mapr/";
conf.set("fs.default.name", uri);
MapRFileSystem fs = new MapRFileSystem();
fs.initialize(URI.create(uri), conf, true);
Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
consumer.subscribe(pattern);
The application then polls the topic for unread messages in a loop, iterates through the returned records, and prints the processed value of each message to standard output:
boolean stop = false;
int pollTimeout = 1000;
while (!stop) {
ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
if (iterator.hasNext()) {
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
String value = record.value();
String rvalue = value.replace("\"","");
String recordValue = processRecord(fs, rvalue, value);
System.out.println((" Consumed Record: " + recordValue));
}
} else {
Thread.sleep(1000);
//stop = true;
}
}
The processRecord() method converts the fid in the message to the corresponding file path using the getMountPathFid() API, and the volid in the message to the volume name using the getVolumeName() API:
public static String processRecord(MapRFileSystem fs, String rvalue, String value)
{
    StringTokenizer st = new StringTokenizer(rvalue, ",");
    String lfidPath = "";
    String lvolName = "";
    // First pass: resolve fid and volumeId fields to a path and volume name.
    while (st.hasMoreTokens()) {
        String field = st.nextToken();
        StringTokenizer st1 = new StringTokenizer(field, ":");
        while (st1.hasMoreTokens()) {
            String token = st1.nextToken();
            if (token.endsWith("Fid")) {
                String lfidStr = st1.nextToken();
                String path = null;
                try {
                    path = fs.getMountPathFid(lfidStr);
                } catch (IOException e) { }
                lfidPath = "\"FidPath\":\"" + path + "\",";
            }
            if (token.endsWith("volumeId")) {
                String volid = st1.nextToken();
                String name = null;
                try {
                    int volumeId = Integer.parseInt(volid);
                    name = fs.getVolumeName(volumeId);
                } catch (IOException e) { }
                lvolName = "\"VolumeName\":\"" + name + "\",";
            }
        }
    }
    // Second pass: rebuild the record, inserting the resolved values.
    String result = "";
    StringTokenizer st2 = new StringTokenizer(value, ",");
    while (st2.hasMoreTokens()) {
        String tokens = st2.nextToken();
        result = result + tokens + ",";
        if (tokens.contains("Fid")) {
            result = result + lfidPath;
        }
        if (tokens.contains("volumeId")) {
            result = result + lvolName;
        }
    }
    return result.substring(0, result.length() - 1);
}
The complete code for the ConsumerUncached.java application follows:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.hadoop.conf.Configuration;
import com.mapr.fs.MapRFileSystem;
import com.google.common.io.Resources;
import java.net.URI;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Properties;
import java.util.Random;
import java.util.StringTokenizer;
import java.util.regex.Pattern;
public class ConsumerUncached {
// The base URI of the MapR file system.
private static final String MAPRFS_URI = "maprfs:///";
public static void main(String[] args) throws IOException, InterruptedException {
// Create the consumer.
KafkaConsumer<String, String> consumer;
try (InputStream props = Resources.getResource("consumer.props").openStream()) {
Properties properties = new Properties();
properties.load(props);
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000));
}
consumer = new KafkaConsumer<>(properties);
}
Configuration conf = new Configuration();
String uri = MAPRFS_URI;
uri = uri + "mapr/";
conf.set("fs.default.name", uri);
MapRFileSystem fs = new MapRFileSystem();
fs.initialize(URI.create(uri), conf, true);
//final String topic = "/var/mapr/auditstream/auditlogstream:<clustername>_atsqa4-130.qa.lab";
// Replace <clustername> with the name of your cluster.
Pattern pattern = Pattern.compile("/var/mapr/auditstream/auditlogstream:<clustername>.+");
// Subscribe to the topic.
consumer.subscribe(pattern);
boolean stop = false;
int pollTimeout = 1000;
while (!stop) {
// Request unread messages from the topic.
ConsumerRecords<String, String> consumerRecords = consumer.poll(pollTimeout);
Iterator<ConsumerRecord<String, String>> iterator = consumerRecords.iterator();
if (iterator.hasNext()) {
while (iterator.hasNext()) {
ConsumerRecord<String, String> record = iterator.next();
// Iterate through returned records, extract the value
// of each message, and print the value to standard output.
//System.out.println((" Consumed Record: " + record.toString()));
String value = record.value();
String rvalue = value.replace("\"","");
String recordValue = processRecord(fs, rvalue, value);
System.out.println((" Consumed Record: " + recordValue));
//System.out.println((" Consumed Record: " + value));
}
} else {
Thread.sleep(1000);
//stop = true;
}
}
consumer.close();
System.out.println("All done.");
}
public static String processRecord(MapRFileSystem fs, String rvalue, String value)
{
StringTokenizer st = new StringTokenizer(rvalue, ",");
String lfidPath = "";
String lvolName = "";
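// First pass: scan the quote-stripped record for field names ending in
// "Fid" or "volumeId" and resolve them to a file path and a volume name.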
while (st.hasMoreTokens())
{
String field = st.nextToken();
StringTokenizer st1 = new StringTokenizer(field, ":");
while (st1.hasMoreTokens())
{
String token = st1.nextToken();
if (token.endsWith("Fid")) {
String lfidStr = st1.nextToken();
String path= null;
try {
path = fs.getMountPathFid(lfidStr);
} catch (IOException e){
}
lfidPath = "\"FidPath\":\""+path+"\",";
// System.out.println("\nPAth for fid " + lfidStr + "is " + path);
}
if (token.endsWith("volumeId")) {
String volid = st1.nextToken();
String name= null;
try {
int volumeId = Integer.parseInt(volid);
name = fs.getVolumeName(volumeId);
}
catch (IOException e){
}
lvolName = "\"VolumeName\":\""+name+"\",";
// System.out.println("\nVolume Name for volid " + volid + "is " + name);
}
}
}
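// Second pass: rebuild the original record, inserting the resolved
// FidPath and VolumeName values immediately after the fields they expand.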
String result = "";
StringTokenizer st2 = new StringTokenizer(value,",");
while (st2.hasMoreTokens()) {
String tokens = st2.nextToken();
result = result + tokens + ",";
if (tokens.contains("Fid")) {
result = result + lfidPath;
}
if (tokens.contains("volumeId")) {
result = result + lvolName;
}
}
return result.substring(0, result.length() - 1);
}
}
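As an illustration of what processRecord() produces, consider a hypothetical audit record; the field values below are invented, and real records depend on your cluster and the audited operation. The method inserts a FidPath field after each field whose name contains Fid, and a VolumeName field after the volumeId field:
Input record:
{"timestamp":{"$date":"2016-05-11T10:11:23.086Z"},"operation":"CREATE","uid":0,"srcFid":"2049.51.131334","volumeId":68048396,"status":0}
Output record (hypothetical path and volume name):
{"timestamp":{"$date":"2016-05-11T10:11:23.086Z"},"operation":"CREATE","uid":0,"srcFid":"2049.51.131334","FidPath":"/testvol/testdir","volumeId":68048396,"VolumeName":"testvol","status":0}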