From e346edf87471b64f085b0eb6ac04d1e3a3213cb2 Mon Sep 17 00:00:00 2001
From: yoa1226
Date: Tue, 12 Nov 2024 18:58:14 +0800
Subject: [PATCH] kafka producer

---
 _posts/2024-11-03-kafka-producer.md | 757 ++++++++++++++++++++++++++++
 1 file changed, 757 insertions(+)
 create mode 100644 _posts/2024-11-03-kafka-producer.md

diff --git a/_posts/2024-11-03-kafka-producer.md b/_posts/2024-11-03-kafka-producer.md
new file mode 100644
index 0000000..22e59bb
--- /dev/null
+++ b/_posts/2024-11-03-kafka-producer.md
@@ -0,0 +1,757 @@
---
layout: post
title: "How Kafka Produces Messages"
date: 2024-11-03 11:00:00 +0200
tags: [Kafka]
---


Kafka is an excellent message broker and stream-processing platform. In this article we walk through the source code behind Kafka's message production path and trace the complete journey of a record from creation in the producer to delivery on the broker. The goal is to describe the overall flow of producing and transmitting messages; many finer details are left for the reader to explore on their own.

## The producer

We start with the producer's core components: how `ProducerConfig` parses configuration, how the record accumulator (`RecordAccumulator`) buffers messages, metadata management, the sender thread, and so on.

Example code:

```java
Properties props = new Properties();

props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

return new KafkaProducer<>(props);
```

This is typical client code: first build the configuration, then create the client class `KafkaProducer`.

### ProducerConfig

Kafka defines a very large number of settings to cover users' customization needs; see the [producer configuration docs](https://kafka.apache.org/documentation.html#producerconfigs).

The producer settings are defined in the `ProducerConfig` class, and each one carries its own documentation string.

```java
public static final String BATCH_SIZE_CONFIG = "batch.size";
private static final String BATCH_SIZE_DOC = "The producer will attempt ....."
```

Initializing the client configuration is essentially initializing a `ProducerConfig` instance.

```java
return new KafkaProducer<>(props);

// props -> Utils.propsToMap(properties)

public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
    this(new ProducerConfig(ProducerConfig.appendSerializerToConfig(configs,
            keySerializer, valueSerializer)), ..... )
}

public ProducerConfig(Map<String, Object> props) { super(CONFIG, props); }
```

The constructor delegates to the parent class. `CONFIG` is a static field, initialized when the class is loaded, that assigns a default value to every configuration property.

Each individual setting is described by a `ConfigKey`.

```java
CONFIG = new ConfigDef()
        .define(BATCH_SIZE_CONFIG, Type.INT, 16384, atLeast(0), Importance.MEDIUM, BATCH_SIZE_DOC)
        .....
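        // each define(...) call registers one ConfigKey: the setting's name, type,
        // default value, validator, importance level, and documentation string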
        .defineXXX();

public static class ConfigKey {
    public final String name;
    public final Type type;
    public final String documentation;
    public final Object defaultValue;
}
```

The parent-class constructor merges the default configuration with the user-supplied configuration.

```java
public AbstractConfig(ConfigDef definition, Map<?, ?> originals) {
    this(definition, originals, Collections.emptyMap(), true);
}

public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
    this.values = definition.parse(this.originals);
    this.values.putAll(configUpdates);
    definition.parse(this.values);
    this.definition = definition;
}
```

### Producer initialization

Below is the core constructor used to initialize the producer. `config` is the object we just created above, `time` is `Time.SYSTEM`, and the remaining parameters are `null`.

```java
KafkaProducer(ProducerConfig config,
              Serializer<K> keySerializer, Serializer<V> valueSerializer,
              ProducerMetadata metadata,
              KafkaClient kafkaClient,
              ProducerInterceptors<K, V> interceptors,
              Time time);
```

`KafkaProducer` is built from a number of core components. We simply list them here and explain how each one works when it comes into play later in the flow.

#### Partitioner

```java
this.partitioner = config.getConfiguredInstance(
        ProducerConfig.PARTITIONER_CLASS_CONFIG,
        Partitioner.class,
        Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));

boolean enableAdaptivePartitioning = partitioner == null &&
        config.getBoolean(ProducerConfig.PARTITIONER_ADPATIVE_PARTITIONING_ENABLE_CONFIG);

RecordAccumulator.PartitionerConfig partitionerConfig =
        new RecordAccumulator.PartitionerConfig(enableAdaptivePartitioning,
                config.getLong(ProducerConfig.PARTITIONER_AVAILABILITY_TIMEOUT_MS_CONFIG));
```

#### Serializers

```java
this.keySerializer = config.getConfiguredInstance(
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Serializer.class);
```

```java
this.valueSerializer = config.getConfiguredInstance(
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Serializer.class);
```

#### Interceptors

```java
List<ProducerInterceptor<K, V>> interceptorList = ClientUtils.configuredInterceptors(
        config, ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptor.class);
```

#### Compression

```java
this.compression = configureCompression(config);
```

#### Record accumulator

```java
this.accumulator = new RecordAccumulator(logContext,
        batchSize, compression, lingerMs(config), retryBackoffMs,
        retryBackoffMaxMs, deliveryTimeoutMs, partitionerConfig, metrics,
        PRODUCER_METRIC_GROUP_NAME, time, apiVersions, transactionManager,
        new BufferPool(this.totalMemorySize, batchSize, metrics, time, PRODUCER_METRIC_GROUP_NAME));
```

#### Metadata

```java
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config);
this.metadata = new ProducerMetadata(retryBackoffMs,
        retryBackoffMaxMs,
        config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
        config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
        logContext,
        clusterResourceListeners,
        Time.SYSTEM);
this.metadata.bootstrap(addresses);
```

#### Sender thread

```java
this.sender = newSender(logContext, kafkaClient, this.metadata);
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
```

#### Network client

```java
public static NetworkClient createNetworkClient(AbstractConfig config,
                                                String clientId,
                                                Metrics metrics,
                                                String metricsGroupPrefix,
                                                LogContext logContext,
                                                ApiVersions apiVersions,
                                                Time time, ..);
```

## Metadata management

### Metadata

The client needs cluster information from Kafka so it can use it when sending data. This includes the cluster nodes, topics, partitions, partition leaders, and so on.

We will not dwell on the classes that hold this metadata; it is enough to know that the data is maintained on the client side and refreshed when necessary.

```java
public class Metadata implements Closeable {
    private volatile
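    // the producer's current view of the cluster, replaced wholesale whenever a metadata response is handled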
    MetadataSnapshot metadataSnapshot = MetadataSnapshot.empty();
    private List<InetSocketAddress> bootstrapAddresses;
}

public class MetadataSnapshot {
    private final String clusterId;
    private final Map<Integer, Node> nodes;
    private final Node controller;
    private final Map<TopicPartition, PartitionMetadata> metadataByPartition;
    private final Map<String, Uuid> topicIds;
    private final Map<Uuid, String> topicNames;
    private Cluster clusterInstance;
}

public final class Cluster {
    private final List<Node> nodes;
    private final Node controller;
    private final Map<TopicPartition, PartitionInfo> partitionsByTopicPartition;
    private final Map<String, List<PartitionInfo>> partitionsByTopic;
    private final Map<String, List<PartitionInfo>> availablePartitionsByTopic;
    private final Map<Integer, List<PartitionInfo>> partitionsByNode;
    private final Map<Integer, Node> nodesById;
    private final ClusterResource clusterResource;
    private final Map<String, Uuid> topicIds;
    private final Map<Uuid, String> topicNames;
}

public final class TopicPartition implements Serializable {
    private final int partition;
    private final String topic;
}
```

### Fetching metadata

#### Sending the request

The metadata refresh request is sent in `sendInternalMetadataRequest`.

```java
// -> Sender#run -> NetworkClient#poll
// -> NetworkClient.DefaultMetadataUpdater#maybeUpdate(long, org.apache.kafka.common.Node)
// -> NetworkClient#sendInternalMetadataRequest

void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
    ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
    doSend(clientRequest, true, now);
}
```

`Selector#send` attaches the request to the `KafkaChannel`, where it waits to be written out.

```java
public void send(NetworkSend send) {
    String connectionId = send.destinationId();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    channel.setSend(send);
}

public void setSend(NetworkSend send) {
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
```

`Selectable#poll` then processes the channel's events and actually sends the request.

Kafka defines every request type in one place.

```java
public enum ApiKeys {
    METADATA(ApiMessageType.METADATA),
    //omit
}

public Builder(MetadataRequestData data) {
    super(ApiKeys.METADATA);
    this.data = data;
}
```

#### Handling the response

`handleMetadataResponse` processes the response; the resulting data ends up stored in a `MetadataSnapshot`.

```java
// -> NetworkClient#handleCompletedReceives
// -> NetworkClient.DefaultMetadataUpdater#handleSuccessfulResponse
// -> Metadata#update

public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
    this.metadataSnapshot = handleMetadataResponse(response, isPartialUpdate, nowMs);
}

private MetadataSnapshot handleMetadataResponse(MetadataResponse metadataResponse,...)
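// walk every topic and partition in the response and rebuild the snapshot from it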
{
    for (MetadataResponse.TopicMetadata metadata : metadataResponse.topicMetadata()) {
        String topicName = metadata.topic();
        Uuid topicId = metadata.topicId();
        for (MetadataResponse.PartitionMetadata partitionMetadata : metadata.partitionMetadata()) {
            updateLatestMetadata(partitionMetadata, metadataResponse.hasReliableLeaderEpochs(), topicId, oldTopicId)
                .ifPresent(partitions::add);
        }
    }
}
```

`PartitionMetadata` holds the per-partition information: the leader, the ISR, and so on.

```java
public static class PartitionMetadata {
    public final TopicPartition topicPartition;
    public final Optional<Integer> leaderId;
    public final Optional<Integer> leaderEpoch;
    public final List<Integer> replicaIds;
    public final List<Integer> inSyncReplicaIds;
    public final List<Integer> offlineReplicaIds;
}
```

#### Server side

Once the request has been sent, the broker starts processing it on the server side. `KafkaApis#handle` is the single entry point for all requests.

`KafkaApis#handleTopicMetadataRequest` collects the metadata and returns it to the client.

```scala
//kafka.server.KafkaApis#handle
def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  request.header.apiKey match {
    case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  }
}

def handleTopicMetadataRequest(request: RequestChannel.Request): Unit = {
  val topicMetadata = getTopicMetadata(request, metadataRequest.isAllTopics, allowAutoCreation, authorizedTopics,
    request.context.listenerName, errorUnavailableEndpoints, errorUnavailableListeners)

  val brokers = metadataCache.getAliveBrokerNodes(request.context.listenerName)

  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
    MetadataResponse.prepareResponse(
      brokers.toList.asJava,
      clusterId, ......
      completeTopicMetadata.asJava,
    ))
}
```

The finer code details are covered later in the article.

## The record accumulator

`KafkaProducer#doSend` is where a message is "sent", but at this point sending really means appending the message to the accumulator's buffer.

```java
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    // refresh metadata
    clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);

    // compute the partition
    int partition = partition(record, serializedKey, serializedValue, cluster);

    // append the record to the accumulator
    RecordAccumulator.RecordAppendResult result = accumulator.append(record.topic(), partition, timestamp, serializedKey,
            serializedValue, headers, appendCallbacks, remainingWaitMs, abortOnNewBatch, nowMs, cluster);

    // wake up the sender thread
    if (result.batchIsFull || result.newBatchCreated) {
        this.sender.wakeup();
    }
}
```

### Computing the partition

The partition can be determined in four ways:

1. The application specifies it explicitly on the record.
2. The application supplies it through a custom partitioner (see the sketch below).
3. It is derived from the serialized key.
4. By default it is `RecordMetadata.UNKNOWN_PARTITION`, and the accumulator picks a partition using its built-in logic.

> Try to calculate partition, but note that after this call it can be RecordMetadata.UNKNOWN_PARTITION, which means that the RecordAccumulator would pick a partition using built-in logic (which may take into account broker load, the amount of data produced to each partition, etc.).
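
For case 2, here is a minimal sketch of what a custom partitioner might look like. The class name `ExamplePartitioner` is made up for illustration and is not part of the Kafka codebase; only the `Partitioner` interface it implements is.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Hypothetical partitioner: route records by the key's hash, fall back to partition 0 for null keys.
public class ExamplePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) { }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        // floorMod keeps the result non-negative, which doSend requires
        return key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    }

    @Override
    public void close() { }
}
```

It is wired in through configuration, for example `props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, ExamplePartitioner.class);`, and the `partition(...)` method shown next returns whatever the custom partitioner computes, as long as the value is non-negative.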

```java
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
    if (record.partition() != null)
        return record.partition();

    if (partitioner != null) {
        int customPartition = partitioner.partition(
            record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
        if (customPartition < 0) {
            throw new IllegalArgumentException(String.format(...));
        }
        return customPartition;
    }

    if (serializedKey != null && !partitionerIgnoreKeys) {
        // hash the keyBytes to choose a partition
        return BuiltInPartitioner.partitionForKey(serializedKey, cluster.partitionsForTopic(record.topic()).size());
    } else {
        return RecordMetadata.UNKNOWN_PARTITION;
    }
}
```

### Accumulator structure

Inside the accumulator, records are grouped by topic and partition, and the structure that ultimately stores them is the `ProducerBatch`. Each per-partition buffer is a double-ended queue of batches.

```java
public class RecordAccumulator {
    private final ConcurrentMap<String, TopicInfo> topicInfoMap =
        new CopyOnWriteMap<>();
}

private static class TopicInfo {
    public final ConcurrentMap<Integer, Deque<ProducerBatch>> batches = new CopyOnWriteMap<>();

    public final BuiltInPartitioner builtInPartitioner;
    public TopicInfo(BuiltInPartitioner builtInPartitioner) {
        this.builtInPartitioner = builtInPartitioner;
    }
}
```

### Appending records

`append` looks up the `Deque<ProducerBatch>` for the topic and partition.

`tryAppend` then locates the `ProducerBatch` to write into.

```java
public RecordAppendResult append(String topic, int partition, long timestamp,
                                 byte[] key, byte[] value, ...) {
    // look up (or create) the per-topic info
    TopicInfo topicInfo = topicInfoMap.computeIfAbsent(topic, ......);
    while (true) {
        effectivePartition = partition;
        Deque<ProducerBatch> dq = topicInfo.batches.computeIfAbsent(effectivePartition, k -> new ArrayDeque<>());

        synchronized (dq) {
            RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callbacks, dq, nowMs);
        }
    }
}

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque, long nowMs) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        int initialBytes = last.estimatedSizeInBytes();
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, nowMs);
        int appendedBytes = last.estimatedSizeInBytes() - initialBytes;
        return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false, false, appendedBytes);
    }
    return null;
}
```

`writeTo` writes the timestamp delta, offset delta, key, value, and headers into the buffer in sequence.

Think of `appendStream` as one contiguous run of bytes being appended to.

```java
// -> ProducerBatch#tryAppend -> MemoryRecordsBuilder#append()
// -> MemoryRecordsBuilder#appendWithOffset()
// -> MemoryRecordsBuilder#appendDefaultRecord
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value,
                                 Header[] headers) throws IOException {
    int offsetDelta = (int) (offset - baseOffset);
    long timestampDelta = timestamp - baseTimestamp;
    int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers);
    recordWritten(offset, timestamp, sizeInBytes);
}

public static int writeTo(DataOutputStream out, int offsetDelta,
                          long timestampDelta, ByteBuffer key,
                          ByteBuffer value, Header[] headers) {
    ByteUtils.writeVarlong(timestampDelta, out);
    ByteUtils.writeVarint(offsetDelta, out);

    int keySize = key.remaining();
    ByteUtils.writeVarint(keySize, out);
    Utils.writeTo(out, key, keySize);

    int valueSize = value.remaining();
    ByteUtils.writeVarint(valueSize, out);
    Utils.writeTo(out, value, valueSize);

    ByteUtils.writeVarint(headers.length, out);

    for (Header
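            // each header is written as: varint key length, UTF-8 key bytes, varint value length, value bytes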
            header : headers) {
        byte[] utf8Bytes = Utils.utf8(header.key());
        ByteUtils.writeVarint(utf8Bytes.length, out);
        out.write(utf8Bytes);

        byte[] headerValue = header.value();
        ByteUtils.writeVarint(headerValue.length, out);
        out.write(headerValue);
    }
}
```

When the append fails because the current batch has no room, a new `ProducerBatch` has to be created.

```java
int size = Math.max(this.batchSize, ....);
buffer = free.allocate(size, maxTimeToBlock);

private RecordAppendResult appendNewBatch(String topic,
                                          int partition,
                                          Deque<ProducerBatch> dq, ...) {
    //omit
    MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, apiVersions.maxUsableProduceMagic());

    ProducerBatch batch = new ProducerBatch(new TopicPartition(topic, partition), recordsBuilder, nowMs);

    FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
            callbacks, nowMs));
}
```

Finally, the sender thread is woken up.

```java
if (result.batchIsFull || result.newBatchCreated) {
    this.sender.wakeup();
}
```

## The sender thread

The sender thread loops forever, processing whatever data is ready to be sent.

```java
public class Sender implements Runnable {
    public void run() {
        while (running) {
            runOnce();
        }
    }
}

void runOnce() {
    //....
    long pollTimeout = sendProducerData(currentTimeMs);
    client.poll(pollTimeout, currentTimeMs);
}
```

### Target brokers

The accumulator is scanned to find the leader node of each target partition.

```java
// -> Sender#sendProducerData -> RecordAccumulator#ready
public ReadyCheckResult ready(MetadataSnapshot metadataSnapshot, long nowMs) {
    Set<Node> readyNodes = new HashSet<>();
    for (Map.Entry<String, TopicInfo> topicInfoEntry : this.topicInfoMap.entrySet()) {
        final String topic = topicInfoEntry.getKey();
        nextReadyCheckDelayMs = partitionReady(metadataSnapshot, nowMs, topic, topicInfoEntry.getValue(), nextReadyCheckDelayMs, readyNodes, unknownLeaderTopics);
    }
    return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}

private long partitionReady(MetadataSnapshot metadataSnapshot, long nowMs, String topic, ...) {
    ConcurrentMap<Integer, Deque<ProducerBatch>> batches = topicInfo.batches;
    for (Map.Entry<Integer, Deque<ProducerBatch>> entry : batches.entrySet()) {
        TopicPartition part = new TopicPartition(topic, entry.getKey());
        Node leader = metadataSnapshot.cluster().leaderFor(part);
        //.....
        readyNodes.add(leader);
    }
}
```

Connections are then prepared to the brokers hosting those leader nodes.

```java
Iterator<Node> iter = result.readyNodes.iterator();
while (iter.hasNext()) {
    Node node = iter.next();
    if (!this.client.ready(node, now)) {
        //..
    }
}
```

### Draining batches

Data to send is collected per broker node.

```java
public Map<Integer, List<ProducerBatch>> drain(MetadataSnapshot metadataSnapshot, Set<Node> nodes, int maxSize, long now) {
    Map<Integer, List<ProducerBatch>> batches = new HashMap<>();
    for (Node node : nodes) {
        List<ProducerBatch> ready = drainBatchesForOneNode(metadataSnapshot, node, maxSize, now);
        batches.put(node.id(), ready);
    }
    return batches;
}
```

### Writing the request to the send buffer

`ProduceRequest` wraps the batches into a request structure, `callback` binds the completion handler, `ApiKeys.PRODUCE` identifies the API type, and `client.send` hands the request to the network client.

```java
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
    for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
        sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}

private void sendProduceRequest(....., List<ProducerBatch> batches) {
    ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
        new ProduceRequestData().setTopicData(tpd));
    RequestCompletionHandler callback = ....;
    ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, ....., callback);
    client.send(clientRequest, now);
}

public static class Builder extends AbstractRequest.Builder<ProduceRequest> {
    private final ProduceRequestData data;
    public Builder(.....
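                   // elided parameters include minVersion and maxVersion, which are passed to super below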
                   ProduceRequestData data) {
        super(ApiKeys.PRODUCE, minVersion, maxVersion);
        this.data = data;
    }
}
```

Finally, `doSend` attaches the data to the transport layer and registers interest in the write event; the request then waits to be sent to the broker.

```java
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
    String destination = clientRequest.destination();
    RequestHeader header = clientRequest.makeHeader(request.version());
    Send send = request.toSend(header);
    selector.send(new NetworkSend(clientRequest.destination(), send));
}

public void send(NetworkSend send) {
    String connectionId = send.destinationId();
    KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
    channel.setSend(send);
}

public void setSend(NetworkSend send) {
    this.send = send;
    this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
```

## How the broker handles the messages

On the server side, `handleProduceRequest` handles produce requests: it iterates over `entriesPerPartition`, locates the corresponding partition, and appends the data.

```scala
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
  request.header.apiKey match {
    //....
    case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
  }
}

// -> ReplicaManager#handleProduceAppend
// -> ReplicaManager#appendRecords
// -> ReplicaManager#appendToLocalLog
private def appendToLocalLog(....) {
  entriesPerPartition.map { case (topicPartition, records) =>
    val partition = getPartitionOrException(topicPartition)
    val info = partition.appendRecordsToLeader(records, origin, ....)
  }
}

def appendRecordsToLeader(....) {
  val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, ....)
}
```

`maybeRoll` decides whether a new log segment needs to be created.

`LogSegment#append` appends the messages to the segment file and, when needed, updates the index files.

Here `log` is a `FileRecords`, a thin wrapper around the segment file.

```java
// -> UnifiedLog#append
  val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

// -> LogSegment#append
public void append(long largestOffset, long largestTimestampMs, long shallowOffsetOfMaxTimestamp, MemoryRecords records) {

    long appendedBytes = log.append(records);

    offsetIndex().append(largestOffset, physicalPosition);
    timeIndex().maybeAppend(maxTimestampSoFar(), shallowOffsetOfMaxTimestampSoFar());
}

public static FileRecords open(File file,
                               boolean mutable,
                               boolean fileAlreadyExists,
                               int initFileSize,
                               boolean preallocate) throws IOException {
    FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate);
    int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE;
    return new FileRecords(file, channel, 0, end, false);
}
```

Writing to the index files:

```java
//OffsetIndex#append
public void append(long offset, int position) {
    mmap().putInt(relativeOffset(offset));
    mmap().putInt(position);
}

//TimeIndex#maybeAppend(long, long, boolean)
public void maybeAppend(long timestamp, long offset, boolean skipFullCheck) {
    MappedByteBuffer mmap = mmap();
    mmap.putLong(timestamp);
    mmap.putInt(relativeOffset(offset));
}
```

## Summary

This article has outlined Kafka's message production flow, from producer initialization to message delivery and broker-side handling. The producer is built from its configuration, including the partitioner and serializer components, and uses the record accumulator to batch messages. Metadata management keeps the producer's view of the cluster up to date, while the sender thread and network client handle communication with the brokers. Based on the partition information, the producer writes batches toward the target broker and waits for the response; the broker appends and acknowledges the messages, ensuring efficient and reliable data transfer. Together these pieces deliver Kafka's high-throughput, low-latency messaging.
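
To tie the pieces together, here is a minimal end-to-end send, building on the configuration from the introduction. The topic name `demo-topic` and the bootstrap address are placeholders, not values taken from this article.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        try (KafkaProducer<Integer, String> producer = new KafkaProducer<>(props)) {
            // send() only appends to the accumulator; the sender thread ships the batch asynchronously
            producer.send(new ProducerRecord<>("demo-topic", 1, "hello kafka"), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("appended to %s-%d at offset %d%n",
                            metadata.topic(), metadata.partition(), metadata.offset());
                }
            });
            producer.flush(); // force the accumulator to drain before closing
        }
    }
}
```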