Skip to content

Commit

Permalink
feat: support stream decoder
Browse files Browse the repository at this point in the history
  • Loading branch information
liuq19 committed Jan 30, 2024
1 parent 3135e27 commit 71bcbfa
Show file tree
Hide file tree
Showing 9 changed files with 648 additions and 22 deletions.
13 changes: 13 additions & 0 deletions ast/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,16 @@ func (self *Searcher) getByPath(copystring bool, path ...interface{}) (Node, err
}
return newRawNode(raw, t), nil
}

// Skip parses a single JSON value from json, beginning at byte offset *pos.
//
// On success it returns the start offset reported by the parser and stores the
// parser's final position back into *pos. When the parser reports a non-zero
// types.ParsingError it returns -1 together with that error and leaves *pos
// unchanged.
func Skip(json string, pos *int) (start int, err error ) {
	p := NewParser(json)
	p.p = *pos

	// getByPath is called with no path components, so it walks exactly one value.
	start, err = p.getByPath()
	// A zero parsing code means success; anything else is a failure.
	if err.(types.ParsingError) != 0 {
		return -1, err
	}

	*pos = p.p
	return start, nil
}

Check failure on line 93 in ast/search.go

View workflow job for this annotation

GitHub Actions / build (1.20.x)

syntax error: unexpected var after top level declaration
9 changes: 9 additions & 0 deletions ast/search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,3 +410,12 @@ func BenchmarkSetOne_Parallel_Sonic(b *testing.B) {
}
})
}

// TestAstSkip checks that Skip reports the offset where the value starts
// (after the leading blank) and advances pos just past the skipped value.
func TestAstSkip(t *testing.T) {
	input := ` {"test":123} `
	pos := 0
	start, err := Skip(input, &pos)
	// Check the error first: start and pos are only meaningful on success.
	assert.NoError(t, err)
	// assert.Equal takes (expected, actual); keeping that order makes failure
	// messages label the two values correctly.
	assert.Equal(t, 1, start)
	assert.Equal(t, 13, pos)
}
11 changes: 3 additions & 8 deletions compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
`io`
`reflect`

`github.com/bytedance/sonic/decoder`
`github.com/bytedance/sonic/dev/decoder`
`github.com/bytedance/sonic/option`
)

Expand Down Expand Up @@ -119,13 +119,8 @@ func (cfg frozenConfig) NewEncoder(writer io.Writer) Encoder {
// TODO: use dev.NewDecoder
// NewDecoder is implemented by sonic
//
// NOTE(review): this hunk shows both sides of the diff with the +/- markers
// lost, which is why dec is declared twice. Presumably the json.NewDecoder
// lines through the DisallowUnknownFields block are the removed side, and the
// decoder.NewStreamDecoder / SetOptions lines are the added side — confirm
// against the commit before editing.
func (cfg frozenConfig) NewDecoder(reader io.Reader) Decoder {
dec := json.NewDecoder(reader)
if cfg.UseNumber {
dec.UseNumber()
}
if cfg.DisallowUnknownFields {
dec.DisallowUnknownFields()
}
dec := decoder.NewStreamDecoder(reader)
dec.Decoder.SetOptions(cfg.decoderOpts)
return dec
}

Expand Down
16 changes: 16 additions & 0 deletions dev/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"unsafe"

"encoding/json"
"github.com/bytedance/sonic/ast"
"github.com/bytedance/sonic/dev/internal/rt"
"github.com/davecgh/go-spew/spew"
)
Expand Down Expand Up @@ -75,3 +76,18 @@ func (self *Decoder) decodeImpl(val interface{}) error {
}
return err
}

// Reset replaces the JSON text held by the decoder with s.
// Only the json field is assigned here.
// NOTE(review): any other per-input state the Decoder may carry is not
// touched by this method — confirm nothing else needs clearing between inputs.
func (self *Decoder) Reset(s string) {
self.json = s
}

// Skip skips a single JSON value at the beginning of data.
//
// It hands data (viewed as a string via rt.Mem2Str) to ast.Skip with a cursor
// starting at 0. On success it returns the start offset reported by ast.Skip
// and the cursor position after the value. On failure start is -1 and end is
// whatever ast.Skip left in the cursor.
//
// NOTE(review): an earlier comment promised a negative error code in start and
// the invalid character position in end; the code only ever returns -1 and the
// cursor value — confirm which contract is intended.
func Skip(data []byte) (start int, end int) {
	var err error
	// end is zero on entry, so it doubles as the cursor passed to ast.Skip.
	start, err = ast.Skip(rt.Mem2Str(data), &end)
	if err != nil {
		start = -1
	}
	return start, end
}

Check failure on line 93 in dev/decoder/decoder.go

View workflow job for this annotation

GitHub Actions / build (1.20.x)

