Skip to content

Commit

Permalink
Merge pull request #174 from pushkarmoi/pugupta/multiget-go
Browse files Browse the repository at this point in the history
multi getter apis for go client
  • Loading branch information
pushkarmoi authored Apr 26, 2024
2 parents 2675fe7 + ddd9169 commit d0b91cc
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 19 deletions.
11 changes: 11 additions & 0 deletions ikv-go-client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,17 @@ type IKVReader interface {
// There can be a very small delay between writing document (with IKVWriter) and reading them.
GetBytesValue(primaryKey interface{}, fieldname string) (bool, []byte, error)

// Multi-get version of GetBytesValue (multiple primary-keys and multiple fields).
//
// Args:
// primaryKeys - documents to fetch fields for, of type string or []byte, nil not allowed
// fieldNames - fields to fetch as a slice of string
//
// Returns field values as slice of byte slices, in document order, i.e:
// [doc0-field0][doc0-field1]..[doc0-fieldN][doc1-field0][doc1-field1]...[docN-fieldN]
// The inner byte slices will be nil ([]byte(nil)) if the field does not exist for the document.
MultiGetBytesValues(primaryKeys []interface{}, fieldNames []string) ([][]byte, error)

// Fetch an inner field of type string, by providing the primary-key
// for the document and the field-name.
//
Expand Down
84 changes: 77 additions & 7 deletions ikv-go-client/cmd/integ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

// Tests a GRPC hello-world call.
func TestGrpcHelloWorldCall(t *testing.T) {
t.Skip("ignore-test")

clientOptions, _ := ikvclient.NewClientOptionsBuilder().WithAccountId("foo").WithAccountPasskey("bar").WithStoreName("baz").Build()
writer, _ := ikvclient.NewDefaultIKVWriter(&clientOptions)

Expand Down Expand Up @@ -55,14 +57,12 @@ func TestSingleSetGet(t *testing.T) {

// Create and upsert a document

/*
document, err := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Alice").Build()
assert.Equal(t, err, nil)
err = writer.UpsertFields(&document)
assert.Equal(t, err, nil)
*/
document, err := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Alice").Build()
assert.Equal(t, err, nil)
err = writer.UpsertFields(&document)
assert.Equal(t, err, nil)

//time.Sleep(10 * time.Second)
time.Sleep(10 * time.Second)

// read fields 10M times
for i := 0; i < 10000000; i++ {
Expand All @@ -81,6 +81,76 @@ func TestSingleSetGet(t *testing.T) {
assert.Equal(t, writer.Shutdown(), nil)
}

func TestMultiGet(t *testing.T) {
t.Skip("ignore-test")

accountid := "foo"
accountpasskey := "bar"

// initialize writer and reader
factory := ikvclient.IKVClientFactory{}
clientOptions, _ := ikvclient.NewClientOptionsBuilder().WithAccountId(accountid).WithAccountPasskey(accountpasskey).WithMountDirectory("/tmp/GoIntegTestStore").WithStoreName("testing-store").Build()
reader, _ := factory.CreateNewReader(&clientOptions)
defer reader.Shutdown()
writer, _ := factory.CreateNewWriter(&clientOptions)
defer writer.Shutdown()
assert.Equal(t, reader.Startup(), nil)
assert.Equal(t, writer.Startup(), nil)

// upsert {"userid": "id_0", "firstname": "Alice"}
// upsert {"userid": "id_1", "firstname": "Bob"}
// upsert {"userid": "id_2"}
doc0, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_0").PutStringField("firstname", "Alice").Build()
doc1, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Bob").Build()
doc2, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_2").Build()

// delete to clear
if err := writer.DeleteDocument(&doc0); err != nil {
assert.Equal(t, err, nil)
}
if err := writer.DeleteDocument(&doc1); err != nil {
assert.Equal(t, err, nil)
}
if err := writer.DeleteDocument(&doc2); err != nil {
assert.Equal(t, err, nil)
}

// upsert
if err := writer.UpsertFields(&doc0); err != nil {
assert.Equal(t, err, nil)
}
if err := writer.UpsertFields(&doc1); err != nil {
assert.Equal(t, err, nil)
}
if err := writer.UpsertFields(&doc2); err != nil {
assert.Equal(t, err, nil)
}

time.Sleep(5 * time.Second)

results, err := reader.MultiGetBytesValues([]interface{}{"id_1", "id_0", "foo", "id_2"}, []string{"firstname", "bar"})
assert.Equal(t, err, nil)

// reads
assert.Equal(t, string(results[0]), "Bob")
assert.Nil(t, results[1])
assert.Equal(t, string(results[2]), "Alice")
assert.Nil(t, results[3])
// invalid doc
assert.Nil(t, results[4])
assert.Nil(t, results[5])
// both fields don't exist
assert.Nil(t, results[6])
assert.Nil(t, results[7])

// no fields - empty results
results, _ = reader.MultiGetBytesValues([]interface{}{"id_1"}, []string{})
assert.Equal(t, 0, len(results))

_, err = reader.MultiGetBytesValues([]interface{}{"id_1", []byte(nil)}, []string{"firstname"})
assert.NotNil(t, err, nil)
}

func TestUpsertAndDelete(t *testing.T) {
t.Skip("ignore-test")

Expand Down
98 changes: 93 additions & 5 deletions ikv-go-client/native_reader_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ int64_t health_check(const char *input);
int64_t open_index(const char *config, int32_t config_len);
void close_index(int64_t handle);
BytesBuffer get_field_value(int64_t handle, const char *pkey, int32_t pkey_len, const char *field_name);
BytesBuffer multiget_field_values(int64_t handle, const char *pkeys, int32_t pkeys_len, const char *field_names, int32_t field_names_len);
void free_bytes_buffer(BytesBuffer buf);
// End of common C code (Go, Python)
Expand Down Expand Up @@ -50,6 +51,13 @@ BytesBuffer go_get_field_value(void* f, int64_t handle, const char *pkey, int32_
return ((go_get_field_value_type) f)(handle, pkey, pkey_len, field_name);
}
// function pointer type
typedef BytesBuffer (*go_multiget_field_values_type)(int64_t, const char*, int32_t, const char*, int32_t);
// wrapper function
BytesBuffer go_multiget_field_values(void* f, int64_t handle, const char *pkeys, int32_t pkeys_len, const char *field_names, int32_t field_names_len) {
return ((go_multiget_field_values_type) f)(handle, pkeys, pkeys_len, field_names, field_names_len);
}
// function pointer type
typedef void (*go_free_bytes_buffer_type)(BytesBuffer);
// wrapper function
Expand All @@ -60,6 +68,8 @@ void go_free_bytes_buffer(void* f, BytesBuffer buffer) {
*/
import "C"
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"unsafe"
Expand All @@ -73,11 +83,12 @@ type NativeReaderV2 struct {
dll_path_cstr unsafe.Pointer

// native function pointers
health_check_fptr unsafe.Pointer
open_index_fptr unsafe.Pointer
close_index_fptr unsafe.Pointer
get_field_value_fptr unsafe.Pointer
free_bytes_buffer_fptr unsafe.Pointer
health_check_fptr unsafe.Pointer
open_index_fptr unsafe.Pointer
close_index_fptr unsafe.Pointer
get_field_value_fptr unsafe.Pointer
multiget_field_values_fptr unsafe.Pointer
free_bytes_buffer_fptr unsafe.Pointer
}

func NewNativeReaderV2(dllPath string) (*NativeReaderV2, error) {
Expand All @@ -101,6 +112,7 @@ func NewNativeReaderV2(dllPath string) (*NativeReaderV2, error) {
nr.open_index_fptr = C.dlsym(dllhandle, C.CString("open_index"))
nr.close_index_fptr = C.dlsym(dllhandle, C.CString("close_index"))
nr.get_field_value_fptr = C.dlsym(dllhandle, C.CString("get_field_value"))
nr.multiget_field_values_fptr = C.dlsym(dllhandle, C.CString("multiget_field_values"))
nr.free_bytes_buffer_fptr = C.dlsym(dllhandle, C.CString("free_bytes_buffer"))

return nr, nil
Expand Down Expand Up @@ -169,3 +181,79 @@ func (nr *NativeReaderV2) GetFieldValue(primaryKey []byte, fieldName string) []b
copy(result, src)
return result
}

func (nr *NativeReaderV2) MultiGetFieldValues(numPrimaryKeys int32, sizePrefixedPrimaryKeys []byte, fieldNames []string) ([][]byte, error) {
if numPrimaryKeys == 0 || len(fieldNames) == 0 {
return make([][]byte, 0), nil
}

// concatenate field names by size prefixing them
var capacity = 0
for _, fieldName := range fieldNames {
capacity += 4 + len(fieldName)
}
buff := bytes.NewBuffer(make([]byte, 0, capacity))

for _, fieldName := range fieldNames {
value := []byte(fieldName)
binary.Write(buff, binary.LittleEndian, int32(len(value)))
buff.Write([]byte(fieldName))
}
var sizePrefixedFieldNames = buff.Bytes()

// c bytes
primaryKeys_cbytes := C.CBytes(sizePrefixedPrimaryKeys)
defer C.free(unsafe.Pointer(primaryKeys_cbytes))
fieldNames_cbytes := C.CBytes(sizePrefixedFieldNames)
defer C.free(unsafe.Pointer(fieldNames_cbytes))

// make FFI call
var bb C.BytesBuffer = C.go_multiget_field_values(
nr.multiget_field_values_fptr,
C.int64_t(nr.handle),
(*C.char)(unsafe.Pointer(primaryKeys_cbytes)),
C.int32_t(len(sizePrefixedPrimaryKeys)),
(*C.char)(unsafe.Pointer(fieldNames_cbytes)),
C.int32_t(len(sizePrefixedFieldNames)),
)

expectedNumResults := numPrimaryKeys * int32(len(fieldNames))

length := int32(bb.length)
if length == 0 || bb.start == nil {
return make([][]byte, expectedNumResults), errors.New("unreachable - should only occur if primarykeys or fieldnames are empty")
}

// only need to free for non empty response bb
defer C.go_free_bytes_buffer(nr.free_bytes_buffer_fptr, bb)

src := unsafe.Slice((*byte)(bb.start), length)

// unpack size prefixed results in `result` to [][]byte
results := make([][]byte, expectedNumResults)
var resultId int32 = 0

bufReader := bytes.NewReader(src)

for resultId < expectedNumResults {
var size int32
if err := binary.Read(bufReader, binary.LittleEndian, &size); err != nil {
return make([][]byte, expectedNumResults), err
}

if size == -1 {
results[resultId] = []byte(nil)
} else if size == 0 {
results[resultId] = make([]byte, 0)
} else {
results[resultId] = make([]byte, size)
if _, err := bufReader.Read(results[resultId]); err != nil {
return make([][]byte, expectedNumResults), err
}
}

resultId++
}

return results, nil
}
70 changes: 63 additions & 7 deletions ikv-go-client/reader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package ikvclient

import (
"bytes"
"encoding/binary"
"errors"
"fmt"

Expand Down Expand Up @@ -88,26 +90,80 @@ func (reader *DefaultIKVReader) HealthCheck() (bool, error) {
return reader.native_reader.HealthCheck("healthcheck")
}

func (reader *DefaultIKVReader) GetBytesValue(key interface{}, fieldname string) (bool, []byte, error) {
func (reader *DefaultIKVReader) GetBytesValue(primaryKey interface{}, fieldname string) (bool, []byte, error) {
var nullable_value []byte
switch primaryKey := key.(type) {
switch typedPrimaryKey := primaryKey.(type) {
case string:
nullable_value = reader.native_reader.GetFieldValue(
[]byte(primaryKey),
[]byte(typedPrimaryKey),
fieldname)
case []byte:
if typedPrimaryKey == nil {
return false, nil, errors.New("primaryKey can only be a string or []byte")
}
nullable_value = reader.native_reader.GetFieldValue(
primaryKey,
typedPrimaryKey,
fieldname)
default:
return false, nil, errors.New("key can only be a string or []byte")
// also handles the case where primaryKey == nil
return false, nil, errors.New("primaryKey can only be a string or []byte")
}

return nullable_value != nil, nullable_value, nil
}

func (reader *DefaultIKVReader) GetStringValue(key interface{}, fieldname string) (bool, string, error) {
exists, bytes_value, err := reader.GetBytesValue(key, fieldname)
func (reader *DefaultIKVReader) MultiGetBytesValues(primaryKeys []interface{}, fieldNames []string) ([][]byte, error) {
if primaryKeys == nil {
return nil, errors.New("primaryKeys slice cannot be nil")
}

if fieldNames == nil {
return nil, errors.New("fieldNames slice cannot be nil")
}

if len(primaryKeys) == 0 || len(fieldNames) == 0 {
return make([][]byte, 0), nil
}

// serialize typed keys
// calculate capacity
var capacity = 0
for _, primaryKey := range primaryKeys {
switch typedPrimaryKey := primaryKey.(type) {
case string:
capacity += 4 + len(typedPrimaryKey)
case []byte:
if typedPrimaryKey == nil {
return nil, errors.New("primaryKey can only be a string or []byte")
}
capacity += 4 + len(typedPrimaryKey)
default:
// also handles the case where primaryKey == nil
return nil, errors.New("primaryKey can only be a string or []byte")
}
}
sizePrefixedPrimaryKeys := bytes.NewBuffer(make([]byte, 0, capacity))

for _, primaryKey := range primaryKeys {
switch typedPrimaryKey := primaryKey.(type) {
case string:
value := []byte(typedPrimaryKey)
binary.Write(sizePrefixedPrimaryKeys, binary.LittleEndian, int32(len(value)))
sizePrefixedPrimaryKeys.Write([]byte(typedPrimaryKey))
case []byte:
binary.Write(sizePrefixedPrimaryKeys, binary.LittleEndian, int32(len(typedPrimaryKey)))
sizePrefixedPrimaryKeys.Write(typedPrimaryKey)
default:
// also handles the case where primaryKey == nil
return nil, errors.New("primaryKey can only be a string or []byte")
}
}

return reader.native_reader.MultiGetFieldValues(int32(len(primaryKeys)), sizePrefixedPrimaryKeys.Bytes(), fieldNames)
}

func (reader *DefaultIKVReader) GetStringValue(primaryKey interface{}, fieldname string) (bool, string, error) {
exists, bytes_value, err := reader.GetBytesValue(primaryKey, fieldname)
if !exists || err != nil {
return false, EMPTY_STRING, err
}
Expand Down

0 comments on commit d0b91cc

Please sign in to comment.