Skip to content

Commit

Permalink
[#110]: fix: correctly handle []any type
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Nov 29, 2023
2 parents 2c9967b + 756a884 commit 454826f
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 44 deletions.
96 changes: 55 additions & 41 deletions amqpjobs/conv.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,65 @@ import (
"strconv"

amqp "github.com/rabbitmq/amqp091-go"
"go.uber.org/zap"
)

func convHeaders(h amqp.Table) map[string][]string { //nolint:gocyclo
ret := make(map[string][]string)
func convHeaders(h amqp.Table, log *zap.Logger) map[string][]string { //nolint:gocyclo
ret := make(map[string][]string, len(h))
for k := range h {
switch t := h[k].(type) {
case int:
ret[k] = []string{strconv.Itoa(t)}
case int8:
ret[k] = []string{strconv.Itoa(int(t))}
case int16:
ret[k] = []string{strconv.Itoa(int(t))}
case int32:
ret[k] = []string{strconv.Itoa(int(t))}
case int64:
ret[k] = []string{strconv.Itoa(int(t))}
case uint:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint8:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint16:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint32:
ret[k] = []string{strconv.FormatUint(uint64(t), 10)}
case uint64:
ret[k] = []string{strconv.FormatUint(t, 10)}
case float32:
ret[k] = []string{strconv.FormatFloat(float64(t), 'f', 5, 64)}
case float64:
ret[k] = []string{strconv.FormatFloat(t, 'f', 5, 64)}
case string:
ret[k] = []string{t}
case []string:
ret[k] = t
case bool:
switch t {
case true:
ret[k] = []string{"true"}
case false:
ret[k] = []string{"false"}
}
case []byte:
ret[k] = []string{string(t)}
}
// mut ret
convHeadersAnyType(&ret, k, h[k], log)
}

return ret
}

func convHeadersAnyType(ret *map[string][]string, k string, header any, log *zap.Logger) {
switch t := header.(type) {
case int:
(*ret)[k] = append((*ret)[k], strconv.Itoa(t))
case int8:
(*ret)[k] = append((*ret)[k], strconv.Itoa(int(t)))
case int16:
(*ret)[k] = append((*ret)[k], strconv.Itoa(int(t)))
case int32:
(*ret)[k] = append((*ret)[k], strconv.Itoa(int(t)))
case int64:
(*ret)[k] = append((*ret)[k], strconv.FormatInt(t, 10))
case uint:
(*ret)[k] = append((*ret)[k], strconv.FormatUint(uint64(t), 10))
case uint8:
(*ret)[k] = append((*ret)[k], strconv.FormatUint(uint64(t), 10))
case uint16:
(*ret)[k] = append((*ret)[k], strconv.FormatUint(uint64(t), 10))
case uint32:
(*ret)[k] = append((*ret)[k], strconv.FormatUint(uint64(t), 10))
case uint64:
(*ret)[k] = append((*ret)[k], strconv.FormatUint(t, 10))
case float32:
(*ret)[k] = append((*ret)[k], strconv.FormatFloat(float64(t), 'f', 5, 64))
case float64:
(*ret)[k] = append((*ret)[k], strconv.FormatFloat(t, 'f', 5, 64))
case string:
(*ret)[k] = append((*ret)[k], t)
case []string:
(*ret)[k] = append((*ret)[k], t...)
case bool:
switch t {
case true:
(*ret)[k] = []string{"true"}
case false:
(*ret)[k] = []string{"false"}
}
case []byte:
(*ret)[k] = append((*ret)[k], string(t))
case []any:
for _, v := range t {
// we need to recursively call this function to handle nested slices of primitives
convHeadersAnyType(ret, k, v, log)
}
default:
// we don't know what this is, so we'll just ignore it
log.Warn("unknown header type", zap.String("key", k), zap.Any("value", t))
}
}
5 changes: 4 additions & 1 deletion amqpjobs/conv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)

func TestConv(t *testing.T) {
Expand Down Expand Up @@ -32,7 +33,9 @@ func TestConv(t *testing.T) {
table["foo"] = float32(2.3)
table["foo2"] = 2.3

ret := convHeaders(table)
log, err := zap.NewDevelopment()
require.NoError(t, err)
ret := convHeaders(table, log)

require.Equal(t, ret["foo"], []string{"2.30000"})
require.Equal(t, ret["foo2"], []string{"2.30000"})
Expand Down
2 changes: 1 addition & 1 deletion amqpjobs/item.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func pack(id string, j *Item) (amqp.Table, error) {
// unpack restores jobs.Options
func (d *Driver) unpack(deliv amqp.Delivery) *Item {
item := &Item{
headers: convHeaders(deliv.Headers),
headers: convHeaders(deliv.Headers, d.log),
Payload: deliv.Body,
Options: &Options{
Pipeline: (*d.pipeline.Load()).Name(),
Expand Down
Loading

0 comments on commit 454826f

Please sign in to comment.