Skip to content

An opinionated approach to use Apache Kafka in Dotnet

License

Notifications You must be signed in to change notification settings

haamond/Alamut.Kafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

68 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Alamut.Kafka

The purpose of this library is to easy to use Apache.Kafka in Dotnet (Console, ASP.NET, Web API) application on top of Dotnet Core Dependency Injection infrastructure.
Actually it's a wrapper of Confluent's Apache Kafka .NET client

Documentation is based on v0.8.1


Installing Alamut.Kafka

You should install Alamut.Kafka with NuGet:

Install-Package Alamut.Kafka

Or via the .NET Core command-line interface:

dotnet add package Alamut.Kafka

Either commands, from Package Manager Console or .NET Core CLI, will download and install all required dependencies.


Basic Configuration

These simple basic settings are needed to communicate with Kafka
*This configuration will be used for both ProducerConfig and ConsumerConfig *

"KafkaConfig": {
    "BootstrapServers": "10.104.51.12:9092,10.104.51.13:9092,10.104.51.14:9092",
    "GroupId": "alamut.group",
    // All Producer and Consumer configuration
  }

You have to inject configuration into your DI :

// for Consumer configuration
services.AddPoco<ConsumerConfig>(Configuration, "KafkaConfig");
// for Producer configuration
services.AddPoco<ProducerConfig>(Configuration, "KafkaConfig");

AddPoco is an Alamut Helper

comprehensive samples


Producer

Producer publish a message into specified topic. We ususally use IPublisher as a producer in our application. The publisher could publish a message in a variety types of data structure:

  • String (use native Kafka Client's serializer)
  • Object (serialize it to JSON)
  • IMessage

Producer Sample

IPublisher publisher = new KafkaProducer(*/dependencies provided by DI*/);

// string message
await publisher.Publish("alamut.messaging.kafka", "a string message");

// object message
var objectMessage = new Foo
{
    Bar = message
};
await publisher.Publish("alamut.messaging.kafka", objectMessage);

// IMessage message
var typedMessage = new Foo
{
    Bar = message
};
await publisher.Publish("alamut.messaging.kafka", MessageFactory.Build(typedMessage));

we will talk about the MessageFactory in more details latter.

Register Producer
If you want to get IPublisher through DI you should register it in project Startup:

services.AddSingleton<IPublisher, KafkaProducer>();

Consumer

Consumer subscribes to the specified topic(s) and works as a Background Hosted Service.

Consumer automatically calls the classes that implemented IMessageHandler interface that decorated with TopicsAttribute.

Message Handler Sample:

using System.Threading;
using System.Threading.Tasks;

using Alamut.Abstractions.Messaging;
using Alamut.Kafka.Models;
using Microsoft.Extensions.Logging;

namespace Alamut.Kafka.Consumer.Subscribers
{
    [Topics("alamut.messaging.kafka")]
    public class SendSmsGeneric : IMessageHandler<Message<Foo>>
    {
        private readonly ILogger _logger;

        public SendSmsGeneric(ILogger<SendSmsGeneric> logger)
        {
            _logger = logger;

        }
        public Task Handle(Message<Foo> message, CancellationToken token)
        {
            _logger.LogInformation($"Received message <{ message.Body.Bar }>");

            return Task.CompletedTask;
        }
    }
}

In the example above SendSmsGeneric handles FooMessage that published in alamut.messaging.kafka Topic. (other Message handlers have not yet documented)

Consumer Registration and Wiring

  • First of all, you need a simple configuration that described above.
  • Register Message Handlers:
    services.RegisterMessageHandlers(typeof(SendSmsGeneric).Assembly);
    registers all classes that implemented IMessageHandler in the specified assembly.
  • Then you have to register your Hosted Service to subscribe to Kafka Messages, There are two ways:
    • Register Hosted Service with default GroupId and specified Topics that discovered in RegisterMessageHandlers section:
      services.AddHostedSubscriber();
    • Register Hosted Service for specifics Topic(s):
      • services.AddNewHostedSubscriber("alamut.messaging.kafka", ... );
        in this case, a Hosted Services registered and handles just provided Topic(s).
      • services.AddNewHostedSubscriber(KafkaHelper.GetAllTopics(typeof(SendSmsGeneric).Assembly));
        a Hosted Service registered and handles topics provide by KafkaHelper.GetAllTopics

With these two steps your wiring will be completed. (example)
There are other ways to do this that will be explained later

Packages

No packages published

Languages