diff --git a/README.md b/README.md index 425f01d2..6fa11f5a 100644 --- a/README.md +++ b/README.md @@ -9,7 +9,7 @@ SwiftKafka is a Swift Package in development that provides a convenient way to c The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error. ```swift -let config = KafkaProducerConfig(bootstrapServers: ["localhost:9092"]) +let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"]) let (producer, acknowledgements) = try await KafkaProducer.makeProducerWithAcknowledgements( config: config, @@ -47,7 +47,7 @@ await withThrowingTaskGroup(of: Void.self) { group in After initializing the `KafkaConsumer` with a topic-partition pair to read from, messages can be consumed using the `messages` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). ```swift -let config = KafkaConsumerConfig( +let config = KafkaConsumerConfiguration( consumptionStrategy: .partition( topic: "topic-name", partition: KafkaPartition(rawValue: 0) @@ -75,7 +75,7 @@ for await messageResult in consumer.messages { SwiftKafka also allows users to subscribe to an array of topics as part of a consumer group. ```swift -let config = KafkaConsumerConfig( +let config = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]), bootstrapServers: ["localhost:9092"] ) @@ -100,7 +100,7 @@ for await messageResult in consumer.messages { By default, the `KafkaConsumer` automatically commits message offsets after receiving the corresponding message. However, we allow users to disable this setting and commit message offsets manually. ```swift -let config = KafkaConsumerConfig( +let config = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "example-group", topics: ["topic-name"]), enableAutoCommit: false, bootstrapServers: ["localhost:9092"] diff --git a/Sources/SwiftKafka/Configuration/KafkaConsumerConfig.swift b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift similarity index 99% rename from Sources/SwiftKafka/Configuration/KafkaConsumerConfig.swift rename to Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift index 37ca0a35..cb1cbd34 100644 --- a/Sources/SwiftKafka/Configuration/KafkaConsumerConfig.swift +++ b/Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift @@ -15,7 +15,7 @@ import Crdkafka import struct Foundation.UUID -public struct KafkaConsumerConfig: Hashable, Equatable { +public struct KafkaConsumerConfiguration: Hashable, Equatable { // MARK: - SwiftKafka-specific Config properties /// The backpressure strategy to be used for message consumption. diff --git a/Sources/SwiftKafka/Configuration/KafkaProducerConfig.swift b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift similarity index 99% rename from Sources/SwiftKafka/Configuration/KafkaProducerConfig.swift rename to Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift index 16287453..ab2fa6dc 100644 --- a/Sources/SwiftKafka/Configuration/KafkaProducerConfig.swift +++ b/Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift @@ -12,7 +12,7 @@ // //===----------------------------------------------------------------------===// -public struct KafkaProducerConfig: Hashable, Equatable { +public struct KafkaProducerConfiguration: Hashable, Equatable { var dictionary: [String: String] = [:] // MARK: - Producer-specific Config Properties diff --git a/Sources/SwiftKafka/Configuration/KafkaTopicConfig.swift b/Sources/SwiftKafka/Configuration/KafkaTopicConfiguration.swift similarity index 99% rename from Sources/SwiftKafka/Configuration/KafkaTopicConfig.swift rename to Sources/SwiftKafka/Configuration/KafkaTopicConfiguration.swift index 8226915d..d0a12b99 100644 --- a/Sources/SwiftKafka/Configuration/KafkaTopicConfig.swift +++ b/Sources/SwiftKafka/Configuration/KafkaTopicConfiguration.swift @@ -13,7 +13,7 @@ //===----------------------------------------------------------------------===// /// Used to configure new topics created by the ``KafkaProducer``. -public struct KafkaTopicConfig: Hashable, Equatable { +public struct KafkaTopicConfiguration: Hashable, Equatable { var dictionary: [String: String] = [:] /// This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1 or all=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than min.insync.replicas (broker configuration) in the ISR set the produce request will fail. diff --git a/Sources/SwiftKafka/KafkaConsumer.swift b/Sources/SwiftKafka/KafkaConsumer.swift index cb226406..4566d08c 100644 --- a/Sources/SwiftKafka/KafkaConsumer.swift +++ b/Sources/SwiftKafka/KafkaConsumer.swift @@ -58,7 +58,7 @@ public struct ConsumerMessagesAsyncSequence: AsyncSequence { /// Receive messages from the Kafka cluster. public final class KafkaConsumer { /// The configuration object of the consumer client. - private var config: KafkaConsumerConfig + private var config: KafkaConsumerConfiguration /// A logger. private let logger: Logger /// Used for handling the connection to the Kafka cluster. @@ -85,11 +85,11 @@ public final class KafkaConsumer { /// Initialize a new ``KafkaConsumer``. /// To listen to incoming messages, please subscribe to a list of topics using ``subscribe(topics:)`` /// or assign the consumer to a particular topic + partition pair using ``assign(topic:partition:offset:)``. - /// - Parameter config: The ``KafkaConsumerConfig`` for configuring the ``KafkaConsumer``. + /// - Parameter config: The ``KafkaConsumerConfiguration`` for configuring the ``KafkaConsumer``. /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if the initialization failed. public init( - config: KafkaConsumerConfig, + config: KafkaConsumerConfiguration, logger: Logger ) throws { self.config = config diff --git a/Sources/SwiftKafka/KafkaProducer.swift b/Sources/SwiftKafka/KafkaProducer.swift index 0efc6c95..45a0eb7f 100644 --- a/Sources/SwiftKafka/KafkaProducer.swift +++ b/Sources/SwiftKafka/KafkaProducer.swift @@ -38,7 +38,7 @@ public struct KafkaMessageAcknowledgements: AsyncSequence { /// Send messages to the Kafka cluster. /// Please make sure to explicitly call ``shutdownGracefully(timeout:)`` when the ``KafkaProducer`` is not used anymore. -/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfig`` +/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfiguration`` /// configuration object (only works if server has `auto.create.topics.enable` property set). public actor KafkaProducer { /// States that the ``KafkaProducer`` can have. @@ -58,8 +58,8 @@ public actor KafkaProducer { /// Counter that is used to assign each message a unique ID. /// Every time a new message is sent to the Kafka cluster, the counter is increased by one. private var messageIDCounter: UInt = 0 - /// The ``TopicConfig`` used for newly created topics. - private let topicConfig: KafkaTopicConfig + /// The ``TopicConfiguration`` used for newly created topics. + private let topicConfig: KafkaTopicConfiguration /// A logger. private let logger: Logger /// Dictionary containing all topic names with their respective `rd_kafka_topic_t` pointer. @@ -70,13 +70,13 @@ public actor KafkaProducer { // Private initializer, use factory methods to create KafkaProducer /// Initialize a new ``KafkaProducer``. - /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. - /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics. + /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Throws: A ``KafkaError`` if initializing the producer failed. private init( client: KafkaClient, - topicConfig: KafkaTopicConfig, + topicConfig: KafkaTopicConfiguration, logger: Logger ) async throws { self.client = client @@ -90,14 +90,14 @@ public actor KafkaProducer { /// /// This factory method creates a producer without message acknowledgements. /// - /// - Parameter configuration: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. - /// - Parameter topicConfiguration: The ``KafkaTopicConfig`` used for newly created topics. + /// - Parameter configuration: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfiguration: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Returns: The newly created ``KafkaProducer``. /// - Throws: A ``KafkaError`` if initializing the producer failed. public static func makeProducer( - config: KafkaProducerConfig = KafkaProducerConfig(), - topicConfig: KafkaTopicConfig = KafkaTopicConfig(), + config: KafkaProducerConfiguration = KafkaProducerConfiguration(), + topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger ) async throws -> KafkaProducer { let client = try RDKafka.createClient( @@ -124,15 +124,15 @@ public actor KafkaProducer { /// /// - Important: When the asynchronous sequence is deinited the producer will be shutdown. /// - /// - Parameter config: The ``KafkaProducerConfig`` for configuring the ``KafkaProducer``. - /// - Parameter topicConfig: The ``KafkaTopicConfig`` used for newly created topics. + /// - Parameter config: The ``KafkaProducerConfiguration`` for configuring the ``KafkaProducer``. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used for newly created topics. /// - Parameter logger: A logger. /// - Returns: A tuple containing the created ``KafkaProducer`` and the ``KafkaMessageAcknowledgements`` /// `AsyncSequence` used for receiving message acknowledgements. /// - Throws: A ``KafkaError`` if initializing the producer failed. public static func makeProducerWithAcknowledgements( - config: KafkaProducerConfig = KafkaProducerConfig(), - topicConfig: KafkaTopicConfig = KafkaTopicConfig(), + config: KafkaProducerConfiguration = KafkaProducerConfiguration(), + topicConfig: KafkaTopicConfiguration = KafkaTopicConfiguration(), logger: Logger ) async throws -> (KafkaProducer, KafkaMessageAcknowledgements) { var streamContinuation: AsyncStream>.Continuation? diff --git a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift index d07b388b..17765a57 100644 --- a/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift +++ b/Sources/SwiftKafka/RDKafka/RDKafkaTopicConfig.swift @@ -17,10 +17,10 @@ import Crdkafka /// A collection of helper functions wrapping common `rd_kafka_topic_conf_*` functions in Swift. struct RDKafkaTopicConfig { /// Create a new `rd_kafka_topic_conf_t` object in memory and initialize it with the given configuration properties. - /// - Parameter topicConfig: The ``KafkaTopicConfig`` used to initialize the `rd_kafka_topic_conf_t` object. + /// - Parameter topicConfig: The ``KafkaTopicConfiguration`` used to initialize the `rd_kafka_topic_conf_t` object. /// - Returns: An `OpaquePointer` pointing to the newly created `rd_kafka_topic_conf_t` object in memory. /// - Throws: A ``KafkaError`` if setting a config value failed. - static func createFrom(topicConfig: KafkaTopicConfig) throws -> OpaquePointer { + static func createFrom(topicConfig: KafkaTopicConfiguration) throws -> OpaquePointer { let configPointer: OpaquePointer = rd_kafka_topic_conf_new() try topicConfig.dictionary.forEach { key, value in try Self.set(configPointer: configPointer, key: key, value: value) diff --git a/Tests/IntegrationTests/SwiftKafkaTests.swift b/Tests/IntegrationTests/SwiftKafkaTests.swift index 1a84e7dc..23953c0c 100644 --- a/Tests/IntegrationTests/SwiftKafkaTests.swift +++ b/Tests/IntegrationTests/SwiftKafkaTests.swift @@ -34,18 +34,18 @@ final class SwiftKafkaTests: XCTestCase { let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost" let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092" var bootstrapServer: String! - var producerConfig: KafkaProducerConfig! + var producerConfig: KafkaProducerConfiguration! var uniqueTestTopic: String! override func setUpWithError() throws { self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)" - self.producerConfig = KafkaProducerConfig( + self.producerConfig = KafkaProducerConfiguration( bootstrapServers: [self.bootstrapServer], brokerAddressFamily: .v4 ) - let basicConfig = KafkaConsumerConfig( + let basicConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "no-group", topics: []), bootstrapServers: [self.bootstrapServer], brokerAddressFamily: .v4 @@ -57,7 +57,7 @@ final class SwiftKafkaTests: XCTestCase { } override func tearDownWithError() throws { - let basicConfig = KafkaConsumerConfig( + let basicConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "no-group", topics: []), bootstrapServers: [self.bootstrapServer], brokerAddressFamily: .v4 @@ -94,7 +94,7 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfig = KafkaConsumerConfig( + let consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "subscription-test-group-id", topics: [self.uniqueTestTopic]), autoOffsetReset: .beginning, // Always read topics from beginning bootstrapServers: [self.bootstrapServer], @@ -151,7 +151,7 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfig = KafkaConsumerConfig( + let consumerConfiguration = KafkaConsumerConfiguration( consumptionStrategy: .partition( topic: self.uniqueTestTopic, partition: KafkaPartition(rawValue: 0), @@ -163,7 +163,7 @@ final class SwiftKafkaTests: XCTestCase { ) let consumer = try KafkaConsumer( - config: consumerConfig, + config: consumerConfiguration, logger: .kafkaTest ) @@ -212,7 +212,7 @@ final class SwiftKafkaTests: XCTestCase { // Consumer Task group.addTask { - let consumerConfig = KafkaConsumerConfig( + let consumerConfig = KafkaConsumerConfiguration( consumptionStrategy: .group(groupID: "commit-sync-test-group-id", topics: [self.uniqueTestTopic]), enableAutoCommit: false, autoOffsetReset: .beginning, // Always read topics from beginning diff --git a/Tests/SwiftKafkaTests/KafkaProducerTests.swift b/Tests/SwiftKafkaTests/KafkaProducerTests.swift index 507fc107..5763a68a 100644 --- a/Tests/SwiftKafkaTests/KafkaProducerTests.swift +++ b/Tests/SwiftKafkaTests/KafkaProducerTests.swift @@ -35,12 +35,12 @@ final class KafkaProducerTests: XCTestCase { let kafkaHost = ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? "localhost" let kafkaPort = ProcessInfo.processInfo.environment["KAFKA_PORT"] ?? "9092" var bootstrapServer: String! - var config: KafkaProducerConfig! + var config: KafkaProducerConfiguration! override func setUpWithError() throws { self.bootstrapServer = "\(self.kafkaHost):\(self.kafkaPort)" - self.config = KafkaProducerConfig( + self.config = KafkaProducerConfiguration( bootstrapServers: [self.bootstrapServer], brokerAddressFamily: .v4 ) diff --git a/scripts/soundness.sh b/scripts/soundness.sh index 2d30f5dd..49e8e184 100755 --- a/scripts/soundness.sh +++ b/scripts/soundness.sh @@ -36,7 +36,7 @@ unacceptable_terms=( # which is considered unacceptable by us. exclude_files=( CODE_OF_CONDUCT.md - *Config.swift + *Configuration.swift ) for word in "${exclude_files[@]}"; do exclude_files+=(":(exclude)$word")