Skip to content

Commit

Permalink
Upgrade Zed to v1.13.0
Browse files Browse the repository at this point in the history
  • Loading branch information
nwt committed Feb 8, 2024
1 parent 2e4228e commit eabe035
Show file tree
Hide file tree
Showing 11 changed files with 84 additions and 95 deletions.
10 changes: 5 additions & 5 deletions cmd/zync/from-kafka/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,8 @@ func (f *From) Run(args []string) error {
}

var fifoLakes []*fifo.Lake
var fifoLakeChs []<-chan *zed.Value
topicToChs := map[string][]chan<- *zed.Value{}
var fifoLakeChs []<-chan zed.Value
topicToChs := map[string][]chan<- zed.Value{}
topicToOffset := map[string]int64{}

group, groupCtx := errgroup.WithContext(ctx)
Expand All @@ -161,7 +161,7 @@ func (f *From) Run(args []string) error {
if err != nil {
return fmt.Errorf("pool %s: %w", pool, err)
}
ch := make(chan *zed.Value)
ch := make(chan zed.Value)
mu.Lock()
fifoLakes = append(fifoLakes, fifoLake)
fifoLakeChs = append(fifoLakeChs, ch)
Expand Down Expand Up @@ -215,7 +215,7 @@ func (f *From) Run(args []string) error {
return group.Wait()
}

func (f *From) runRead(ctx context.Context, c *fifo.Consumer, topicToChs map[string][]chan<- *zed.Value) error {
func (f *From) runRead(ctx context.Context, c *fifo.Consumer, topicToChs map[string][]chan<- zed.Value) error {
for {
val, err := c.ReadValue(ctx)
if err != nil {
Expand Down Expand Up @@ -243,7 +243,7 @@ func (f *From) runRead(ctx context.Context, c *fifo.Consumer, topicToChs map[str
}
}

func (f *From) runLoad(ctx, timeoutCtx context.Context, zctx *zed.Context, fifoLake *fifo.Lake, shaper string, ch <-chan *zed.Value) error {
func (f *From) runLoad(ctx, timeoutCtx context.Context, zctx *zed.Context, fifoLake *fifo.Lake, shaper string, ch <-chan zed.Value) error {
ticker := time.NewTicker(f.interval)
defer ticker.Stop()
// Stop ticker until data arrives.
Expand Down
30 changes: 14 additions & 16 deletions connectjson/connectjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"strings"

"github.com/brimdata/zed"
"github.com/brimdata/zed/runtime/expr"
"github.com/brimdata/zed/runtime/sam/expr"
"github.com/brimdata/zed/zcode"
"github.com/brimdata/zed/zio/jsonio"
"github.com/brimdata/zed/zson"
Expand All @@ -27,19 +27,19 @@ type connectSchema struct {
Field string `json:"field,omitempty"`
}

func Encode(val *zed.Value) ([]byte, error) {
if zed.TypeUnder(val.Type) == zed.TypeNull {
func Encode(val zed.Value) ([]byte, error) {
if zed.TypeUnder(val.Type()) == zed.TypeNull {
return nil, nil
}
schema, err := marshalSchema(val.Type)
schema, err := marshalSchema(val.Type())
if err != nil {
return nil, err
}
return json.Marshal(struct {
Schema *connectSchema `json:"schema"`
Payload interface{} `json:"payload"`
}{
schema, marshalPayload(val.Type, val.Bytes()),
schema, marshalPayload(val.Type(), val.Bytes()),
})
}

Expand Down Expand Up @@ -169,7 +169,6 @@ type Decoder struct {
jsonioReader *jsonio.Reader
shapers map[string]*expr.ConstShaper
this expr.This
val zed.Value
}

func NewDecoder(zctx *zed.Context) *Decoder {
Expand All @@ -184,7 +183,7 @@ func NewDecoder(zctx *zed.Context) *Decoder {
}
}

func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
func (c *Decoder) Decode(b []byte) (zed.Value, error) {
b = bytes.TrimSpace(b)
if len(b) == 0 {
return zed.Null, nil
Expand All @@ -202,13 +201,13 @@ func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
return nil
})
if err != nil {
return nil, err
return zed.Null, err
}
c.buf.Reset()
c.buf.Write(payload)
val, err := c.jsonioReader.Read()
if err != nil {
return nil, err
return zed.Null, err
}
// Using the schema's JSON encoding as the key here means different
// encodings of the same schema won't share an entry, but that should be
Expand All @@ -217,16 +216,16 @@ func (c *Decoder) Decode(b []byte) (*zed.Value, error) {
if !ok {
var cs connectSchema
if err := json.Unmarshal(schema, &cs); err != nil {
return nil, err
return zed.Null, err
}
_, typ, err := c.decodeSchema(&cs)
if err != nil {
return nil, err
return zed.Null, err
}
shaper = expr.NewConstShaper(c.zctx, &c.this, typ, expr.Cast|expr.Order)
c.shapers[string(schema)] = shaper
}
return c.decodeBytes(shaper.Eval(c.ectx, val)), nil
return c.decodeBytes(shaper.Eval(c.ectx, *val)), nil
}

func (c *Decoder) decodeSchema(s *connectSchema) (string, zed.Type, error) {
Expand Down Expand Up @@ -277,12 +276,12 @@ func (c *Decoder) decodeSchema(s *connectSchema) (string, zed.Type, error) {
return s.Field, typ, err
}

func (c *Decoder) decodeBytes(val *zed.Value) *zed.Value {
func (c *Decoder) decodeBytes(val zed.Value) zed.Value {
if val.IsNull() {
return val
}
c.builder.Truncate()
err := Walk(val.Type, val.Bytes(), func(typ zed.Type, bytes zcode.Bytes) error {
err := Walk(val.Type(), val.Bytes(), func(typ zed.Type, bytes zcode.Bytes) error {
if bytes == nil {
c.builder.Append(nil)
} else if zed.IsContainerType(typ) {
Expand All @@ -304,8 +303,7 @@ func (c *Decoder) decodeBytes(val *zed.Value) *zed.Value {
if err != nil {
panic(err)
}
c.val = *zed.NewValue(val.Type, c.builder.Bytes().Body())
return &c.val
return zed.NewValue(val.Type(), c.builder.Bytes().Body())
}

func Walk(typ zed.Type, body zcode.Bytes, visit zed.Visitor) error {
Expand Down
36 changes: 17 additions & 19 deletions etl/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,20 +98,19 @@ func (p *Pipeline) writeToOutputPool(ctx context.Context, batch *zbuf.Array) err
if err != nil {
return err
}
vals := batch.Values()
for k := range vals {
for _, rec := range batch.Values() {
//XXX This still doesn't work with the zctx bug fix. See issue #31
//if vals[k].Type == p.doneType {
// out.Append(&vals[k])
//}
if typedef, ok := vals[k].Type.(*zed.TypeNamed); ok && typedef.Name == "done" {
out.Append(&vals[k])
if named, ok := rec.Type().(*zed.TypeNamed); ok && named.Name == "done" {
out.Append(rec)
}
if extra := vals[k].Deref("left"); extra != nil {
out.Append(extra)
if extra := rec.Deref("left"); extra != nil {
out.Append(*extra)
}
if extra := vals[k].Deref("right"); extra != nil {
out.Append(extra)
if extra := rec.Deref("right"); extra != nil {
out.Append(*extra)
}
}
//XXX We need to track the commitID and use new commit-only-if
Expand All @@ -130,26 +129,24 @@ func insertOffsets(ctx context.Context, zctx *zed.Context, doneType zed.Type, ba
// flow count() but that is not implemented yet. Instead, we just format
// up some ZSON that can then insert the proper offsets in each record.
var zsons strings.Builder
vals := batch.Values()
for k := range vals {
for _, rec := range batch.Values() {
// This pointer comparison should work but it doesn't right now.
// Are they all allocated in the same zctx?
//if vals[k].Type == doneType {
// continue
//}
if typedef, ok := vals[k].Type.(*zed.TypeNamed); ok && typedef.Name == "done" {
if named, ok := rec.Type().(*zed.TypeNamed); ok && named.Name == "done" {
continue
}
if vals[k].Deref("left") != nil {
if rec.Deref("left") != nil {
continue
}
rec := zson.FormatValue(&vals[k])
topic, _, err := getKafkaMeta(&vals[k])
topic, _, err := getKafkaMeta(rec)
if err != nil {
return nil, err
}
off := offsets[topic]
zsons.WriteString(fmt.Sprintf("{rec:%s,offset:%d}\n", rec, off))
zsons.WriteString(fmt.Sprintf("{rec:%s,offset:%d}\n", zson.FormatValue(rec), off))
offsets[topic] = off + 1
}
comp := compiler.NewCompiler()
Expand All @@ -162,20 +159,21 @@ func insertOffsets(ctx context.Context, zctx *zed.Context, doneType zed.Type, ba
if err != nil {
return nil, err
}
return NewArrayFromReader(q.AsReader())
defer q.Pull(true)
return NewArrayFromReader(zbuf.PullerReader(q))
}

func getKafkaMeta(rec *zed.Value) (string, int64, error) {
func getKafkaMeta(rec zed.Value) (string, int64, error) {
// XXX this API should be simplified in zed package
kafkaRec := rec.Deref("kafka")
if kafkaRec == nil {
return "", 0, fmt.Errorf("value missing 'kafka' metadata field: %s", zson.FormatValue(rec))
}
topic, err := FieldAsString(kafkaRec, "topic")
topic, err := FieldAsString(*kafkaRec, "topic")
if err != nil {
return "", 0, err
}
offset, err := FieldAsInt(kafkaRec, "offset")
offset, err := FieldAsInt(*kafkaRec, "offset")
if err != nil {
return "", 0, err
}
Expand Down
20 changes: 9 additions & 11 deletions etl/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,7 @@ func (p *Pool) NextProducerOffsets(ctx context.Context) (map[string]int64, error
// Note at start-up if there are no offsets, then we will return an empty
// map and the caller will get offset 0 for the next offset of any lookups.
offsets := make(map[string]int64)
vals := batch.Values()
for k := range vals {
rec := &vals[k]
for _, rec := range batch.Values() {
offset, err := FieldAsInt(rec, "offset")
if err != nil {
return nil, err
Expand All @@ -90,34 +88,34 @@ func NewArrayFromReader(zr zio.Reader) (*zbuf.Array, error) {
return &a, nil
}

func Field(val *zed.Value, field string) (*zed.Value, error) {
func Field(val zed.Value, field string) (zed.Value, error) {
fieldVal := val.Deref(field)
if fieldVal == nil {
return nil, fmt.Errorf("field %q not found in %q", field, zson.FormatValue(val))
return zed.Null, fmt.Errorf("field %q not found in %q", field, zson.FormatValue(val))
}
if fieldVal.IsNull() {
return nil, fmt.Errorf("field %q null in %q", field, zson.FormatValue(val))
return zed.Null, fmt.Errorf("field %q null in %q", field, zson.FormatValue(val))
}
return fieldVal, nil
return *fieldVal, nil
}

func FieldAsInt(val *zed.Value, field string) (int64, error) {
func FieldAsInt(val zed.Value, field string) (int64, error) {
fieldVal, err := Field(val, field)
if err != nil {
return 0, err
}
if !zed.IsInteger(fieldVal.Type.ID()) {
if !zed.IsInteger(fieldVal.Type().ID()) {
return 0, fmt.Errorf("field %q not an interger in %q", field, zson.FormatValue(val))
}
return fieldVal.AsInt(), nil
}

func FieldAsString(val *zed.Value, field string) (string, error) {
func FieldAsString(val zed.Value, field string) (string, error) {
fieldVal, err := Field(val, field)
if err != nil {
return "", err
}
if fieldVal.Type.ID() != zed.IDString {
if fieldVal.Type().ID() != zed.IDString {
return "", fmt.Errorf("field %q not a string in %q", field, zson.FormatValue(val))
}
return fieldVal.AsString(), nil
Expand Down
26 changes: 13 additions & 13 deletions fifo/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ type Consumer struct {

// decoder wraps the Decode method.
//
// Implementations retain ownership of val and val.Bytes, which remain valid
// until the next Decode..
// Implementations retain ownership of val.Bytes, which remain valid
// until the next Decode.
type decoder interface {
Decode(b []byte) (val *zed.Value, err error)
Decode(b []byte) (val zed.Value, err error)
}

func NewConsumer(zctx *zed.Context, opts []kgo.Opt, reg *srclient.SchemaRegistryClient, format string, topics map[string]int64, meta bool) (*Consumer, error) {
Expand Down Expand Up @@ -81,17 +81,17 @@ func (c *Consumer) Close() {

// ReadValue returns the next value. Unlike zio.Reader.Read, the caller
// receives ownership of zed.Value.Bytes.
func (c *Consumer) ReadValue(ctx context.Context) (*zed.Value, error) {
func (c *Consumer) ReadValue(ctx context.Context) (zed.Value, error) {
for {
if !c.recordIter.Done() {
return c.handle(c.recordIter.Next())
}
fetches := c.kclient.PollFetches(ctx)
for _, e := range fetches.Errors() {
if e.Topic != "" {
return nil, fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
return zed.Null, fmt.Errorf("topic %s, partition %d: %w", e.Topic, e.Partition, e.Err)
}
return nil, e.Err
return zed.Null, e.Err
}
c.recordIter = *fetches.RecordIter()
}
Expand All @@ -114,9 +114,9 @@ func (c *Consumer) Run(ctx context.Context, w zio.Writer, timeout time.Duration)
}
}

func (c *Consumer) handle(krec *kgo.Record) (*zed.Value, error) {
func (c *Consumer) handle(krec *kgo.Record) (zed.Value, error) {
if saved := c.savedOffsets[krec.Topic]; krec.Offset < saved {
return nil, fmt.Errorf("topic %s, partition %d: received offset %d is less than saved offset %d",
return zed.Null, fmt.Errorf("topic %s, partition %d: received offset %d is less than saved offset %d",
krec.Topic, krec.Partition, krec.Offset, saved)
}
c.savedOffsets[krec.Topic] = krec.Offset
Expand All @@ -131,18 +131,18 @@ func (c *Consumer) handle(krec *kgo.Record) (*zed.Value, error) {
}
key, err := c.decoder.Decode(krec.Key)
if err != nil {
return nil, err
return zed.Null, err
}
keyType := key.Type
keyType := key.Type()
b.Append(key.Bytes())
val, err := c.decoder.Decode(krec.Value)
if err != nil {
return nil, err
return zed.Null, err
}
b.Append(val.Bytes())
outerType, err := c.outerType(keyType, val.Type)
outerType, err := c.outerType(keyType, val.Type())
if err != nil {
return nil, err
return zed.Null, err
}
return zed.NewValue(outerType, b.Bytes()), nil
}
Expand Down
5 changes: 3 additions & 2 deletions fifo/lake.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (l *Lake) NextConsumerOffset(ctx context.Context, topic string) (int64, err
// This should not happen.
return 0, errors.New("'tail 1' returned more than one record")
}
offset, err := etl.FieldAsInt(&vals[0], "offset")
offset, err := etl.FieldAsInt(vals[0], "offset")
if err != nil {
return 0, err
}
Expand All @@ -101,5 +101,6 @@ func RunLocalQuery(ctx context.Context, zctx *zed.Context, batch *zbuf.Array, qu
if err != nil {
return nil, err
}
return etl.NewArrayFromReader(q.AsReader())
defer q.Pull(true)
return etl.NewArrayFromReader(zbuf.PullerReader(q))
}
Loading

0 comments on commit eabe035

Please sign in to comment.