Skip to content

Commit

Permalink
Merge pull request #39 from Azure/kafka-support
Browse files Browse the repository at this point in the history
Added kafka-support related changes
  • Loading branch information
vaibhavatul47 authored Nov 19, 2024
2 parents ac8dbd5 + 43fbfcd commit de8e0d6
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- "${CONFIG_PATH}:/Eventhubs_Emulator/ConfigFiles/Config.json"
ports:
- "5672:5672"
- "9092:9092"
environment:
BLOB_SERVER: host.docker.internal:10005
METADATA_SERVER: host.docker.internal:10007
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- "${CONFIG_PATH}:/Eventhubs_Emulator/ConfigFiles/Config.json"
ports:
- "5672:5672"
- "9092:9092"
environment:
BLOB_SERVER: host.docker.internal:10005
METADATA_SERVER: host.docker.internal:10007
Expand Down
1 change: 1 addition & 0 deletions Docker-Compose-Template/docker-compose-default.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ services:
- "${CONFIG_PATH}:/Eventhubs_Emulator/ConfigFiles/Config.json"
ports:
- "5672:5672"
- "9092:9092"
environment:
BLOB_SERVER: azurite
METADATA_SERVER: azurite
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,34 @@ To get started, refer to our GitHub Samples [here](https://github.com/Azure/azur

>[!TIP]
> $Default [consumer group](https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features#consumer-groups) is created by default when emulator runs. You can't create $default consumer group with supplied configuration.
### Networking options
You can run and connect to Emulator in multiple ways. Use a `Connection String` from following as per your use-case:

- When the emulator container and interacting application are running natively on local machine:
```
"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
```

- Applications (Containerized/Non-containerized) on the different machine and same local network can interact with Emulator using the IPv4 address of the machine:
```
"Endpoint=sb://192.168.y.z;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
```

- Application containers on the same bridge network can interact with Emulator using its alias or IP. Following connection string assumes the name of Emulator has default value i.e."eventhubs-emulator":
```
Endpoint=sb://eventhubs-emulator;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
```

- Application containers on the different bridge network can interact with Emulator using the "host.docker.internal" as host:
```
"Endpoint=sb://host.docker.internal;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
```

> **Note**
If you are using the Kafka protocol, ensure that you update the `Bootstrap Servers` property with the appropriate host from the options above, based on your use case.


## Support

There is no official support provided for Emulator.Any issues/suggestions should be reported via GitHub issues on [installation repo](https://github.com/Azure/azure-event-hubs-emulator-installer/issues).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 17
VisualStudioVersion = 17.11.35327.3
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "EventHubs-Emulator-Kafka-Demo", "EventHubs-Emulator-Kafka-Demo\EventHubs-Emulator-Kafka-Demo.csproj", "{43E62434-CA3C-4FC1-8CED-FC11ECD65910}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{43E62434-CA3C-4FC1-8CED-FC11ECD65910}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{43E62434-CA3C-4FC1-8CED-FC11ECD65910}.Debug|Any CPU.Build.0 = Debug|Any CPU
{43E62434-CA3C-4FC1-8CED-FC11ECD65910}.Release|Any CPU.ActiveCfg = Release|Any CPU
{43E62434-CA3C-4FC1-8CED-FC11ECD65910}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69FBA9D2-A274-41C3-BC6E-D90284DFD869}
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
<RootNamespace>EventHubs_Emulator_Kafka_Demo</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.5.2" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
using Confluent.Kafka;

class Program
{
public static async Task Main(string[] args)
{
// Configuration
var kafkaBootstrapServers = "localhost:9092";
var eventHubsConnectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
string eventHubName = "eh1";


// Producer
int produceCount = 100;
var producerConfig = new ProducerConfig
{
BootstrapServers = kafkaBootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = eventHubsConnectionString
};
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
{
for (int i = 0; i < produceCount; i++)
{
var message = new Message<Null, string> { Value = $"Message {i}" };
var deliveryResult = await producer.ProduceAsync(eventHubName, message);
Console.WriteLine($"Delivered '{deliveryResult.Value}' to '{deliveryResult.TopicPartitionOffset}'");
}
}


// Consumer
int receiveCount = 0;
string consumerGroupId = "cg1";
var consumerConfig = new ConsumerConfig
{
BootstrapServers = kafkaBootstrapServers,
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = eventHubsConnectionString,
GroupId = consumerGroupId,
EnableAutoCommit = true,
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build())
{
consumer.Subscribe(eventHubName);

CancellationTokenSource cts = new CancellationTokenSource(TimeSpan.FromSeconds(15));
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};

try
{
while (receiveCount < produceCount)
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Message.Value}' from: '{cr.TopicPartitionOffset}'.");
receiveCount += 1;
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancelled!");
}

consumer.Close();
}

Console.WriteLine($"Produced: {produceCount} messages");
Console.WriteLine($"Consumed: {receiveCount} messages");
}
}

0 comments on commit de8e0d6

Please sign in to comment.