Skip to content

Commit

Permalink
Sync from PG in parallel using an in-memory backpressured buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Feb 28, 2025
1 parent 7a14088 commit 10c637f
Show file tree
Hide file tree
Showing 8 changed files with 571 additions and 41 deletions.
8 changes: 6 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ sync:
test:
devbox run "cd src && go test ./..."

test-function:
devbox run "cd src && go test ./... -run $(FUNC)"

debug:
devbox run "cd src && dlv test github.com/BemiHQ/BemiDB"

Expand Down Expand Up @@ -68,8 +71,9 @@ pg-sniff:

tpch-install:
devbox run "cd benchmark && \
git clone https://github.com/gregrahn/tpch-kit.git
cd tpch-kit/dbgen
rm -rf tpch-kit && \
git clone https://github.com/gregrahn/tpch-kit.git && \
cd tpch-kit/dbgen && \
make MACHINE=$$MACHINE DATABASE=POSTGRESQL"

tpch-generate:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ Primitive data types are mapped as follows:
| `time`, `timetz` | `INT64` (`TIME_MICROS` / `TIME_MILLIS`) | `time` |
| `timestamp` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamp` / `timestamp_ns` |
| `timestamptz` | `INT64` (`TIMESTAMP_MICROS` / `TIMESTAMP_MILLIS`) | `timestamptz` / `timestamptz_ns` |
| `uuid` | `FIXED_LEN_BYTE_ARRAY` | `uuid` |
| `uuid` | `BYTE_ARRAY` (`UTF8`) | `uuid` |
| `bytea` | `BYTE_ARRAY` (`UTF8`) | `binary` |
| `interval` | `BYTE_ARRAY` (`UTF8`) | `string` |
| `point`, `line`, `lseg`, `box`, `path`, `polygon`, `circle` | `BYTE_ARRAY` (`UTF8`) | `string` |
Expand Down
3 changes: 2 additions & 1 deletion devbox.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
"$schema": "https://raw.githubusercontent.com/jetify-com/devbox/0.13.1/.schema/devbox.schema.json",
"packages": [
"go@latest",
"postgresql@latest"
"postgresql@latest",
"gcc@latest"
],
"shell": {
"init_hook": [],
Expand Down
84 changes: 84 additions & 0 deletions devbox.lock
Original file line number Diff line number Diff line change
@@ -1,6 +1,90 @@
{
"lockfile_version": "1",
"packages": {
"gcc@latest": {
"last_modified": "2025-02-07T11:26:36Z",
"resolved": "github:NixOS/nixpkgs/d98abf5cf5914e5e4e9d57205e3af55ca90ffc1d#gcc",
"source": "devbox-search",
"version": "14-20241116",
"systems": {
"aarch64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/3k1dfk03xkmaf6cksgpk492k3m8brvmp-gcc-wrapper-14-20241116",
"default": true
},
{
"name": "man",
"path": "/nix/store/cf67mihrjf3a1w4sw8jkgw49kfi54wpq-gcc-wrapper-14-20241116-man",
"default": true
},
{
"name": "info",
"path": "/nix/store/jgsywypzhdim8s6x25cnr31ygm28lhin-gcc-wrapper-14-20241116-info"
}
],
"store_path": "/nix/store/3k1dfk03xkmaf6cksgpk492k3m8brvmp-gcc-wrapper-14-20241116"
},
"aarch64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/pavcqvq7ycdbpal1mfjsscvgngzsg9sp-gcc-wrapper-14-20241116",
"default": true
},
{
"name": "man",
"path": "/nix/store/srwbca27wpylwzaqz9ssbhlkx910ryv1-gcc-wrapper-14-20241116-man",
"default": true
},
{
"name": "info",
"path": "/nix/store/yrdz1bra65cc9i2n2ghsz98g4fx2jra3-gcc-wrapper-14-20241116-info"
}
],
"store_path": "/nix/store/pavcqvq7ycdbpal1mfjsscvgngzsg9sp-gcc-wrapper-14-20241116"
},
"x86_64-darwin": {
"outputs": [
{
"name": "out",
"path": "/nix/store/g2462k2svl4zn5q7yypqirp6xxq0s9aq-gcc-wrapper-14-20241116",
"default": true
},
{
"name": "man",
"path": "/nix/store/s0vndfpg8gsz40wmf3v11csk3n497kqm-gcc-wrapper-14-20241116-man",
"default": true
},
{
"name": "info",
"path": "/nix/store/3x75ibh7rj95vm09q38iqj8l5jwbrarl-gcc-wrapper-14-20241116-info"
}
],
"store_path": "/nix/store/g2462k2svl4zn5q7yypqirp6xxq0s9aq-gcc-wrapper-14-20241116"
},
"x86_64-linux": {
"outputs": [
{
"name": "out",
"path": "/nix/store/4ijy8jbsiqmj37avrk83gn2m903486mr-gcc-wrapper-14-20241116",
"default": true
},
{
"name": "man",
"path": "/nix/store/vyax7fpbw688qwx32c1i8n1f5jbjkcii-gcc-wrapper-14-20241116-man",
"default": true
},
{
"name": "info",
"path": "/nix/store/m4641rrm7dw80bn23ab0812pm6aj6402-gcc-wrapper-14-20241116-info"
}
],
"store_path": "/nix/store/4ijy8jbsiqmj37avrk83gn2m903486mr-gcc-wrapper-14-20241116"
}
}
},
"github:NixOS/nixpkgs/nixpkgs-unstable": {
"resolved": "github:NixOS/nixpkgs/d9b69c3ec2a2e2e971c534065bdd53374bd68b97?lastModified=1740396192&narHash=sha256-ATMHHrg3sG1KgpQA5x8I%2BzcYpp5Sf17FaFj%2FfN%2B8OoQ%3D"
},
Expand Down
103 changes: 103 additions & 0 deletions src/capped_buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package main

import (
"errors"
"io"
"sync"
)

// CappedBuffer is an in-memory byte queue with an upper size limit that
// connects a writer to a reader. Write blocks while the pending bytes would
// exceed maxSizeBytes, Read blocks while nothing is buffered, and Close
// releases whichever side is waiting.
type CappedBuffer struct {
	// config is only handed to LogTrace calls.
	config *Config

	// maxSizeBytes is the limit Write enforces on len(buffer).
	maxSizeBytes int
	// buffer holds the bytes that were written but not read yet.
	buffer []byte

	// mutex guards buffer and closed.
	mutex sync.Mutex
	// conditionalSync is bound to mutex; Write, Read and Close broadcast on
	// it after changing the guarded state.
	conditionalSync *sync.Cond

	// closed is set by Close and checked by Write and Read.
	closed bool
	// closeOnceSync makes the body of Close run at most once.
	closeOnceSync sync.Once
}

// NewCappedBuffer returns an open CappedBuffer limited to maxSizeBytes of
// unread data. The backing slice is allocated up front with that capacity,
// and config is stored for trace logging.
func NewCappedBuffer(maxSizeBytes int, config *Config) *CappedBuffer {
	cappedBuffer := new(CappedBuffer)
	cappedBuffer.config = config
	cappedBuffer.maxSizeBytes = maxSizeBytes
	cappedBuffer.buffer = make([]byte, 0, maxSizeBytes)

	// The condition variable is built on the buffer's own mutex, so waiting
	// on it releases the same lock that guards the buffered bytes.
	cappedBuffer.conditionalSync = sync.NewCond(&cappedBuffer.mutex)

	return cappedBuffer
}

// Write implements io.Writer. It appends payload to the buffer, blocking
// while there is not enough free space, and wakes waiting goroutines after
// every append.
//
// A payload that fits within maxSizeBytes is appended in one piece once the
// reader has freed enough room. A payload larger than maxSizeBytes can never
// fit at once, so it is split into chunks of at most maxSizeBytes that are
// appended one after another as the buffer drains. Without the split such a
// write would block until Close. The mutex is released while waiting between
// chunks, so chunks of an oversized payload may interleave with data from
// other concurrent writers.
//
// It returns the number of bytes appended. The error is non-nil when the
// buffer is closed before or during the write, in which case the count may be
// smaller than len(payload).
func (buf *CappedBuffer) Write(payload []byte) (writtenBytes int, err error) {
	if len(payload) == 0 {
		return 0, nil
	}

	buf.mutex.Lock()
	defer buf.mutex.Unlock()

	remaining := payload
	for len(remaining) > 0 {
		// Cap the chunk at the buffer limit so the wait below can always be
		// satisfied. A non-positive limit never shrinks the chunk to zero,
		// which would make this loop spin; such a write keeps waiting until
		// Close, as before.
		chunkSize := len(remaining)
		if buf.maxSizeBytes > 0 && chunkSize > buf.maxSizeBytes {
			chunkSize = buf.maxSizeBytes
		}

		for len(buf.buffer)+chunkSize > buf.maxSizeBytes && !buf.closed {
			LogTrace(buf.config, ">> Waiting for more space in capped buffer...")
			buf.conditionalSync.Wait() // Wait for the reader
		}

		// Covers both "closed before the call" and "closed while waiting".
		if buf.closed {
			return writtenBytes, errors.New("buffer is closed")
		}

		buf.buffer = append(buf.buffer, remaining[:chunkSize]...)
		remaining = remaining[chunkSize:]
		writtenBytes += chunkSize
		LogTrace(buf.config, ">> Writing", chunkSize, "bytes to capped buffer...")

		buf.conditionalSync.Broadcast() // Notify the reader that new data is available
	}

	return writtenBytes, nil
}

// Read implements io.Reader. It blocks until at least one byte is buffered or
// the buffer is closed, then copies as many buffered bytes as fit into
// payload and removes them from the buffer.
//
// Bytes still buffered when Close is called remain readable; io.EOF is
// returned only once the buffer is both closed and empty. A zero-length
// payload returns (0, nil) without taking the lock.
func (buf *CappedBuffer) Read(payload []byte) (readBytes int, err error) {
	maxReadBytes := len(payload)
	if maxReadBytes == 0 {
		return 0, nil
	}

	buf.mutex.Lock()
	defer buf.mutex.Unlock()

	for {
		if len(buf.buffer) > 0 {
			break // data is available
		}
		if buf.closed {
			return 0, io.EOF // drained and closed: nothing more will arrive
		}
		LogTrace(buf.config, "<< Waiting for more data in capped buffer...")
		buf.conditionalSync.Wait() // Wait for the writer
	}

	readBytes = copy(payload, buf.buffer)
	buf.buffer = buf.buffer[readBytes:] // drop the bytes just handed out
	LogTrace(buf.config, "<< Reading "+IntToString(readBytes)+"/"+IntToString(maxReadBytes)+" bytes from capped buffer...")

	// Let a blocked writer know that space has been freed.
	buf.conditionalSync.Broadcast()

	return readBytes, nil
}

// Close marks the buffer as closed and wakes every goroutine blocked in Write
// or Read so it can observe the closed state. Only the first call has an
// effect; later calls do nothing. It always returns nil.
func (buf *CappedBuffer) Close() error {
	markClosed := func() {
		buf.mutex.Lock()
		defer buf.mutex.Unlock()

		LogTrace(buf.config, "== Closing capped buffer...")
		buf.closed = true

		// Wake up any waiting writers/readers.
		buf.conditionalSync.Broadcast()
	}

	buf.closeOnceSync.Do(markClosed)
	return nil
}
Loading

0 comments on commit 10c637f

Please sign in to comment.