From a35bd8d75469c8aa02246923aa951cc171f46217 Mon Sep 17 00:00:00 2001 From: Konstantin Ivanov Date: Fri, 20 Dec 2024 23:44:30 +0300 Subject: [PATCH] (#1308) Rename *Id -> *ID, following Go naming convention. --- batch.go | 2 +- commit.go | 4 ++-- conn.go | 10 +++++----- message.go | 2 +- reader.go | 18 +++++++++--------- reader_test.go | 18 +++++++++--------- 6 files changed, 27 insertions(+), 27 deletions(-) diff --git a/batch.go b/batch.go index f0a26138..0e306225 100644 --- a/batch.go +++ b/batch.go @@ -233,7 +233,7 @@ func (batch *Batch) ReadMessage() (Message, error) { msg.Time = makeTime(timestamp) msg.Headers = headers if batch.conn != nil { - msg.GenerationId = batch.conn.generationId + msg.GenerationID = batch.conn.generationID } return msg, err diff --git a/commit.go b/commit.go index fc5c9b27..7d4242d5 100644 --- a/commit.go +++ b/commit.go @@ -6,7 +6,7 @@ type commit struct { topic string partition int offset int64 - generationId int32 + generationID int32 } // makeCommit builds a commit value from a message, the resulting commit takes @@ -16,7 +16,7 @@ func makeCommit(msg Message) commit { topic: msg.Topic, partition: msg.Partition, offset: msg.Offset + 1, - generationId: msg.GenerationId, + generationID: msg.GenerationID, } } diff --git a/conn.go b/conn.go index 9f66e960..d984d582 100644 --- a/conn.go +++ b/conn.go @@ -14,13 +14,13 @@ import ( "time" ) +const undefinedGenerationID int32 = -1 + var ( errInvalidWriteTopic = errors.New("writes must NOT set Topic on kafka.Message") errInvalidWritePartition = errors.New("writes must NOT set Partition on kafka.Message") ) -const undefinedGenerationId int32 = -1 - // Conn represents a connection to a kafka broker. // // Instances of Conn are safe to use concurrently from multiple goroutines. @@ -68,7 +68,7 @@ type Conn struct { transactionalID *string - generationId int32 + generationID int32 } type apiVersionMap map[apiKey]ApiVersion @@ -186,7 +186,7 @@ func NewConnWith(conn net.Conn, config ConnConfig) *Conn { offset: FirstOffset, requiredAcks: -1, transactionalID: emptyToNullable(config.TransactionalID), - generationId: undefinedGenerationId, + generationID: undefinedGenerationID, } c.wb.w = &c.wbuf @@ -393,7 +393,7 @@ func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error return joinGroupResponseV1{}, Error(response.ErrorCode) } - c.generationId = response.GenerationID + c.generationID = response.GenerationID return response, nil } diff --git a/message.go b/message.go index 2be3db7f..934b0687 100644 --- a/message.go +++ b/message.go @@ -22,7 +22,7 @@ type Message struct { // If the message has been sent by a consumer group, it contains the // generation's id. Value is -1 if not using consumer groups. - GenerationId int32 + GenerationID int32 // This field is used to hold arbitrary data you wish to include, so it // will be available when handle it on the Writer's `Completion` method, diff --git a/reader.go b/reader.go index 70855e60..cb543c15 100644 --- a/reader.go +++ b/reader.go @@ -121,7 +121,7 @@ func (r *Reader) unsubscribe() { // another consumer to avoid such a race. } -func (r *Reader) subscribe(generationId int32, allAssignments map[string][]PartitionAssignment) { +func (r *Reader) subscribe(generationID int32, allAssignments map[string][]PartitionAssignment) { offsets := make(map[topicPartition]int64) for topic, assignments := range allAssignments { for _, assignment := range assignments { @@ -134,7 +134,7 @@ func (r *Reader) subscribe(generationId int32, allAssignments map[string][]Parti } r.mutex.Lock() - r.start(generationId, offsets) + r.start(generationID, offsets) r.mutex.Unlock() r.withLogger(func(l Logger) { @@ -215,7 +215,7 @@ func (o offsetStash) merge(commits []commit) { if offset, ok := offsetsByPartition[c.partition]; !ok || c.offset > offset.offset { offsetsByPartition[c.partition] = offsetEntry{ offset: c.offset, - generationID: c.generationId, + generationID: c.generationID, } } } @@ -874,7 +874,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { r.mutex.Lock() if !r.closed && r.version == 0 { - r.start(undefinedGenerationId, r.getTopicPartitionOffset()) + r.start(undefinedGenerationID, r.getTopicPartitionOffset()) } version := r.version @@ -1095,7 +1095,7 @@ func (r *Reader) SetOffset(offset int64) error { r.offset = offset if r.version != 0 { - r.start(undefinedGenerationId, r.getTopicPartitionOffset()) + r.start(undefinedGenerationID, r.getTopicPartitionOffset()) } r.activateReadLag() @@ -1233,7 +1233,7 @@ func (r *Reader) readLag(ctx context.Context) { } } -func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition]int64) { +func (r *Reader) start(generationID int32, offsetsByPartition map[topicPartition]int64) { if r.closed { // don't start child reader if parent Reader is closed return @@ -1271,7 +1271,7 @@ func (r *Reader) start(generationId int32, offsetsByPartition map[topicPartition // backwards-compatibility flags offsetOutOfRangeError: r.config.OffsetOutOfRangeError, - }).run(ctx, generationId, offset) + }).run(ctx, generationID, offset) }(ctx, key, offset, &r.join) } } @@ -1308,7 +1308,7 @@ type readerMessage struct { error error } -func (r *reader) run(ctx context.Context, generationId int32, offset int64) { +func (r *reader) run(ctx context.Context, generationID int32, offset int64) { // This is the reader's main loop, it only ends if the context is canceled // and will keep attempting to reader messages otherwise. // @@ -1361,7 +1361,7 @@ func (r *reader) run(ctx context.Context, generationId int32, offset int64) { } continue } - conn.generationId = generationId + conn.generationID = generationID // Resetting the attempt counter ensures that if a failure occurs after // a successful initialization we don't keep increasing the backoff diff --git a/reader_test.go b/reader_test.go index 2f0a8c65..a3ee2754 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1462,20 +1462,20 @@ func TestCommitOffsetsWithRetry(t *testing.T) { Offsets offsetStash Config ReaderConfig ExpectedOffsets offsetStash - GenerationId int32 + GenerationID int32 }{ "happy path": { Invocations: 1, Offsets: offsets(), ExpectedOffsets: offsets(), - GenerationId: 1, + GenerationID: 1, }, "1 retry": { Fails: 1, Invocations: 2, Offsets: offsets(), ExpectedOffsets: offsets(), - GenerationId: 1, + GenerationID: 1, }, "out of retries": { Fails: defaultCommitRetries + 1, @@ -1483,7 +1483,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { HasError: true, Offsets: offsets(), ExpectedOffsets: offsets(), - GenerationId: 1, + GenerationID: 1, }, "illegal generation error only 1 generation": { Fails: 1, @@ -1491,7 +1491,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 1}}}, ExpectedOffsets: offsetStash{}, Config: ReaderConfig{ErrorOnWrongGenerationCommit: false}, - GenerationId: 2, + GenerationID: 2, }, "illegal generation error only 2 generations": { Fails: 1, @@ -1499,7 +1499,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { Offsets: offsetStash{"topic": {0: {0, 1}, 1: {0, 2}}}, ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}}, Config: ReaderConfig{ErrorOnWrongGenerationCommit: false}, - GenerationId: 2, + GenerationID: 2, }, "illegal generation error only 1 generation - error propagation": { Fails: 1, @@ -1508,7 +1508,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { ExpectedOffsets: offsetStash{}, Config: ReaderConfig{ErrorOnWrongGenerationCommit: true}, HasError: true, - GenerationId: 2, + GenerationID: 2, }, "illegal generation error only 2 generations - error propagation": { Fails: 1, @@ -1517,7 +1517,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { ExpectedOffsets: offsetStash{"topic": {1: {0, 2}}}, Config: ReaderConfig{ErrorOnWrongGenerationCommit: true}, HasError: true, - GenerationId: 2, + GenerationID: 2, }, } @@ -1530,7 +1530,7 @@ func TestCommitOffsetsWithRetry(t *testing.T) { offsetCommitFunc: func(r offsetCommitRequestV2) (offsetCommitResponseV2, error) { requests = append(requests, r) count++ - if r.GenerationID != test.GenerationId { + if r.GenerationID != test.GenerationID { return offsetCommitResponseV2{}, IllegalGeneration } if count <= test.Fails {