_____ ______ _____ _____ _ _ __ __ ______ _____
| __ \ | ____|| __ \ / ____|| | | || \/ || ____|| __ \
| |__) || |__ | | | || (___ | | | || \ / || |__ | |__) |
| _ / | __| | | | | \___ \ | | | || |\/| || __| | _ /
| | \ \ | |____ | |__| | ____) || |__| || | | || |____ | | \ \
|_| \_\|______||_____/ |_____/ \____/ |_| |_||______||_| \_\
Redsumer is a GO library that provides a simple way to consume and produce messages from a Redis Stream. It is designed to be used in a microservices architecture, where a service needs to consume messages from a Redis Stream and process them. It is built on top of the go-redis library.
Use the package manager go get to install Redsumer.
go get github.com/enerBit/redsumer
package main
import (
"context"
"fmt"
"time"
"github.com/enerBit/redsumer/v3/pkg/client"
"github.com/enerBit/redsumer/v3/pkg/consumer"
)
func main() {
// Redis client configuration
redisArgs := client.RedisArgs{
RedisHost: "localhost",
RedisPort: 6379,
Db: 0,
}
var claimBatchSize int64 = 1
var pendingBatchSize int64 = 1
consumerArgs := consumer.ConsumerArgs{
// stream, group and consumer names
StreamName: "stream_name",
GroupName: "group_name",
ConsumerName: "consumer_name",
// batch of messages to new messages
BatchSize: 1,
// batch of messages to claim, if is nil, it will dont claim messages
ClaimBatchSize: &claimBatchSize,
// batch of messages to pending, if is nil, it will dont pending messages
PendingBatchSize: &pendingBatchSize,
// time to block the connection
Block: time.Millisecond * 1,
// MinDurationToClaim is the minimum time that a message must be in the pending state to be claimed
MinDurationToClaim: time.Second * 1,
// IdleStillMine is the time that a message is still mine after the last ack
IdleStillMine: 0,
// MaxTries is the maximum number of tries to wait for the stream to be created
Tries: []int{1, 2, 3, 10, 15},
}
ctx := context.Background()
// Create a new consumer
consumerClient, err := consumer.NewConsumer(ctx, redisArgs, consumerArgs)
if err != nil {
panic(err)
}
for {
// Consume messages, get messages news, pending and claimed
messages, err := consumerClient.Consume(ctx)
if err != nil {
fmt.Println(err)
}
// Process messages
for _, message := range messages {
// Check if the message is still mine
if ok, _ := client.StillMine(ctx, message.ID); !ok {
fmt.Println("Message", message.ID, "is not mine anymore")
continue
}
fmt.Println(message.ID, message.Values)
// Acknowledge the message
err = consumerClient.Ack(ctx, message.ID)
if err != nil {
fmt.Println(err)
}
}
}
}
package main
import (
"context"
"time"
"github.com/enerBit/redsumer/v3/pkg/producer"
)
func main() {
// Redis client configuration
redisArgs := producer.RedisArgs{
RedisHost: "localhost",
RedisPort: 6379,
Db: 0,
}
// Producer configuration
producerArgs := producer.ProducerArgs{StreamName: "stream_name"}
ctx := context.Background()
// Create a new producer
producer, err := producer.NewProducer(ctx, redisArgs, producerArgs)
if err != nil {
panic(err)
}
// Produce a message
err = producer.Produce(ctx, map[string]interface{}{
"key": "value",
})
if err != nil {
panic(err)
}
}
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.
Please make sure to update tests as appropriate.