Skip to content

Commit

Permalink
Minor doc update, near cache calc update (#100)
Browse files Browse the repository at this point in the history
* Minor doc update, near cache calc update
* Updates to resolver
  • Loading branch information
tmiddlet2666 authored Dec 8, 2024
1 parent a2705c6 commit a2ce8e1
Show file tree
Hide file tree
Showing 13 changed files with 178 additions and 32 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ generate-proto-v1: $(TOOLS_BIN)/protoc ## Generate Proto Files v1
# ----------------------------------------------------------------------------------------------------------------------
.PHONY: show-docs
show-docs: ## Show the Documentation
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/"
@echo "Serving documentation on http://localhost:6060/pkg/github.com/oracle/coherence-go-client/v2"
go install golang.org/x/tools/cmd/godoc@latest
godoc -goroot $(GOROOT) -http=:6060

Expand Down Expand Up @@ -421,7 +421,7 @@ getcopyright: ## Download copyright jar locally if necessary.
$(TOOLS_BIN)/protoc:
@mkdir -p $(TOOLS_BIN)
./scripts/download-protoc.sh $(TOOLS_DIRECTORY)
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.30.0
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.33.0
go install google.golang.org/grpc/cmd/[email protected]


Expand Down
3 changes: 3 additions & 0 deletions coherence/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ const (
// envResolverDebug enables resolver debug messages to be displayed.
envResolverDebug = "COHERENCE_RESOLVER_DEBUG"

// envResolverDebug sets the number of retries when the resolver fails.
envResolverRetries = "COHERENCE_RESOLVER_RETRIES"

// envResolverDebug enables randomization of addresses returned by resolver
envResolverRandomize = "COHERENCE_RESOLVER_RANDOMIZE"

Expand Down
32 changes: 27 additions & 5 deletions coherence/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const (
// MapEventType describes an event raised by a cache mutation.
type MapEventType string

// MapLifecycleEventType describes an event that may be raised during the lifecycle
// MapLifecycleEventType describes an event type that may be raised during the lifecycle
// of a cache.
type MapLifecycleEventType string

Expand Down Expand Up @@ -159,8 +159,13 @@ func (se *sessionLifecycleEvent) String() string {
return fmt.Sprintf("SessionLifecycleEvent{source=%v, format=%s}", se.Source(), se.Type())
}

// MapLifecycleEvent describes an event that may be raised during the lifecycle
// of a cache.
type MapLifecycleEvent[K comparable, V any] interface {
// Source returns the source of this MapLifecycleEvent.
Source() NamedMap[K, V]

// Type returns the MapLifecycleEventType for this MapLifecycleEvent.
Type() MapLifecycleEventType
}

Expand All @@ -186,22 +191,39 @@ func (l *mapLifecycleEvent[K, V]) Source() NamedMap[K, V] {
return l.source
}

// String returns a string representation of a MapLifecycleEvent.
// String returns a string representation of a [MapLifecycleEvent].
func (l *mapLifecycleEvent[K, V]) String() string {
return fmt.Sprintf("MapLifecycleEvent{source=%v, type=%s}", l.Source().GetCacheName(), l.Type())
}

// MapEvent an event which indicates that the content of the NamedMap or
// NamedCache has changed (i.e., an entry has been added, updated, and/or
// MapEvent an event which indicates that the content of the [NamedMap] or
// [NamedCache] has changed (i.e., an entry has been added, updated, and/or
// removed).
type MapEvent[K comparable, V any] interface {
// Source returns the source of this MapEvent.
Source() NamedMap[K, V]

// Key returns the key of the entry for which this event was raised.
Key() (*K, error)

// OldValue returns the old value, if any, of the entry for which this event
// was raised.
OldValue() (*V, error)

// NewValue returns the new value, if any, of the entry for which this event
// was raised.
NewValue() (*V, error)

// Type returns the MapEventType for this MapEvent.
Type() MapEventType

// IsExpired returns true if the event was generated from an expiry event. Only valid for gRPC v1 connections.
IsExpired() (bool, error)

// IsPriming returns true if the event is a priming event. Only valid for gRPC v1 connections.
IsPriming() (bool, error)

// IsSynthetic returns true if the event is a synthetic event. Only valid for gRPC v1 connections.
IsSynthetic() (bool, error)
}

Expand Down Expand Up @@ -1197,7 +1219,7 @@ func reRegisterListeners[K comparable, V any](ctx context.Context, namedMap *Nam
bc.filterListenersV1 = make(map[filters.Filter]*listenerGroupV1[K, V], 0)
bc.filterIDToGroupV1 = make(map[int64]*listenerGroupV1[K, V], 0)

// re-ensure all the caches as the connected has gone and so has the gRPC Proxy
// re-ensure all the caches as the connection has gone and so has the gRPC Proxy
for _, c := range cacheNames {
cacheID, err3 := bc.session.v1StreamManagerCache.ensureCache(context.Background(), c)
if err3 != nil {
Expand Down
5 changes: 3 additions & 2 deletions coherence/localcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type localCache[K comparable, V any] interface {
GetStats() CacheStats
}

// CacheStats defines various statics for near caches.
// CacheStats contains various statistics for near caches.
type CacheStats interface {
GetCacheHits() int64 // the number of entries served from the near cache
GetCacheMisses() int64 // the number of entries that had to be retrieved from the cluster
Expand Down Expand Up @@ -479,7 +479,8 @@ func (l *localCacheImpl[K, V]) String() string {
// updateEntrySize updates the cacheMemory size based upon a local entry. The sign indicates to either remove or add.
func (l *localCacheImpl[K, V]) updateEntrySize(entry *localCacheEntry[K, V], sign int) {
l.updateCacheMemory(int64(sign)*(int64(unsafe.Sizeof(entry.key))+int64(unsafe.Sizeof(entry.value))+
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) + (int64(unsafe.Sizeof(entry.lastAccess))))
(int64(unsafe.Sizeof(entry.ttl)))+(int64(unsafe.Sizeof(entry.insertTime)))) +
(int64(unsafe.Sizeof(entry.lastAccess))) + int64(unsafe.Sizeof(entry)))
}

func formatMemory(bytesValue int64) string {
Expand Down
11 changes: 9 additions & 2 deletions coherence/queue_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,16 @@ func (l *queueLifecycleEvent[V]) String() string {
// QueueLifecycleListener allows registering callbacks to be notified when lifecycle events
// (truncated, released or destroyed) occur against a [NamedQueue].
type QueueLifecycleListener[V any] interface {
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnDestroyed registers a callback that will be notified when a [NamedQueue] is destroyed.
OnDestroyed(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]

// OnReleased registers a callback that will be notified when a [NamedQueue] is released.
OnReleased(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V]
getEmitter() *eventEmitter[QueueLifecycleEventType, QueueLifecycleEvent[V]]
}
Expand Down Expand Up @@ -89,7 +96,7 @@ func (q *queueLifecycleListener[V]) OnReleased(callback func(QueueLifecycleEvent
return q.on(QueueReleased, callback)
}

// OnTruncated registers a callback that will be notified when a [NamedMap] is truncated.
// OnTruncated registers a callback that will be notified when a [Queue] is truncated.
func (q *queueLifecycleListener[V]) OnTruncated(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
return q.on(QueueTruncated, callback)
}
Expand All @@ -98,7 +105,7 @@ func (q *queueLifecycleListener[V]) getEmitter() *eventEmitter[QueueLifecycleEve
return q.emitter
}

// OnAny registers a callback that will be notified when any [NamedMap] event occurs.
// OnAny registers a callback that will be notified when any [NamedQueue] event occurs.
func (q *queueLifecycleListener[V]) OnAny(callback func(QueueLifecycleEvent[V])) QueueLifecycleListener[V] {
return q.OnTruncated(callback).OnDestroyed(callback).OnReleased(callback)
}
Expand Down
32 changes: 23 additions & 9 deletions coherence/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ import (
"github.com/oracle/coherence-go-client/v2/coherence/discovery"
"google.golang.org/grpc/resolver"
"math/rand"
"strconv"
"strings"
"sync"
"time"
)

const (
nsLookupScheme = "coherence"
nsLookupScheme = "coherence"
defaultRetries = 20
defaultResolverDelay = 1000 // ms
)

var (
Expand All @@ -42,16 +45,27 @@ func (b *nsLookupResolverBuilder) Build(target resolver.Target, cc resolver.Clie
}
checkResolverDebug()

// set the number of resolver retried
retries := getStringValueFromEnvVarOrDefault(envResolverRetries, "20")
retriesValue, err := strconv.Atoi(retries)
if err != nil {
retriesValue = defaultRetries
}

resolverDebug("resolver retries=%v", retriesValue)
r.resolverRetries = retriesValue

r.start()
return r, nil
}
func (*nsLookupResolverBuilder) Scheme() string { return nsLookupScheme }

type nsLookupResolver struct {
target resolver.Target
cc resolver.ClientConn
mutex sync.Mutex
addrStore map[string][]string
target resolver.Target
cc resolver.ClientConn
mutex sync.Mutex
addrStore map[string][]string
resolverRetries int
}

func (r *nsLookupResolver) resolve() {
Expand All @@ -60,10 +74,10 @@ func (r *nsLookupResolver) resolve() {
defer r.mutex.Unlock()

if len(grpcEndpoints) == 0 {
// try 8 times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
for i := 0; i < 8; i++ {
resolverDebug("retrying NSLookup attempt", i)
time.Sleep(time.Duration(250) * time.Millisecond)
// try r.resolverRetries; times over 2 seconds to get gRPC addresses as we may be in the middle of fail-over
for i := 1; i <= r.resolverRetries; i++ {
resolverDebug("retrying NSLookup attempt: %v", i)
time.Sleep(time.Duration(defaultResolverDelay) * time.Millisecond)
grpcEndpoints = generateNSAddresses(r.target.Endpoint())
if len(grpcEndpoints) != 0 {
break
Expand Down
8 changes: 7 additions & 1 deletion coherence/serializers.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand Down Expand Up @@ -28,8 +28,13 @@ type mathValue[T any] struct {

// Serializer defines how to serialize/ de-serialize objects.
type Serializer[T any] interface {
// Serialize serializes an object of type T and returns the []byte representation.
Serialize(object T) ([]byte, error)

// Deserialize deserialized an object and returns the correct type of T.
Deserialize(data []byte) (*T, error)

// Format returns the format used for the serializer.
Format() string
}

Expand Down Expand Up @@ -111,6 +116,7 @@ func (s JSONSerializer[T]) Deserialize(data []byte) (*T, error) {
return &zeroValue, fmt.Errorf("invalid serialization prefix %v", data[0])
}

// Format returns the format used for the serializer.
func (s JSONSerializer[T]) Format() string {
return s.format
}
16 changes: 12 additions & 4 deletions coherence/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
"log"
"os"
"reflect"
Expand Down Expand Up @@ -510,7 +511,14 @@ func (s *Session) ensureConnection() error {
s.v1StreamManagerCache = manager
apiMessage = fmt.Sprintf(" %v", manager)
} else {
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
// check if this is a gRPC status error
if sts, ok := status.FromError(err1); ok {
if sts.Message() == "Method not found: coherence.proxy.v1.ProxyService/subChannel" {
s.debug("error connecting to session via v1, falling back to v0: %v", err1)
} else {
s.debug("received a different gRPC error: %v", err1)
}
}
}

logMessage(INFO, "Session [%s] connected to [%s]%s", s.sessionID, s.sessOpts.Address, apiMessage)
Expand Down Expand Up @@ -547,14 +555,14 @@ func (s *Session) ensureConnection() error {
return
}

if newState == connectivity.Ready || newState == connectivity.Idle {
if newState == connectivity.Ready {
if !firstConnect && !connected {
// Reconnect
// Reconnected
disconnectTime = 0
session.closed = false
connected = true

logMessage(INFO, "Session [%s] re-connected to address %s", session.sessionID, session.sessOpts.Address)
logMessage(INFO, "Session [%s] re-connected to address %s (%v)", session.sessionID, session.sessOpts.Address, newState)
session.dispatch(Reconnected, func() SessionLifecycleEvent {
return newSessionLifecycleEvent(session, Reconnected)
})
Expand Down
5 changes: 4 additions & 1 deletion coherence/session_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
* Copyright (c) 2022, 2024 Oracle and/or its affiliates.
* Licensed under the Universal Permissive License v 1.0 as shown at
* https://oss.oracle.com/licenses/upl.
*/
Expand All @@ -9,6 +9,7 @@ package coherence
import (
"context"
"github.com/onsi/gomega"
"os"
"strconv"
"testing"
"time"
Expand All @@ -21,6 +22,8 @@ func TestSessionValidation(t *testing.T) {
ctx = context.Background()
)

os.Setenv("COHERENCE_SESSION_DEBUG", "true")

_, err = NewSession(ctx, WithFormat("not-json"))
g.Expect(err).To(gomega.Equal(ErrInvalidFormat))

Expand Down
3 changes: 3 additions & 0 deletions coherence/v1client.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,19 +843,22 @@ func (m *streamManagerV1) putGenericRequest(ctx context.Context, reqType pb1.Nam
return unwrapBytes(result)
}

// BinaryKeyAndValue is an internal type exported only for serialization.
type BinaryKeyAndValue struct {
Key []byte
Value []byte
Err error
Cookie []byte
}

// BinaryKey is an internal type exported only for serialization.
type BinaryKey struct {
Key []byte
Err error
Cookie []byte
}

// BinaryValue is an internal type exported only for serialization.
type BinaryValue struct {
Value []byte
Err error
Expand Down
5 changes: 1 addition & 4 deletions java/coherence-go-queues/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,7 @@
<profile>
<id>javax</id>
<activation>
<!-- This is a work-around for the fact that activeByDefault does not do what you'd think it should -->
<file>
<exists>.</exists>
</file>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
Expand Down
19 changes: 19 additions & 0 deletions scripts/run-checkin-test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

#
# Copyright (c) 2022, 2024 Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl.
#

# This script runs some tests that should succeed to be sure we can push
set -e

echo "Coherence CE 24.09 All Tests gRPC v1"
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone

echo "Coherence CE 24.09 with queues"
COHERENCE_BASE_IMAGE=gcr.io/distroless/java17 PROFILES=,jakarta,-javax,queues COHERENCE_VERSION=24.09 make clean generate-proto generate-proto-v1 build-test-images test-e2e-standalone-queues

echo "Coherence CE 22.06.10"
COHERENCE_VERSION=22.06.10 PROFILES=,-jakarta,javax make clean generate-proto build-test-images test-e2e-standalone
Loading

0 comments on commit a2ce8e1

Please sign in to comment.