diff --git a/dict.go b/dict.go
index af759a5..590001f 100644
--- a/dict.go
+++ b/dict.go
@@ -23,6 +23,14 @@ static ZSTD_DDict* ZSTD_createDDict_wrapper(uintptr_t dictBuffer, size_t dictSiz
 	return ZSTD_createDDict((const void *)dictBuffer, dictSize);
 }
 
+static ZSTD_CDict* ZSTD_createCDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize, int compressionLevel) {
+	return ZSTD_createCDict_byReference((const void *)dictBuffer, dictSize, compressionLevel);
+}
+
+static ZSTD_DDict* ZSTD_createDDict_byReference_wrapper(uintptr_t dictBuffer, size_t dictSize) {
+	return ZSTD_createDDict_byReference((const void *)dictBuffer, dictSize);
+}
+
 */
 import "C"
 
@@ -104,6 +112,7 @@ var buildDictLock sync.Mutex
 // A single CDict may be re-used in concurrently running goroutines.
 type CDict struct {
 	p                *C.ZSTD_CDict
+	pinner           runtime.Pinner
 	compressionLevel int
 }
 
@@ -114,6 +123,16 @@ func NewCDict(dict []byte) (*CDict, error) {
 	return NewCDictLevel(dict, DefaultCompressionLevel)
 }
 
+// NewCDictByRef creates a new CDict that shares the given dict.
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer used.
+func NewCDictByRef(dict []byte) (*CDict, error) {
+	return NewCDictLevelByRef(dict, DefaultCompressionLevel)
+}
+
 // NewCDictLevel creates new CDict from the given dict
 // using the given compressionLevel.
 //
@@ -136,6 +155,36 @@ func NewCDictLevel(dict []byte, compressionLevel int) (*CDict, error) {
 	return cd, nil
 }
 
+// NewCDictLevelByRef creates a new CDict that shares the given dict using
+// the given compressionLevel.
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer used.
+func NewCDictLevelByRef(dict []byte, compressionLevel int) (*CDict, error) {
+	if len(dict) == 0 {
+		return nil, fmt.Errorf("dict cannot be empty")
+	}
+
+	// Pin the backing array of the input - it must not move while C.ZSTD_CDict
+	// is alive.
+	var pinner runtime.Pinner
+	pinner.Pin(&dict[0])
+
+	cd := &CDict{
+		p: C.ZSTD_createCDict_byReference_wrapper(
+			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
+			C.size_t(len(dict)),
+			C.int(compressionLevel)),
+		pinner:           pinner,
+		compressionLevel: compressionLevel,
+	}
+	// No need for runtime.KeepAlive due to explicit pinning.
+	runtime.SetFinalizer(cd, freeCDict)
+	return cd, nil
+}
+
 // Release releases resources occupied by cd.
 //
 // cd cannot be used after the release.
@@ -146,6 +195,7 @@ func (cd *CDict) Release() {
 	result := C.ZSTD_freeCDict(cd.p)
 	ensureNoError("ZSTD_freeCDict", result)
 	cd.p = nil
+	cd.pinner.Unpin()
 }
 
 func freeCDict(v interface{}) {
@@ -156,7 +206,8 @@
 //
 // A single DDict may be re-used in concurrently running goroutines.
 type DDict struct {
-	p *C.ZSTD_DDict
+	p      *C.ZSTD_DDict
+	pinner runtime.Pinner
 }
 
 // NewDDict creates new DDict from the given dict.
@@ -178,6 +229,33 @@ func NewDDict(dict []byte) (*DDict, error) {
 	return dd, nil
 }
 
+// NewDDictByRef creates a new DDict that shares the given dict.
+//
+// Callers *must not* mutate the underlying array of 'dict'. Doing so will lead
+// to undefined and undesirable behavior.
+//
+// Call Release when the returned dict is no longer needed.
+func NewDDictByRef(dict []byte) (*DDict, error) {
+	if len(dict) == 0 {
+		return nil, fmt.Errorf("dict cannot be empty")
+	}
+
+	// Pin the backing array of the input - it must not move while C.ZSTD_DDict
+	// is alive.
+	var pinner runtime.Pinner
+	pinner.Pin(&dict[0])
+
+	dd := &DDict{
+		p: C.ZSTD_createDDict_byReference_wrapper(
+			C.uintptr_t(uintptr(unsafe.Pointer(&dict[0]))),
+			C.size_t(len(dict))),
+		pinner: pinner,
+	}
+	// No need for runtime.KeepAlive due to explicit pinning.
+	runtime.SetFinalizer(dd, freeDDict)
+	return dd, nil
+}
+
 // Release releases resources occupied by dd.
 //
 // dd cannot be used after the release.
@@ -189,6 +267,7 @@ func (dd *DDict) Release() {
 	result := C.ZSTD_freeDDict(dd.p)
 	ensureNoError("ZSTD_freeDDict", result)
 	dd.p = nil
+	dd.pinner.Unpin()
 }
 
 func freeDDict(v interface{}) {
diff --git a/dict_test.go b/dict_test.go
index bd7cce5..b6bc332 100644
--- a/dict_test.go
+++ b/dict_test.go
@@ -43,6 +43,22 @@ func TestCDictCreateRelease(t *testing.T) {
 	}
 }
 
+func TestCDictByRefCreateRelease(t *testing.T) {
+	var samples [][]byte
+	for i := 0; i < 1000; i++ {
+		samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
+	}
+	dict := BuildDict(samples, 64*1024)
+
+	for i := 0; i < 10; i++ {
+		cd, err := NewCDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create dict: %s", err)
+		}
+		cd.Release()
+	}
+}
+
 func TestDDictCreateRelease(t *testing.T) {
 	var samples [][]byte
 	for i := 0; i < 1000; i++ {
@@ -59,6 +75,22 @@ func TestDDictCreateRelease(t *testing.T) {
 	}
 }
 
+func TestDDictByRefCreateRelease(t *testing.T) {
+	var samples [][]byte
+	for i := 0; i < 1000; i++ {
+		samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
+	}
+	dict := BuildDict(samples, 64*1024)
+
+	for i := 0; i < 10; i++ {
+		dd, err := NewDDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create dict: %s", err)
+		}
+		dd.Release()
+	}
+}
+
 func TestBuildDict(t *testing.T) {
 	for _, samplesCount := range []int{0, 1, 10, 100, 1000} {
 		t.Run(fmt.Sprintf("samples_%d", samplesCount), func(t *testing.T) {
diff --git a/gozstd_test.go b/gozstd_test.go
index 3a465b8..6475982 100644
--- a/gozstd_test.go
+++ b/gozstd_test.go
@@ -123,6 +123,67 @@ func TestCompressDecompressDistinctConcurrentDicts(t *testing.T) {
 	}
 }
 
+func TestCompressDecompressDistinctConcurrentDictsByRef(t *testing.T) {
+	// Build multiple distinct dicts, sharing the underlying byte array.
+	var cdicts []*CDict
+	var ddicts []*DDict
+	defer func() {
+		for _, cd := range cdicts {
+			cd.Release()
+		}
+		for _, dd := range ddicts {
+			dd.Release()
+		}
+	}()
+	for i := 0; i < 4; i++ {
+		var samples [][]byte
+		for j := 0; j < 1000; j++ {
+			sample := fmt.Sprintf("this is %d,%d sample", j, i)
+			samples = append(samples, []byte(sample))
+		}
+		dict := BuildDict(samples, 4*1024)
+		cd, err := NewCDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create CDict: %s", err)
+		}
+		cdicts = append(cdicts, cd)
+		dd, err := NewDDictByRef(dict)
+		if err != nil {
+			t.Fatalf("cannot create DDict: %s", err)
+		}
+		ddicts = append(ddicts, dd)
+	}
+
+	// Build data for the compression.
+	var bb bytes.Buffer
+	i := 0
+	for bb.Len() < 1e4 {
+		fmt.Fprintf(&bb, "%d sample line this is %d", bb.Len(), i)
+		i++
+	}
+	data := bb.Bytes()
+
+	// Run concurrent goroutines compressing/decompressing with distinct dicts.
+	ch := make(chan error, len(cdicts))
+	for i := 0; i < cap(ch); i++ {
+		go func(cd *CDict, dd *DDict) {
+			ch <- testCompressDecompressDistinctConcurrentDicts(cd, dd, data)
+		}(cdicts[i], ddicts[i])
+	}
+
+	// Wait for goroutines to finish.
+	for i := 0; i < cap(ch); i++ {
+		select {
+		case err := <-ch:
+			if err != nil {
+				t.Fatalf("unexpected error: %s", err)
+			}
+		case <-time.After(time.Second):
+			t.Fatalf("timeout")
+		}
+	}
+}
+
 func testCompressDecompressDistinctConcurrentDicts(cd *CDict, dd *DDict, data []byte) error {
 	var compressedData, decompressedData []byte
 	for j := 0; j < 10; j++ {
diff --git a/gozstd_timing_test.go b/gozstd_timing_test.go
index 450dad6..9143f10 100644
--- a/gozstd_timing_test.go
+++ b/gozstd_timing_test.go
@@ -19,16 +19,33 @@ func BenchmarkDecompressDict(b *testing.B) {
 		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
 			for _, level := range benchCompressionLevels {
 				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
-					benchmarkDecompressDict(b, blockSize, level)
+					benchmarkDecompressDict(b, blockSize, level, false)
 				})
 			}
 		})
 	}
 }
 
-func benchmarkDecompressDict(b *testing.B, blockSize, level int) {
+func BenchmarkDecompressDictByRef(b *testing.B) {
+	for _, blockSize := range benchBlockSizes {
+		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
+			for _, level := range benchCompressionLevels {
+				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
+					benchmarkDecompressDict(b, blockSize, level, true)
+				})
+			}
+		})
+	}
+}
+
+func benchmarkDecompressDict(b *testing.B, blockSize, level int, byReference bool) {
 	block := newBenchString(blockSize)
-	bd := getBenchDicts(level)
+	var bd *benchDicts
+	if byReference {
+		bd = getBenchDictsByRef(level)
+	} else {
+		bd = getBenchDicts(level)
+	}
 	src := CompressDict(nil, block, bd.cd)
 	b.Logf("compressionRatio: %f", float64(len(block))/float64(len(src)))
 	b.ReportAllocs()
@@ -54,16 +71,33 @@ func BenchmarkCompressDict(b *testing.B) {
 		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
 			for _, level := range benchCompressionLevels {
 				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
-					benchmarkCompressDict(b, blockSize, level)
+					benchmarkCompressDict(b, blockSize, level, false)
+				})
+			}
+		})
+	}
+}
+
+func BenchmarkCompressDictByRef(b *testing.B) {
+	for _, blockSize := range benchBlockSizes {
+		b.Run(fmt.Sprintf("blockSize_%d", blockSize), func(b *testing.B) {
+			for _, level := range benchCompressionLevels {
+				b.Run(fmt.Sprintf("level_%d", level), func(b *testing.B) {
+					benchmarkCompressDict(b, blockSize, level, true)
 				})
 			}
 		})
 	}
 }
 
-func benchmarkCompressDict(b *testing.B, blockSize, level int) {
+func benchmarkCompressDict(b *testing.B, blockSize, level int, byReference bool) {
 	src := newBenchString(blockSize)
-	bd := getBenchDicts(level)
+	var bd *benchDicts
+	if byReference {
+		bd = getBenchDictsByRef(level)
+	} else {
+		bd = getBenchDicts(level)
+	}
 	b.ReportAllocs()
 	b.SetBytes(int64(len(src)))
 	b.ResetTimer()
@@ -89,15 +123,45 @@ func getBenchDicts(level int) *benchDicts {
 	return tmp
 }
 
+func getBenchDictsByRef(level int) *benchDicts {
+	benchDictsByRefLock.Lock()
+	tmp := benchDictsByRefMap[level]
+	if tmp == nil {
+		tmp = newBenchDictsByRef(level)
+		benchDictsByRefMap[level] = tmp
+	}
+	benchDictsByRefLock.Unlock()
+	return tmp
+}
+
 type benchDicts struct {
 	cd *CDict
 	dd *DDict
 }
 
-var benchDictsMap = make(map[int]*benchDicts)
-var benchDictsLock sync.Mutex
+var (
+	benchDictsMap       = make(map[int]*benchDicts)
+	benchDictsLock      sync.Mutex
+	benchDictsByRefMap  = make(map[int]*benchDicts)
+	benchDictsByRefLock sync.Mutex
+)
 
 func newBenchDicts(level int) *benchDicts {
+	return createNewBenchDicts(NewCDictLevel, NewDDict, level)
+}
+
+func newBenchDictsByRef(level int) *benchDicts {
+	return createNewBenchDicts(NewCDictLevelByRef, NewDDictByRef, level)
+}
+
+// Make it easier to toggle between copying the underlying bytes on creation
+// vs. sharing by reference.
+type (
+	cdictFactory func(dict []byte, level int) (*CDict, error)
+	ddictFactory func(dict []byte) (*DDict, error)
+)
+
+func createNewBenchDicts(createCDict cdictFactory, createDDict ddictFactory, level int) *benchDicts {
 	var samples [][]byte
 	for i := 0; i < 300; i++ {
 		sampleLen := rand.Intn(300)
@@ -106,11 +170,11 @@ func newBenchDicts(level int) *benchDicts {
 	}
 	dict := BuildDict(samples, 32*1024)
 
-	cd, err := NewCDictLevel(dict, level)
+	cd, err := createCDict(dict, level)
 	if err != nil {
 		panic(fmt.Errorf("cannot create CDict: %s", err))
 	}
-	dd, err := NewDDict(dict)
+	dd, err := createDDict(dict)
 	if err != nil {
 		panic(fmt.Errorf("cannot create DDict: %s", err))
 	}
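
Usage sketch (illustrative, not part of the patch): how application code could use the new by-reference constructors. It assumes the package is imported as github.com/valyala/gozstd and reuses the existing BuildDict, CompressDict and DecompressDict helpers; the sample strings and the 64 KB dictionary size are placeholders, and the dict slice must not be mutated while cd and dd are alive.

package main

import (
	"fmt"
	"log"

	"github.com/valyala/gozstd"
)

func main() {
	// Build a dictionary from sample data (illustrative samples only).
	var samples [][]byte
	for i := 0; i < 1000; i++ {
		samples = append(samples, []byte(fmt.Sprintf("sample %d", i)))
	}
	dict := gozstd.BuildDict(samples, 64*1024)

	// Create compression/decompression dictionaries that reference dict
	// instead of copying it. dict must stay unmodified while cd/dd are alive.
	cd, err := gozstd.NewCDictByRef(dict)
	if err != nil {
		log.Fatalf("cannot create CDict: %s", err)
	}
	defer cd.Release()

	dd, err := gozstd.NewDDictByRef(dict)
	if err != nil {
		log.Fatalf("cannot create DDict: %s", err)
	}
	defer dd.Release()

	// Round-trip some data through the shared dictionaries.
	compressed := gozstd.CompressDict(nil, []byte("sample 42 is a sample"), cd)
	plain, err := gozstd.DecompressDict(nil, compressed, dd)
	if err != nil {
		log.Fatalf("cannot decompress: %s", err)
	}
	fmt.Printf("%s\n", plain)
}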
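
The constructors above rely on runtime.Pinner (Go 1.21+) to keep the dict's backing array from moving while the C side holds its address, which is why no runtime.KeepAlive call is needed after passing &dict[0] to cgo. The sketch below isolates that pattern outside of cgo under stated assumptions: pinnedSlice, pinSlice and release are hypothetical names for illustration, not part of the gozstd API.

package pinning

import (
	"runtime"
	"unsafe"
)

// pinnedSlice mirrors the pattern used by NewCDictLevelByRef/NewDDictByRef:
// pin the first byte of a slice so the GC cannot move the backing array while
// its raw address is held outside Go-managed memory (e.g. inside a ZSTD_CDict
// created with ZSTD_createCDict_byReference).
type pinnedSlice struct {
	pinner runtime.Pinner
	addr   uintptr
	size   int
}

// pinSlice pins b's backing array and records its address and length.
func pinSlice(b []byte) *pinnedSlice {
	if len(b) == 0 {
		return nil
	}
	p := &pinnedSlice{size: len(b)}
	// While pinned, &b[0] stays stable, so no runtime.KeepAlive is needed at
	// the points where p.addr would be handed to C.
	p.pinner.Pin(&b[0])
	p.addr = uintptr(unsafe.Pointer(&b[0]))
	return p
}

// release unpins the backing array; call it once the C side no longer
// references the memory, mirroring CDict.Release/DDict.Release in the patch.
func (p *pinnedSlice) release() {
	p.pinner.Unpin()
}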