Skip to content

Commit

Permalink
fix merge
Browse files Browse the repository at this point in the history
  • Loading branch information
magicxyyz committed Nov 22, 2024
1 parent 19a5a3c commit cf0c3a5
Showing 1 changed file with 0 additions and 21 deletions.
21 changes: 0 additions & 21 deletions pubsub/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,24 +289,3 @@ func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID st
}
return nil
}

func (c *Consumer[Request, Response]) SetError(ctx context.Context, messageID string, error string) error {
resp, err := json.Marshal(error)
if err != nil {
return fmt.Errorf("marshaling result: %w", err)
}
errorKey := ErrorKeyFor(c.StreamName(), messageID)
log.Debug("consumer: setting error", "cid", c.id, "msgIdInStream", messageID, "errorKeyInRedis", errorKey)
acquired, err := c.client.SetNX(ctx, errorKey, resp, c.cfg.ResponseEntryTimeout).Result()
if err != nil || !acquired {
return fmt.Errorf("setting error for message with message-id in stream: %v, error: %w", messageID, err)
}
log.Debug("consumer: xack", "cid", c.id, "messageId", messageID)
if _, err := c.client.XAck(ctx, c.redisStream, c.redisGroup, messageID).Result(); err != nil {
return fmt.Errorf("acking message: %v, error: %w", messageID, err)
}
if _, err := c.client.XDel(ctx, c.redisStream, messageID).Result(); err != nil {
return fmt.Errorf("deleting message: %v, error: %w", messageID, err)
}
return nil
}

0 comments on commit cf0c3a5

Please sign in to comment.