syntax error: unexpected var after top level declaration
243 changes: 243 additions & 0 deletions dev/decoder/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
/*
* Copyright 2021 ByteDance Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package decoder

import (
`bytes`
`io`
`sync`

`github.com/bytedance/sonic/ast`
`github.com/bytedance/sonic/dev/internal/rt`
`github.com/bytedance/sonic/option`
_ `github.com/davecgh/go-spew/spew`
)

// minLeftBufferShift tunes buffer growth in realloc: a buffer is grown once
// its free space (cap - len) is at most cap >> minLeftBufferShift, and the
// same shift is applied to len when computing the candidate new capacity.
// With the value 1 the threshold is half of the capacity.
var (
minLeftBufferShift uint = 1
)

// StreamDecoder is the decoder context object for streaming input.
// It buffers bytes read from r and decodes one JSON value per Decode call
// through the embedded Decoder.
type StreamDecoder struct {
// r is the source of input bytes (read by refill and readMore).
r io.Reader
// buf holds the bytes read from r that have not been discarded yet.
buf []byte
// scanp is the index into buf where scanning starts; scan moves it to the
// next non-space byte it finds.
scanp int
// scanned counts the bytes already dropped from the front of buf;
// InputOffset adds scanp to it.
scanned int64
// err is the sticky error recorded by setErr; once set, More and readMore
// return false.
err error
// Decoder is reset with each skipped value and decodes it into the target.
Decoder
}

// bufPool recycles read buffers. realloc takes a buffer from it when the
// current one has zero capacity, and Decode and setErr hand emptied buffers
// back. Freshly created buffers have length 0 and capacity
// option.DefaultDecoderBufferSize.
var bufPool = sync.Pool{
New: func () interface{} {
return make([]byte, 0, option.DefaultDecoderBufferSize)
},
}

// NewStreamDecoder mirrors the encoding/json.NewDecoder API.
//
// It returns a new decoder that reads from r. Every other field starts at its
// zero value; the read buffer is obtained lazily on first use.
func NewStreamDecoder(r io.Reader) *StreamDecoder {
	dec := new(StreamDecoder)
	dec.r = r
	return dec
}

// Decode decodes the next JSON value of the input stream into val.
//
// More bytes than one value needs may be read; they stay in the buffer and are
// used by the next call. If More reports that nothing is left to decode, the
// recorded stream error (self.err) is returned.
//
// While the buffered bytes do not form a complete value, more input is read
// and the skip is retried. If no more input can be read, a SyntaxError is
// returned and the read error is recorded, which stops subsequent decoding.
//
// Once a complete value has been located it is consumed from the buffer even
// if decoding it into val fails. The decode error is then returned but not
// recorded, so the following value can still be decoded (this matches
// encoding/json.Decoder, which also advances past a value it cannot unmarshal).
func (self *StreamDecoder) Decode(val interface{}) (err error) {
	// More skips leading blanks (refilling the buffer if needed) and leaves
	// scanp on the first byte of the next value.
	if !self.More() {
		return self.err
	}

	// readMore moves self.scanp while scanning fresh input, so keep our own
	// record of where the value begins.
	begin := self.scanp

	var raw string
	var end int
	for {
		bufLen := len(self.buf)
		src := rt.Mem2Str(self.buf[begin:bufLen])

		pos := 0
		start, skipErr := ast.Skip(src, &pos)
		if skipErr == nil {
			raw, end = src[start:pos], begin+pos
			break
		}

		// The value may simply be incomplete: pull in more input and retry.
		if !self.readMore() {
			return SyntaxError{bufLen, self.json, skipErr.Error()}
		}
	}

	// Must copy the string here for safety: raw is a view of self.buf
	// (presumably without copying — see rt.Mem2Str), and the buffer is
	// compacted or recycled below.
	self.Decoder.Reset(string(raw))
	err = self.Decoder.Decode(val)

	// Consume the value whether or not decoding succeeded. This also repairs
	// scanp, which readMore may have moved into the middle of the value.
	self.scanp = end
	if _, empty := self.scan(); empty {
		// Only blanks remain. Count them as consumed so InputOffset stays
		// accurate, then recycle the buffer.
		self.scanp = len(self.buf)
		mem := self.buf
		self.buf = nil
		bufPool.Put(mem[:0])
	} else {
		// Undecoded bytes remain: move them to the head of the buffer.
		n := copy(self.buf, self.buf[self.scanp:])
		self.buf = self.buf[:n]
	}

	self.scanned += int64(self.scanp)
	self.scanp = 0

	if err != nil {
		return err
	}
	return self.err
}

// InputOffset returns the input stream byte offset of the current decoder
// position: the number of bytes already dropped from the buffer (scanned) plus
// the current scan index inside the buffer (scanp).
func (self *StreamDecoder) InputOffset() int64 {
	inBuffer := int64(self.scanp)
	return self.scanned + inBuffer
}

// Buffered returns a reader over the bytes of the decoder's buffer from the
// current scan position to its end.
// The reader is valid until the next call to Decode.
func (self *StreamDecoder) Buffered() io.Reader {
	remaining := self.buf[self.scanp:]
	return bytes.NewReader(remaining)
}

// More reports whether there is another element in the
// current array or object being parsed.
//
// It returns false once an error has been recorded or when peek fails;
// otherwise it looks at the next non-space byte and returns true unless that
// byte is ']' or '}'.
func (self *StreamDecoder) More() bool {
	if self.err != nil {
		return false
	}

	next, err := self.peek()
	if err != nil {
		return false
	}
	return next != ']' && next != '}'
}

// readMore appends more input from r to buf without discarding what is already
// buffered. It keeps reading until the newly read bytes contain a non-space
// byte, in which case it returns true. If a read yields no such byte and
// reports an error, the error is recorded via setErr and false is returned.
// It also returns false immediately when an error was recorded earlier.
//
// Side effect: scanp is moved to the first non-space byte of the newly read
// data, so callers that need the previous position must keep their own copy.
func (self *StreamDecoder) readMore() bool {
	if self.err != nil {
		return false
	}

	for {
		// Grow buffer if not large enough.
		prev := len(self.buf)
		realloc(&self.buf)

		n, readErr := self.r.Read(self.buf[prev:cap(self.buf)])
		self.buf = self.buf[:prev+n]

		// Only the fresh bytes are inspected.
		self.scanp = prev
		if _, blank := self.scan(); !blank {
			return true
		}

		// The new bytes have been scanned; now report any read error.
		if readErr != nil {
			self.setErr(readErr)
			return false
		}
	}
}

// setErr records err as the decoder's sticky error, detaches the read buffer
// from the decoder and returns it, emptied, to bufPool.
func (self *StreamDecoder) setErr(err error) {
	self.err = err

	recycled := self.buf[:0]
	self.buf = nil
	bufPool.Put(recycled)
}

// peek returns the next non-space byte of the input without consuming it,
// refilling the buffer as needed. An error returned by refill is held back
// until the buffer has been scanned once more; if that scan still finds no
// non-space byte, the error is recorded via setErr and returned.
func (self *StreamDecoder) peek() (byte, error) {
	var pending error
	for {
		if c, empty := self.scan(); !empty {
			return c, nil
		}

		// The buffer has been scanned; now report any delayed read error.
		if pending != nil {
			self.setErr(pending)
			return 0, pending
		}
		pending = self.refill()
	}
}

// scan looks for the next non-space byte in buf, starting at scanp.
// When one is found, scanp is moved onto it and the byte is returned with
// false. When only spaces (or nothing) remain, it returns (0, true) and leaves
// scanp untouched.
func (self *StreamDecoder) scan() (byte, bool) {
	for idx := self.scanp; idx < len(self.buf); idx++ {
		if ch := self.buf[idx]; !isSpace(ch) {
			self.scanp = idx
			return ch, false
		}
	}
	return 0, true
}

// isSpace reports whether c is one of the four JSON blank characters:
// space, tab, carriage return or line feed.
func isSpace(c byte) bool {
	// The bitset is explicitly 64 bits wide: bit 32 (' ') does not fit in an
	// int on 32-bit targets, where an untyped mask would fail to compile.
	const blanks uint64 = 1<<' ' | 1<<'\t' | 1<<'\r' | 1<<'\n'
	// Shifting by 64 or more yields 0 in Go, so bytes >= 64 never match.
	return blanks&(uint64(1)<<c) != 0
}

// refill makes room in the buffer and performs one read from r.
// Bytes before scanp are treated as consumed: they are added to scanned and
// the rest is slid down to the start of the buffer. The result of the read is
// appended to buf and its error is returned to the caller, which decides when
// to report it.
func (self *StreamDecoder) refill() error {
	// First slide down data already consumed.
	if consumed := self.scanp; consumed > 0 {
		self.scanned += int64(consumed)
		kept := copy(self.buf, self.buf[consumed:])
		self.buf = self.buf[:kept]
		self.scanp = 0
	}

	// Grow buffer if not large enough.
	realloc(&self.buf)

	// Read into the free tail of the buffer.
	tail := len(self.buf)
	n, err := self.r.Read(self.buf[tail:cap(self.buf)])
	self.buf = self.buf[:tail+n]

	return err
}

// realloc makes sure *buf has room for further reads and reports whether the
// slice was replaced.
//
//   - A buffer with zero capacity is replaced by one taken from bufPool.
//   - A buffer whose free space (cap - len) is at most cap >> minLeftBufferShift
//     is copied into a larger one. The new capacity is len + len >> minLeftBufferShift,
//     or twice the old capacity when that would not exceed the old capacity.
//   - Otherwise the buffer is left alone and false is returned.
func realloc(buf *[]byte) bool {
	cur := *buf
	size, capacity := uint(len(cur)), uint(cap(cur))

	if capacity == 0 {
		*buf = bufPool.Get().([]byte)
		return true
	}

	// Enough free space left: nothing to do.
	if capacity-size > capacity>>minLeftBufferShift {
		return false
	}

	grown := size + size>>minLeftBufferShift
	if grown <= capacity {
		grown = capacity * 2
	}
	next := make([]byte, size, grown)
	copy(next, cur)
	*buf = next
	return true
}


Loading

0 comments on commit 71bcbfa

Please sign in to comment.