Skip to content

Commit

Permalink
very draft
Browse files Browse the repository at this point in the history
  • Loading branch information
BarkovBG committed Feb 1, 2025
1 parent 13ef599 commit 22be1a9
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 19 deletions.
11 changes: 7 additions & 4 deletions cloud/tasks/persistence/health.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package persistence

import (
"context"
"time"

"github.com/ydb-platform/nbs/cloud/tasks/metrics"
Expand All @@ -11,18 +12,19 @@ import (
type HealthCheck struct {
queriesCount uint64
successQueriesCount uint64
storage *healthCheckStorage
storage HealthStorage
registry metrics.Registry
metricsCollectionInterval time.Duration
}

func (h *HealthCheck) reportSuccessRate() {
// reportSuccessRate publishes the "successRate" gauge and, when at least one
// query has been accounted, records a heartbeat in storage.
//
// With no accounted queries the gauge is set to 0 and the function returns
// before any heartbeat is sent.
func (h *HealthCheck) reportSuccessRate(ctx context.Context) {
	if h.queriesCount == 0 {
		h.registry.Gauge("successRate").Set(0)
		return
	}

	h.registry.Gauge("successRate").Set(float64(h.successQueriesCount) / float64(h.queriesCount))

	// The heartbeat is best-effort: a failed write must not disturb metric
	// reporting, so the error is discarded explicitly rather than silently.
	// TODO: log or count the failure once a reporting channel is available here.
	_ = h.storage.HeartbeatNode(ctx, time.Now())
}

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -35,8 +37,9 @@ func (h *HealthCheck) AccountQuery(err error) {
}

func NewHealthCheck(
ctx context.Context,
componentName string,
storage *healthCheckStorage,
storage HealthStorage,
registry metrics.Registry,
) *HealthCheck {

Expand All @@ -55,7 +58,7 @@ func NewHealthCheck(
defer ticker.Stop()

for range ticker.C {
h.reportSuccessRate()
h.reportSuccessRate(ctx)
}
}()

Expand Down
65 changes: 59 additions & 6 deletions cloud/tasks/persistence/health_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,12 @@ package persistence

import (
"context"
"fmt"
"time"

"github.com/ydb-platform/nbs/cloud/tasks/logging"
)

// import (
// "time"
// )

////////////////////////////////////////////////////////////////////////////////

func healthCheckTableDescription() CreateTableDescription {
Expand Down Expand Up @@ -42,10 +40,24 @@ func CreateYDBTables(
}

// healthCheckStorage is the YDB-backed implementation of HealthStorage.
type healthCheckStorage struct {
// db is the client through which heartbeat queries are executed.
db *YDBClient
// tablesPath is substituted into the query's TablePathPrefix pragma.
tablesPath string
}

func NewStorage(db *YDBClient) *healthCheckStorage {
return &healthCheckStorage{}
// HealthStorage is the storage abstraction the health check uses to record
// heartbeats.
type HealthStorage interface {
// HeartbeatNode records a heartbeat stamped with ts and reports any error
// from the underlying storage.
HeartbeatNode(
ctx context.Context,
// host string,
ts time.Time,
// value float64,
) error
}

// NewStorage builds a HealthStorage that runs its queries through db, with
// tables addressed under db.AbsolutePath("test").
func NewStorage(db *YDBClient) HealthStorage {
	storage := new(healthCheckStorage)
	storage.db = db
	storage.tablesPath = db.AbsolutePath("test")
	return storage
}

// type HealthCheck struct {
Expand Down Expand Up @@ -110,3 +122,44 @@ func NewStorage(db *YDBClient) *healthCheckStorage {
////////////////////////////////////////////////////////////////////////////////

// heartbeatNode upserts a row into the "nodes" table within the given
// session, storing ts as the update_at timestamp for a component.
//
// NOTE(review): the component value is still a hard-coded placeholder; it
// should eventually be supplied by the caller (see the commented-out host
// parameter).
func (s *healthCheckStorage) heartbeatNode(
	ctx context.Context,
	session *Session,
	// host string,
	ts time.Time,
	// value float64,
) error {

	// TODO: replace this placeholder debug message with a meaningful one.
	logging.Debug(
		ctx,
		"KEK IS",
	)

	// Bound parameter names must match the DECLAREd names exactly: the query
	// declares $component, so that is the name the value is bound under.
	_, err := session.ExecuteRW(ctx, fmt.Sprintf(`
		--!syntax_v1
		pragma TablePathPrefix = "%v";
		declare $component as Utf8;
		declare $update_at as Timestamp;
		upsert into nodes (component, update_at)
		values ($component, $update_at);
	`, s.tablesPath),
		ValueParam("$component", UTF8Value("BORIS_BG")),
		ValueParam("$update_at", TimestampValue(ts)),
	)
	return err
}

////////////////////////////////////////////////////////////////////////////////

// HeartbeatNode records a heartbeat stamped with ts by running heartbeatNode
// inside a session obtained through the storage's YDB client.
func (s *healthCheckStorage) HeartbeatNode(
	ctx context.Context,
	ts time.Time,
) error {

	upsert := func(ctx context.Context, session *Session) error {
		return s.heartbeatNode(ctx, session, ts)
	}

	return s.db.Execute(ctx, upsert)
}
38 changes: 38 additions & 0 deletions cloud/tasks/persistence/health_storage_mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package persistence

import (
"context"
"time"

"github.com/stretchr/testify/mock"
)

////////////////////////////////////////////////////////////////////////////////

// StorageMock is a testify mock exposing HeartbeatNode, for use in tests in
// place of a real heartbeat storage.
type StorageMock struct {
mock.Mock
}

// HeartbeatNode registers the call with the embedded mock and returns the
// error configured as the first return value of the matching expectation.
func (s *StorageMock) HeartbeatNode(
	ctx context.Context,
	ts time.Time,
) error {

	return s.Called(ctx, ts).Error(0)
}

////////////////////////////////////////////////////////////////////////////////

// NewStorageMock returns a fresh StorageMock with no expectations set.
func NewStorageMock() *StorageMock {
	return new(StorageMock)
}

////////////////////////////////////////////////////////////////////////////////

// TODO: ensure at compile time that StorageMock implements HealthStorage,
// e.g.:
// func assertStorageMockIsHealthStorage(arg *StorageMock) HealthStorage {
// return arg
// }
10 changes: 7 additions & 3 deletions cloud/tasks/persistence/health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newStorage(
t *testing.T,
ctx context.Context,
db *YDBClient,
) *healthCheckStorage {
) HealthStorage {

err := CreateYDBTables(
ctx,
Expand All @@ -43,10 +43,10 @@ func TestHealthCheckMetric(t *testing.T) {
require.NoError(t, err)
defer db.Close(ctx)

storage := newStorage(t, ctx, db)
storage := NewStorageMock()

registry := mocks.NewRegistryMock()
healthCheck := NewHealthCheck("test", storage, registry)
healthCheck := NewHealthCheck(ctx, "test", storage, registry)

gaugeSetWg := sync.WaitGroup{}

Expand All @@ -59,6 +59,8 @@ func TestHealthCheckMetric(t *testing.T) {
gaugeSetWg.Done()
},
)

storage.On("HeartbeatNode", ctx, mock.Anything).Return(nil)
gaugeSetWg.Wait()

healthCheck.AccountQuery(nil)
Expand All @@ -78,4 +80,6 @@ func TestHealthCheckMetric(t *testing.T) {
gaugeSetWg.Wait()

registry.AssertAllExpectations(t)

require.NotNil(t, nil)
}
8 changes: 7 additions & 1 deletion cloud/tasks/persistence/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,18 @@ type S3Client struct {
}

func NewS3Client(
ctx context.Context,
endpoint string,
region string,
credentials S3Credentials,
callTimeout time.Duration,
s3MetricsRegistry metrics.Registry,
healthCheckStorage HealthStorage,
healthMetricsRegistry metrics.Registry,
maxRetriableErrorCount uint64,
) (*S3Client, error) {

s3Metrics := newS3Metrics(callTimeout, s3MetricsRegistry, healthMetricsRegistry)
s3Metrics := newS3Metrics(ctx, callTimeout, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry)

sessionConfig := &aws.Config{
Credentials: aws_credentials.NewStaticCredentials(
Expand Down Expand Up @@ -90,8 +92,10 @@ func NewS3Client(
}

func NewS3ClientFromConfig(
ctx context.Context,
config *persistence_config.S3Config,
s3MetricsRegistry metrics.Registry,
healthCheckStorage *healthCheckStorage,
healthMetricsRegistry metrics.Registry,
) (*S3Client, error) {

Expand All @@ -109,11 +113,13 @@ func NewS3ClientFromConfig(
}

return NewS3Client(
ctx,
config.GetEndpoint(),
config.GetRegion(),
credentials,
callTimeout,
s3MetricsRegistry,
healthCheckStorage,
healthMetricsRegistry,
config.GetMaxRetriableErrorCount(),
)
Expand Down
4 changes: 3 additions & 1 deletion cloud/tasks/persistence/s3_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,13 +123,15 @@ func (m *s3Metrics) OnRetry(req *request.Request) {
}

func newS3Metrics(
ctx context.Context,
callTimeout time.Duration,
s3MetricsRegistry metrics.Registry,
healthCheckStorage HealthStorage,
healthMetricsRegistry metrics.Registry,
) *s3Metrics {
return &s3Metrics{
callTimeout: callTimeout,
s3MetricsRegistry: s3MetricsRegistry,
healthCheck: NewHealthCheck("s3", healthMetricsRegistry),
healthCheck: NewHealthCheck(ctx, "s3", healthCheckStorage, healthMetricsRegistry),
}
}
37 changes: 33 additions & 4 deletions cloud/tasks/persistence/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/ydb-platform/nbs/cloud/tasks/errors"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/empty"
"github.com/ydb-platform/nbs/cloud/tasks/metrics/mocks"
)

Expand All @@ -17,18 +18,22 @@ const maxRetriableErrorCount = 3
////////////////////////////////////////////////////////////////////////////////

func newS3Client(
ctx context.Context,
callTimeout time.Duration,
s3MetricsRegistry *mocks.RegistryMock,
healthCheckStorage HealthStorage,
healthMetricsRegistry *mocks.RegistryMock,
) (*S3Client, error) {

credentials := NewS3Credentials("test", "test")
return NewS3Client(
ctx,
"endpoint",
"region",
credentials,
callTimeout,
s3MetricsRegistry,
healthCheckStorage,
healthMetricsRegistry,
maxRetriableErrorCount,
)
Expand All @@ -40,9 +45,15 @@ func TestS3ShouldSendErrorCanceledMetric(t *testing.T) {
ctx, cancel := context.WithCancel(newContext())

s3MetricsRegistry := mocks.NewRegistryMock()

db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

healthCheckStorage := newStorage(t, ctx, db)
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry)
require.NoError(t, err)

cancel()
Expand All @@ -68,9 +79,15 @@ func TestS3ShouldSendErrorTimeoutMetric(t *testing.T) {
defer cancel()

s3MetricsRegistry := mocks.NewRegistryMock()

db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

healthCheckStorage := newStorage(t, ctx, db)
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(0 /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
s3, err := newS3Client(ctx, 0 /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry)
require.NoError(t, err)

s3MetricsRegistry.GetCounter(
Expand Down Expand Up @@ -99,9 +116,15 @@ func TestS3ShouldRetryRequests(t *testing.T) {
defer cancel()

s3MetricsRegistry := mocks.NewRegistryMock()

db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

healthCheckStorage := newStorage(t, ctx, db)
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry)
require.NoError(t, err)

s3MetricsRegistry.GetCounter(
Expand All @@ -125,9 +148,15 @@ func TestS3ShouldSendHealthMetric(t *testing.T) {
defer cancel()

s3MetricsRegistry := mocks.NewRegistryMock()

db, err := newYDB(ctx, empty.NewRegistry())
require.NoError(t, err)
defer db.Close(ctx)

healthCheckStorage := newStorage(t, ctx, db)
healthMetricsRegistry := mocks.NewRegistryMock()

s3, err := newS3Client(10*time.Second /* callTimeout */, s3MetricsRegistry, healthMetricsRegistry)
s3, err := newS3Client(ctx, 10*time.Second /* callTimeout */, s3MetricsRegistry, healthCheckStorage, healthMetricsRegistry)
require.NoError(t, err)

s3MetricsRegistry.GetCounter(
Expand Down
1 change: 1 addition & 0 deletions cloud/tasks/persistence/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ SRCS(

health.go
health_storage.go
health_storage_mocks.go
)

GO_TEST_SRCS(
Expand Down

0 comments on commit 22be1a9

Please sign in to comment.