Skip to content

Commit

Permalink
add stability status, add rack option to consuming
Browse files Browse the repository at this point in the history
  • Loading branch information
twmb committed Jun 30, 2020
1 parent e94bc96 commit 2c931c6
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 1 deletion.
26 changes: 25 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,30 @@ options, and a complete Kafka administration interface.

[1]: https://github.com/edenhill/kafkacat

## Stability status

I consider the current API **relatively** stable. Once this hits a 1.x release,
the API will be even more stable. I would like to get some feedback / definitive
usage of the client other than just myself before deeming things unchanging.
As it stands, I know the ins and outs of the client, so it is too easy for me
to avoid what may be knife edges for other people.

I am fairly confident in the correctness of the administrative APIs, since they
are very easy to implement. I am mostly confident in the correctness of
producing, consuming, and transacting. I've spent a good amount of time
integration testing my [kafka-go][2] client that this program uses. The main
thing I have currently been unable to test is closest replica fetching, which
is only theoretically supported. It is worth it to read the stability status in
the kafka-go repo as well if using this client.

[2]: https://github.com/twmb/kafka-go/

In effect, consider this a **beta++**. Again, this is a bit more than a beta
because the administrative APIs are relatively sound. I would love confirmation
that this program has been used successfully, and would love to start a "Users"
section below. With more confirmation of success, and confirmation that there
are no knife edges, I will inch closer to a 1.x release.

## Configuration

kcl supports configuration through a config file, environment variables, and
Expand All @@ -26,7 +50,7 @@ and a timeout for requests that take timeouts.

Thanks to [cobra][2], autocompletion exists for bash, zsh, and powershell.

[2]: https://github.com/spf13/cobra
[3]: https://github.com/spf13/cobra

As an example of what to put in your .bashrc,

Expand Down
1 change: 1 addition & 0 deletions commands/consume/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ func (c *consumption) command() *cobra.Command {
cmd.Flags().BoolVarP(&c.regex, "regex", "r", false, "parse topics as regex; consume any topic that matches any expression")
cmd.Flags().StringVarP(&c.escapeChar, "escape-char", "c", "%", "character to use for beginning a record field escape (accepts any utf8)")
cmd.Flags().Int32Var(&c.fetchMaxBytes, "fetch-max-bytes", 1<<20, "maximum amount of bytes per fetch request per broker")
cmd.Flags().StringVar(&c.rack, "rack", "", "the rack to use for fetch requests; setting this opts in to nearest replica fetching (Kafka 2.2.0+)")
return cmd
}

Expand Down
2 changes: 2 additions & 0 deletions commands/consume/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ type consumption struct {
num int
format string
escapeChar string
rack string

fetchMaxBytes int32

Expand Down Expand Up @@ -118,6 +119,7 @@ func (c *consumption) run(topics []string) {
}

c.cl.AddOpt(kgo.FetchMaxBytes(c.fetchMaxBytes))
c.cl.AddOpt(kgo.Rack(c.rack))

cl := c.cl.Client()
if len(c.group) > 0 && !(isConsumerOffsets || isTransactionState) {
Expand Down
3 changes: 3 additions & 0 deletions commands/transact/transact.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func Command(cl *client.Client) *cobra.Command {
groupAlg string
instanceID string
writeFormat string
rack string

rwFormat string

Expand Down Expand Up @@ -119,6 +120,7 @@ func Command(cl *client.Client) *cobra.Command {
}

cl.AddOpt(kgo.FetchIsolationLevel(kgo.ReadCommitted())) // we will be reading committed
cl.AddOpt(kgo.Rack(rack))

///////////////
// producing //
Expand Down Expand Up @@ -178,6 +180,7 @@ func Command(cl *client.Client) *cobra.Command {
cmd.Flags().StringVarP(&group, "group", "g", "", "group to assign")
cmd.Flags().StringVarP(&groupAlg, "balancer", "b", "cooperative-sticky", "group balancer to use if group consuming (range, roundrobin, sticky, cooperative-sticky)")
cmd.Flags().StringVarP(&instanceID, "instance-id", "i", "", "group instance ID to use for consuming; empty means none (implies static membership; Kafka 2.5.0+)")
cmd.Flags().StringVar(&rack, "rack", "", "the rack to use for fetch requests; setting this opts in to nearest replica fetching (Kafka 2.2.0+)")

cmd.Flags().StringVarP(&writeFormat, "write-format", "w", "%t\t%k\t%v\n", "format to write to the transform program")
cmd.Flags().StringVarP(&readFormat, "read-format", "r", "%t\t%k\t%v\n", "format to read from the transform program")
Expand Down

0 comments on commit 2c931c6

Please sign in to comment.