This topic includes basic information about how to develop an HPE Ezmeral Data Fabric Event Store Python application, along with example programs that you can run.
The following example producer (using the confluent_kafka client) publishes messages to a topic named mytopic in a stream named /my_stream.
# Example producer: publish three messages to topic 'mytopic' in the
# stream '/my_stream' using the confluent_kafka client.
from confluent_kafka import Producer

# 'streams.producer.default.stream' makes topic names without an explicit
# stream path resolve against /my_stream.
p = Producer({'streams.producer.default.stream': '/my_stream'})

some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    # Messages are produced asynchronously; payloads must be bytes.
    p.produce('mytopic', data.encode('utf-8'))

# Block until all buffered messages have been delivered.
p.flush()
The following equivalent producer (using the legacy mapr_streams_python client) publishes messages to a topic named mytopic in a stream named /my_stream.
# Example producer: publish three messages to topic 'mytopic' in the
# stream '/my_stream' using the legacy mapr_streams_python client.
from mapr_streams_python import Producer

# 'streams.producer.default.stream' makes topic names without an explicit
# stream path resolve against /my_stream.
p = Producer({'streams.producer.default.stream': '/my_stream'})

some_data_source = ["msg1", "msg2", "msg3"]
for data in some_data_source:
    # Messages are produced asynchronously; payloads must be bytes.
    p.produce('mytopic', data.encode('utf-8'))

# Block until all buffered messages have been delivered.
p.flush()
The following example consumer (using the confluent_kafka client) reads messages from /my_stream:mytopic and prints the content of each message that it reads.
# Example consumer: read messages from topic 'mytopic' in the stream
# '/my_stream' and print each message payload, using confluent_kafka.
from confluent_kafka import Consumer, KafkaError

# Start reading from the earliest available offset when the consumer
# group 'mygroup' has no committed position.
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])

running = True
while running:
    # Poll with a 1-second timeout; returns None when no message arrives.
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # _PARTITION_EOF just means we caught up; any other error is
        # fatal for this example, so report it and stop consuming.
        print(msg.error())
        running = False

c.close()
The following equivalent consumer (using the legacy mapr_streams_python client) reads messages from /my_stream:mytopic and prints the content of each message that it reads.
# Example consumer: read messages from topic 'mytopic' in the stream
# '/my_stream' and print each message payload, using the legacy
# mapr_streams_python client.
from mapr_streams_python import Consumer, KafkaError

# Start reading from the earliest available offset when the consumer
# group 'mygroup' has no committed position.
c = Consumer({'group.id': 'mygroup',
              'default.topic.config': {'auto.offset.reset': 'earliest'}})
c.subscribe(['/my_stream:mytopic'])

running = True
while running:
    # Poll with a 1-second timeout; returns None when no message arrives.
    msg = c.poll(timeout=1.0)
    if msg is None:
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        # _PARTITION_EOF just means we caught up; any other error is
        # fatal for this example, so report it and stop consuming.
        print(msg.error())
        running = False

c.close()
To run the examples against the stream /my_stream, start the producer and the consumer (in separate terminals):

$ python producer.py
$ python consumer.py