Rename public Config types to Configuration (#65)
* Rename *Config types to *Configuration

Modifications:

* `KafkaConsumerConfig` -> `KafkaConsumerConfiguration`
* `KafkaProducerConfig` -> `KafkaProducerConfiguration`
* `KafkaTopicConfig` -> `KafkaTopicConfiguration`

* Update `soundness.sh` to support the new Configuration filenames
felixschlegel authored Jun 21, 2023
1 parent 840242c commit 1173b07
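
For adopters this is a pure rename; initializer parameters are unchanged. A minimal before/after sketch of the three renamed types, based on the README examples further down in this diff (the module name `SwiftKafka` is assumed here):

```swift
import SwiftKafka

// Producer configuration — previously `KafkaProducerConfig`:
let producerConfiguration = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

// Consumer configuration — previously `KafkaConsumerConfig`:
let consumerConfiguration = KafkaConsumerConfiguration(
    consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
    bootstrapServers: ["localhost:9092"]
)

// Topic configuration — previously `KafkaTopicConfig`:
let topicConfiguration = KafkaTopicConfiguration()
```
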
Showing 10 changed files with 37 additions and 37 deletions.
8 changes: 4 additions & 4 deletions README.md
@@ -9,7 +9,7 @@ SwiftKafka is a Swift Package in development that provides a convenient way to c
The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement either indicates that the message was produced successfully or returns an error.

```swift
let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"])
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements(
config: config,
@@ -47,7 +47,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence).

```swift
let config = KafkaConsumerConfig(
let config = KafkaConsumerConfiguration(
consumptionStrategy: .partition(
topic: "topic-name",
partition: KafkaPartition(rawValue: 0)
@@ -75,7 +75,7 @@ for await messageResult in consumer.messages {
SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group.

```swift
let config = KafkaConsumerConfig(
let config = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
bootstrapServers: ["localhost:9092"]
)
@@ -100,7 +100,7 @@ for await messageResult in consumer.messages {
By default, the `KafkaConsumer` automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually.

```swift
let config = KafkaConsumerConfig(
let config = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
enableAutoCommit: false,
bootstrapServers: ["localhost:9092"]
@@ -15,7 +15,7 @@
import Crdkafka
import struct Foundation.UUID

public struct KafkaConsumerConfig: Hashable, Equatable {
public struct KafkaConsumerConfiguration: Hashable, Equatable {
// MARK: - SwiftKafka-specific Config properties

/// The backpressure strategy to be used for message consumption.
@@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//

public struct KafkaProducerConfig: Hashable, Equatable {
public struct KafkaProducerConfiguration: Hashable, Equatable {
var dictionary: [String: String] = [:]

// MARK: - Producer-specific Config Properties
@@ -13,7 +13,7 @@
//===----------------------------------------------------------------------===//

/// Used to configure new topics created by the ``KafkaProducer``.
public struct KafkaTopicConfig: Hashable, Equatable {
public struct KafkaTopicConfiguration: Hashable, Equatable {
var dictionary: [String: String] = [:]

/// This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1 or all=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than min.insync.replicas (broker configuration) in the ISR set the produce request will fail.
6 changes: 3 additions & 3 deletions Sources/SwiftKafka/KafkaConsumer.swift
@@ -58,7 +58,7 @@ public struct ConsumerMessagesAsyncSequence: AsyncSequence {
/// Receive messages from the Kafka cluster.
public final class KafkaConsumer {
/// The configuration object of the consumer client.
private var config: KafkaConsumerConfig
private var config: KafkaConsumerConfiguration
/// A logger.
private let logger: Logger
/// Used for handling the connection to the Kafka cluster.
@@ -85,11 +85,11 @@ public final class KafkaConsumer {
/// Initialize a new ``KafkaConsumer``.
/// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)``
/// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``.
/// - Parameter config: The ``KafkaConsumerConfig`` for configuring the ``KafkaConsumer``.
/// - Parameter config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``.
/// - Parameter logger: A logger.
/// - Throws: A ``KafkaError`` if the initialization failed.
public init(
config: KafkaConsumerConfig,
config: KafkaConsumerConfiguration,
logger: Logger
) throws {
self.config = config
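
To show the renamed type in context, here is a minimal consumption sketch built from the initializer documented above and the `messages` sequence shown in the README diff. The `Logger(label:)` setup from swift-log and the `Result` handling of each element are assumptions; the collapsed diff does not show them.

```swift
import Logging
import SwiftKafka

func consumeExample() async throws {
    // Renamed from KafkaConsumerConfig in this commit.
    let config = KafkaConsumerConfiguration(
        consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]),
        bootstrapServers: ["localhost:9092"]
    )

    let consumer = try KafkaConsumer(
        config: config,
        logger: Logger(label: "consumer-example")
    )

    // Messages arrive through the `messages` AsyncSequence.
    for await messageResult in consumer.messages {
        switch messageResult {
        case .success(let message):
            print("Received: \(message)")
        case .failure(let error):
            print("Consumption failed: \(error)")
        }
    }
}
```
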
28 changes: 14 additions & 14 deletions Sources/SwiftKafka/KafkaProducer.swift
@@ -38,7 +38,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence {

/// Send messages to the Kafka cluster.
/// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore.
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfig``
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration``
/// configuration object (only works if server has `auto.create.topics.enable` property set).
public actor KafkaProducer {
/// States that the ``KafkaProducer`` can have.
@@ -58,8 +58,8 @@ public actor KafkaProducer {
/// Counter that is used to assign each message a unique ID.
/// Every time a new message is sent to the Kafka cluster, the counter is increased by one.
private var messageIDCounter: UInt = 0
/// The ``TopicConfig`` used for newly created topics.
private let topicConfig: KafkaTopicConfig
/// The ``TopicConfiguration`` used for newly created topics.
private let topicConfig: KafkaTopicConfiguration
/// A logger.
private let logger: Logger
/// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer.
@@ -70,13 +70,13 @@

// Private initializer, use factory methods to create KafkaProducer
/// Initialize a new ``KafkaProducer``.
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
private init(
client: KafkaClient,
topicConfig: KafkaTopicConfig,
topicConfig: KafkaTopicConfiguration,
logger: Logger
) async throws {
self.client = client
@@ -90,14 +90,14 @@
///
/// This factory method creates a producer without message acknowledgements.
///
/// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter configuration: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfiguration: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: The newly created ``KafkaProducer``.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func makeProducer(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
logger: Logger
) async throws -> KafkaProducer {
let client = try RDKafka.createClient(
@@ -124,15 +124,15 @@
///
/// - Important: When the asynchronous sequence is deinitialized, the producer will be shut down.
///
/// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics.
/// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics.
/// - Parameter logger: A logger.
/// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements``
/// `AsyncSequence` used for receiving message acknowledgements.
/// - Throws: A ``KafkaError`` if initializing the producer failed.
public static func makeProducerWithAcknowledgements(
config: KafkaProducerConfig = KafkaProducerConfig(),
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
config: KafkaProducerConfiguration = KafkaProducerConfiguration(),
topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(),
logger: Logger
) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) {
var streamContinuation: AsyncStream<Result<KafkaAcknowledgedMessage, KafkaAcknowledgedMessageError>>.Continuation?
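
A minimal usage sketch of the `makeProducer` factory above with the renamed configuration types. The message type `KafkaProducerMessage(topic:value:)` and the millisecond `timeout:` value passed to `shutdownGracefully(timeout:)` are assumptions and do not appear in this diff.

```swift
import Logging
import SwiftKafka

func produceExample() async throws {
    // Both configuration types were renamed from *Config in this commit.
    let producer = try await KafkaProducer.makeProducer(
        config: KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"]),
        topicConfig: KafkaTopicConfiguration(),
        logger: Logger(label: "producer-example")
    )

    // `sendAsync(_:)` returns a message-id that identifies the later acknowledgement.
    let messageID = try await producer.sendAsync(
        KafkaProducerMessage(topic: "topic-name", value: "Hello, World!")
    )
    print("Enqueued message \(messageID)")

    // The docs above ask for an explicit shutdown once the producer is no longer used.
    await producer.shutdownGracefully(timeout: 10_000)
}
```
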
4 changes: 2 additions & 2 deletions Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift
@@ -17,10 +17,10 @@ import Crdkafka
/// A collection of helper functions wrapping common `rd_kafka_topic_conf_*` functions in Swift.
struct RDKafkaTopicConfig {
/// Create a new `rd_kafka_topic_conf_t` object in memory and initialize it with the given configuration properties.
/// - Parameter topicConfig: The ``KafkaTopicConfig`` used to initialize the `rd_kafka_topic_conf_t` object.
/// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used to initialize the `rd_kafka_topic_conf_t` object.
/// - Returns: An `OpaquePointer` pointing to the newly created `rd_kafka_topic_conf_t` object in memory.
/// - Throws: A ``KafkaError`` if setting a config value failed.
static func createFrom(topicConfig: KafkaTopicConfig) throws -> OpaquePointer {
static func createFrom(topicConfig: KafkaTopicConfiguration) throws -> OpaquePointer {
let configPointer: OpaquePointer = rd_kafka_topic_conf_new()
try topicConfig.dictionary.forEach { key, value in
try Self.set(configPointer: configPointer, key: key, value: value)
16 changes: 8 additions & 8 deletions Tests/IntegrationTests/SwiftKafkaTests.swift
@@ -34,18 +34,18 @@ final class SwiftKafkaTests: XCTestCase {
let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost"
let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092"
var bootstrapServer: String!
var producerConfig: KafkaProducerConfig!
var producerConfig: KafkaProducerConfiguration!
var uniqueTestTopic: String!

override func setUpWithError() throws {
self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)"

self.producerConfig = KafkaProducerConfig(
self.producerConfig = KafkaProducerConfiguration(
bootstrapServers: [self.bootstrapServer],
brokerAddressFamily: .v4
)

let basicConfig = KafkaConsumerConfig(
let basicConfig = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "no-group", topics: []),
bootstrapServers: [self.bootstrapServer],
brokerAddressFamily: .v4
@@ -57,7 +57,7 @@ final class SwiftKafkaTests: XCTestCase {
}

override func tearDownWithError() throws {
let basicConfig = KafkaConsumerConfig(
let basicConfig = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "no-group", topics: []),
bootstrapServers: [self.bootstrapServer],
brokerAddressFamily: .v4
@@ -94,7 +94,7 @@

// Consumer Task
group.addTask {
let consumerConfig = KafkaConsumerConfig(
let consumerConfig = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]),
autoOffsetReset: .beginning, // Always read topics from beginning
bootstrapServers: [self.bootstrapServer],
@@ -151,7 +151,7 @@

// Consumer Task
group.addTask {
let consumerConfig = KafkaConsumerConfig(
let consumerConfiguration = KafkaConsumerConfiguration(
consumptionStrategy: .partition(
topic: self.uniqueTestTopic,
partition: KafkaPartition(rawValue: 0),
@@ -163,7 +163,7 @@
)

let consumer = try KafkaConsumer(
config: consumerConfig,
config: consumerConfiguration,
logger: .kafkaTest
)

@@ -212,7 +212,7 @@

// Consumer Task
group.addTask {
let consumerConfig = KafkaConsumerConfig(
let consumerConfig = KafkaConsumerConfiguration(
consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]),
enableAutoCommit: false,
autoOffsetReset: .beginning, // Always read topics from beginning
4 changes: 2 additions & 2 deletions Tests/SwiftKafkaTests/KafkaProducerTests.swift
@@ -35,12 +35,12 @@ final class KafkaProducerTests: XCTestCase {
let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost"
let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092"
var bootstrapServer: String!
var config: KafkaProducerConfig!
var config: KafkaProducerConfiguration!

override func setUpWithError() throws {
self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)"

self.config = KafkaProducerConfig(
self.config = KafkaProducerConfiguration(
bootstrapServers: [self.bootstrapServer],
brokerAddressFamily: .v4
)
2 changes: 1 addition & 1 deletion scripts/soundness.sh
@@ -36,7 +36,7 @@ unacceptable_terms=(
# which is considered unacceptable by us.
exclude_files=(
CODE_OF_CONDUCT.md
*Config.swift
*Configuration.swift
)
for word in "${exclude_files[@]}"; do
exclude_files+=(":(exclude)$word")
