Describes general tasks for developing C#/.NET applications.
Confirm that your environment meets the following requirements:
In the following example code, three messages are produced to a topic named mytopic in a stream named my_stream.
class Producer
{
public static async void Produce()
{
string stream = "/my_stream";
string topicName = "mytopic";
var config = new Dictionary<string, object> { { "streams.producer.default.stream", stream } };
var messages = new string[] { "Msg1", "Msg2", "Msg3" };
using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
{
foreach (var msg in messages)
{
var deliveryReport = await producer.ProduceAsync(topicName, null, msg);
Console.WriteLine($"Delivery report:{deliveryReport.TopicPartitionOffset}");
}
producer.Flush(TimeSpan.FromSeconds(1));
}
}
}
In following example code, the HPE Ezmeral Data Fabric Event Store consumer
is subscribed to my_stream/mytopic and it prints the content of each
message that it reads.
using Confluent.Kafka;
using Confluent.Kafka.Serialization
class Consumer
{
public static void Consume()
{
var stream = "/mystream";
var topic = "mytopic";
var config = new Dictionary<string, object>
{
{ "group.id", "simple-csharp-consumer" },
{ "streams.consumer.default.stream", stream }
};
bool running = true;
using (var consumer = new Consumer<Ignore, string>(config, null, new StringDeserializer(Encoding.UTF8)))
{
var l = new List<TopicPartitionOffset> { new TopicPartitionOffset(topic, 0, 0) };
consumer.Assign(l);
// Raised on critical errors, e.g. connection failures.
consumer.OnError += (_, error) =>
{
Console.WriteLine($"Error: {error}");
running = false;
};
// Raised on deserialization errors or when a consumed message has an error != NoError.
consumer.OnConsumeError += (_, error) =>
{
Console.WriteLine($"Consume error: {error}");
running = false;
};
while (running)
{
Message<Ignore, string> msg;
if (consumer.Consume(out msg, TimeSpan.FromSeconds(10)))
{
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
}
}
}
}
}
To run the sample producer and consumer applications:
class Demo
{
public static void Main(string[] args)
{
Producer.Produce();
Consumer.Consume();
}
}
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="15.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{99EDBA4B-D7DA-48BB-8D0C-AF4B12387935}</ProjectGuid>
<OutputType>Exe</OutputType>
<RuntimeIdentifiers>win10-x64</RuntimeIdentifiers>
<RootNamespace>app</RootNamespace>
<AssemblyName>app</AssemblyName>
<TargetFrameworkVersion>v4.6.1</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>bin\Debug\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Compile Include="app.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="mapr-streams-dotnet" Version="0.11.3" />
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
</Project>dotnet run