diff --git a/ikv-go-client/client.go b/ikv-go-client/client.go index 0ba1b0f..3d6b005 100644 --- a/ikv-go-client/client.go +++ b/ikv-go-client/client.go @@ -27,6 +27,17 @@ type IKVReader interface { // There can be a very small delay between writing document (with IKVWriter) and reading them. GetBytesValue(primaryKey interface{}, fieldname string) (bool, []byte, error) + // Multi-get version of GetBytesValue (multiple primary-keys and multiple fields). + // + // Args: + // primaryKeys - documents to fetch fields for, of type string or []byte, nil not allowed + // fieldNames - fields to fetch as a slice of string + // + // Returns field values as slice of byte slices, in document order, i.e: + // [doc0-field0][doc0-field1]..[doc0-fieldN][doc1-field0][doc1-field1]...[docN-fieldN] + // The inner byte slices will be nil ([]byte(nil)) if the field does not exist for the document. + MultiGetBytesValues(primaryKeys []interface{}, fieldNames []string) ([][]byte, error) + // Fetch an inner field of type string, by providing the primary-key // for the document and the field-name. // diff --git a/ikv-go-client/cmd/integ_test.go b/ikv-go-client/cmd/integ_test.go index a7304b1..0de58e8 100644 --- a/ikv-go-client/cmd/integ_test.go +++ b/ikv-go-client/cmd/integ_test.go @@ -12,6 +12,8 @@ import ( // Tests a GRPC hello-world call. func TestGrpcHelloWorldCall(t *testing.T) { + t.Skip("ignore-test") + clientOptions, _ := ikvclient.NewClientOptionsBuilder().WithAccountId("foo").WithAccountPasskey("bar").WithStoreName("baz").Build() writer, _ := ikvclient.NewDefaultIKVWriter(&clientOptions) @@ -55,14 +57,12 @@ func TestSingleSetGet(t *testing.T) { // Create and upsert a document - /* - document, err := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Alice").Build() - assert.Equal(t, err, nil) - err = writer.UpsertFields(&document) - assert.Equal(t, err, nil) - */ + document, err := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Alice").Build() + assert.Equal(t, err, nil) + err = writer.UpsertFields(&document) + assert.Equal(t, err, nil) - //time.Sleep(10 * time.Second) + time.Sleep(10 * time.Second) // read fields 10M times for i := 0; i < 10000000; i++ { @@ -81,6 +81,76 @@ func TestSingleSetGet(t *testing.T) { assert.Equal(t, writer.Shutdown(), nil) } +func TestMultiGet(t *testing.T) { + t.Skip("ignore-test") + + accountid := "foo" + accountpasskey := "bar" + + // initialize writer and reader + factory := ikvclient.IKVClientFactory{} + clientOptions, _ := ikvclient.NewClientOptionsBuilder().WithAccountId(accountid).WithAccountPasskey(accountpasskey).WithMountDirectory("/tmp/GoIntegTestStore").WithStoreName("testing-store").Build() + reader, _ := factory.CreateNewReader(&clientOptions) + defer reader.Shutdown() + writer, _ := factory.CreateNewWriter(&clientOptions) + defer writer.Shutdown() + assert.Equal(t, reader.Startup(), nil) + assert.Equal(t, writer.Startup(), nil) + + // upsert {"userid": "id_0", "firstname": "Alice"} + // upsert {"userid": "id_1", "firstname": "Bob"} + // upsert {"userid": "id_2"} + doc0, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_0").PutStringField("firstname", "Alice").Build() + doc1, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_1").PutStringField("firstname", "Bob").Build() + doc2, _ := ikvclient.NewIKVDocumentBuilder().PutStringField("userid", "id_2").Build() + + // delete to clear + if err := writer.DeleteDocument(&doc0); err != nil { + assert.Equal(t, err, nil) + } + if err := writer.DeleteDocument(&doc1); err != nil { + assert.Equal(t, err, nil) + } + if err := writer.DeleteDocument(&doc2); err != nil { + assert.Equal(t, err, nil) + } + + // upsert + if err := writer.UpsertFields(&doc0); err != nil { + assert.Equal(t, err, nil) + } + if err := writer.UpsertFields(&doc1); err != nil { + assert.Equal(t, err, nil) + } + if err := writer.UpsertFields(&doc2); err != nil { + assert.Equal(t, err, nil) + } + + time.Sleep(5 * time.Second) + + results, err := reader.MultiGetBytesValues([]interface{}{"id_1", "id_0", "foo", "id_2"}, []string{"firstname", "bar"}) + assert.Equal(t, err, nil) + + // reads + assert.Equal(t, string(results[0]), "Bob") + assert.Nil(t, results[1]) + assert.Equal(t, string(results[2]), "Alice") + assert.Nil(t, results[3]) + // invalid doc + assert.Nil(t, results[4]) + assert.Nil(t, results[5]) + // both fields don't exist + assert.Nil(t, results[6]) + assert.Nil(t, results[7]) + + // no fields - empty results + results, _ = reader.MultiGetBytesValues([]interface{}{"id_1"}, []string{}) + assert.Equal(t, 0, len(results)) + + _, err = reader.MultiGetBytesValues([]interface{}{"id_1", []byte(nil)}, []string{"firstname"}) + assert.NotNil(t, err, nil) +} + func TestUpsertAndDelete(t *testing.T) { t.Skip("ignore-test") diff --git a/ikv-go-client/native_reader_v2.go b/ikv-go-client/native_reader_v2.go index f680f78..8542c94 100644 --- a/ikv-go-client/native_reader_v2.go +++ b/ikv-go-client/native_reader_v2.go @@ -19,6 +19,7 @@ int64_t health_check(const char *input); int64_t open_index(const char *config, int32_t config_len); void close_index(int64_t handle); BytesBuffer get_field_value(int64_t handle, const char *pkey, int32_t pkey_len, const char *field_name); +BytesBuffer multiget_field_values(int64_t handle, const char *pkeys, int32_t pkeys_len, const char *field_names, int32_t field_names_len); void free_bytes_buffer(BytesBuffer buf); // End of common C code (Go, Python) @@ -50,6 +51,13 @@ BytesBuffer go_get_field_value(void* f, int64_t handle, const char *pkey, int32_ return ((go_get_field_value_type) f)(handle, pkey, pkey_len, field_name); } +// function pointer type +typedef BytesBuffer (*go_multiget_field_values_type)(int64_t, const char*, int32_t, const char*, int32_t); +// wrapper function +BytesBuffer go_multiget_field_values(void* f, int64_t handle, const char *pkeys, int32_t pkeys_len, const char *field_names, int32_t field_names_len) { + return ((go_multiget_field_values_type) f)(handle, pkeys, pkeys_len, field_names, field_names_len); +} + // function pointer type typedef void (*go_free_bytes_buffer_type)(BytesBuffer); // wrapper function @@ -60,6 +68,8 @@ void go_free_bytes_buffer(void* f, BytesBuffer buffer) { */ import "C" import ( + "bytes" + "encoding/binary" "errors" "fmt" "unsafe" @@ -73,11 +83,12 @@ type NativeReaderV2 struct { dll_path_cstr unsafe.Pointer // native function pointers - health_check_fptr unsafe.Pointer - open_index_fptr unsafe.Pointer - close_index_fptr unsafe.Pointer - get_field_value_fptr unsafe.Pointer - free_bytes_buffer_fptr unsafe.Pointer + health_check_fptr unsafe.Pointer + open_index_fptr unsafe.Pointer + close_index_fptr unsafe.Pointer + get_field_value_fptr unsafe.Pointer + multiget_field_values_fptr unsafe.Pointer + free_bytes_buffer_fptr unsafe.Pointer } func NewNativeReaderV2(dllPath string) (*NativeReaderV2, error) { @@ -101,6 +112,7 @@ func NewNativeReaderV2(dllPath string) (*NativeReaderV2, error) { nr.open_index_fptr = C.dlsym(dllhandle, C.CString("open_index")) nr.close_index_fptr = C.dlsym(dllhandle, C.CString("close_index")) nr.get_field_value_fptr = C.dlsym(dllhandle, C.CString("get_field_value")) + nr.multiget_field_values_fptr = C.dlsym(dllhandle, C.CString("multiget_field_values")) nr.free_bytes_buffer_fptr = C.dlsym(dllhandle, C.CString("free_bytes_buffer")) return nr, nil @@ -169,3 +181,79 @@ func (nr *NativeReaderV2) GetFieldValue(primaryKey []byte, fieldName string) []b copy(result, src) return result } + +func (nr *NativeReaderV2) MultiGetFieldValues(numPrimaryKeys int32, sizePrefixedPrimaryKeys []byte, fieldNames []string) ([][]byte, error) { + if numPrimaryKeys == 0 || len(fieldNames) == 0 { + return make([][]byte, 0), nil + } + + // concatenate field names by size prefixing them + var capacity = 0 + for _, fieldName := range fieldNames { + capacity += 4 + len(fieldName) + } + buff := bytes.NewBuffer(make([]byte, 0, capacity)) + + for _, fieldName := range fieldNames { + value := []byte(fieldName) + binary.Write(buff, binary.LittleEndian, int32(len(value))) + buff.Write([]byte(fieldName)) + } + var sizePrefixedFieldNames = buff.Bytes() + + // c bytes + primaryKeys_cbytes := C.CBytes(sizePrefixedPrimaryKeys) + defer C.free(unsafe.Pointer(primaryKeys_cbytes)) + fieldNames_cbytes := C.CBytes(sizePrefixedFieldNames) + defer C.free(unsafe.Pointer(fieldNames_cbytes)) + + // make FFI call + var bb C.BytesBuffer = C.go_multiget_field_values( + nr.multiget_field_values_fptr, + C.int64_t(nr.handle), + (*C.char)(unsafe.Pointer(primaryKeys_cbytes)), + C.int32_t(len(sizePrefixedPrimaryKeys)), + (*C.char)(unsafe.Pointer(fieldNames_cbytes)), + C.int32_t(len(sizePrefixedFieldNames)), + ) + + expectedNumResults := numPrimaryKeys * int32(len(fieldNames)) + + length := int32(bb.length) + if length == 0 || bb.start == nil { + return make([][]byte, expectedNumResults), errors.New("unreachable - should only occur if primarykeys or fieldnames are empty") + } + + // only need to free for non empty response bb + defer C.go_free_bytes_buffer(nr.free_bytes_buffer_fptr, bb) + + src := unsafe.Slice((*byte)(bb.start), length) + + // unpack size prefixed results in `result` to [][]byte + results := make([][]byte, expectedNumResults) + var resultId int32 = 0 + + bufReader := bytes.NewReader(src) + + for resultId < expectedNumResults { + var size int32 + if err := binary.Read(bufReader, binary.LittleEndian, &size); err != nil { + return make([][]byte, expectedNumResults), err + } + + if size == -1 { + results[resultId] = []byte(nil) + } else if size == 0 { + results[resultId] = make([]byte, 0) + } else { + results[resultId] = make([]byte, size) + if _, err := bufReader.Read(results[resultId]); err != nil { + return make([][]byte, expectedNumResults), err + } + } + + resultId++ + } + + return results, nil +} diff --git a/ikv-go-client/reader.go b/ikv-go-client/reader.go index e3fcb81..237f637 100644 --- a/ikv-go-client/reader.go +++ b/ikv-go-client/reader.go @@ -1,6 +1,8 @@ package ikvclient import ( + "bytes" + "encoding/binary" "errors" "fmt" @@ -88,26 +90,80 @@ func (reader *DefaultIKVReader) HealthCheck() (bool, error) { return reader.native_reader.HealthCheck("healthcheck") } -func (reader *DefaultIKVReader) GetBytesValue(key interface{}, fieldname string) (bool, []byte, error) { +func (reader *DefaultIKVReader) GetBytesValue(primaryKey interface{}, fieldname string) (bool, []byte, error) { var nullable_value []byte - switch primaryKey := key.(type) { + switch typedPrimaryKey := primaryKey.(type) { case string: nullable_value = reader.native_reader.GetFieldValue( - []byte(primaryKey), + []byte(typedPrimaryKey), fieldname) case []byte: + if typedPrimaryKey == nil { + return false, nil, errors.New("primaryKey can only be a string or []byte") + } nullable_value = reader.native_reader.GetFieldValue( - primaryKey, + typedPrimaryKey, fieldname) default: - return false, nil, errors.New("key can only be a string or []byte") + // also handles the case where primaryKey == nil + return false, nil, errors.New("primaryKey can only be a string or []byte") } return nullable_value != nil, nullable_value, nil } -func (reader *DefaultIKVReader) GetStringValue(key interface{}, fieldname string) (bool, string, error) { - exists, bytes_value, err := reader.GetBytesValue(key, fieldname) +func (reader *DefaultIKVReader) MultiGetBytesValues(primaryKeys []interface{}, fieldNames []string) ([][]byte, error) { + if primaryKeys == nil { + return nil, errors.New("primaryKeys slice cannot be nil") + } + + if fieldNames == nil { + return nil, errors.New("fieldNames slice cannot be nil") + } + + if len(primaryKeys) == 0 || len(fieldNames) == 0 { + return make([][]byte, 0), nil + } + + // serialize typed keys + // calculate capacity + var capacity = 0 + for _, primaryKey := range primaryKeys { + switch typedPrimaryKey := primaryKey.(type) { + case string: + capacity += 4 + len(typedPrimaryKey) + case []byte: + if typedPrimaryKey == nil { + return nil, errors.New("primaryKey can only be a string or []byte") + } + capacity += 4 + len(typedPrimaryKey) + default: + // also handles the case where primaryKey == nil + return nil, errors.New("primaryKey can only be a string or []byte") + } + } + sizePrefixedPrimaryKeys := bytes.NewBuffer(make([]byte, 0, capacity)) + + for _, primaryKey := range primaryKeys { + switch typedPrimaryKey := primaryKey.(type) { + case string: + value := []byte(typedPrimaryKey) + binary.Write(sizePrefixedPrimaryKeys, binary.LittleEndian, int32(len(value))) + sizePrefixedPrimaryKeys.Write([]byte(typedPrimaryKey)) + case []byte: + binary.Write(sizePrefixedPrimaryKeys, binary.LittleEndian, int32(len(typedPrimaryKey))) + sizePrefixedPrimaryKeys.Write(typedPrimaryKey) + default: + // also handles the case where primaryKey == nil + return nil, errors.New("primaryKey can only be a string or []byte") + } + } + + return reader.native_reader.MultiGetFieldValues(int32(len(primaryKeys)), sizePrefixedPrimaryKeys.Bytes(), fieldNames) +} + +func (reader *DefaultIKVReader) GetStringValue(primaryKey interface{}, fieldname string) (bool, string, error) { + exists, bytes_value, err := reader.GetBytesValue(primaryKey, fieldname) if !exists || err != nil { return false, EMPTY_STRING, err }