Skip to content

Commit

Permalink
chore: implement Snapshotting on schema state machine (#4246)
Browse files Browse the repository at this point in the history
Converts the state to a list of schemas, and back by using runtime
values.

This assumes that the runtime values match the schema state structure.
If not, it is a bug we need to fix.

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
jvmakine and github-actions[bot] authored Jan 30, 2025
1 parent 38be24f commit d577494
Show file tree
Hide file tree
Showing 9 changed files with 649 additions and 410 deletions.
70 changes: 70 additions & 0 deletions backend/schemaservice/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ package schemaservice
import (
"context"
"fmt"
"io"
"maps"
"slices"
"sync"

"google.golang.org/protobuf/proto"

schemapb "github.com/block/ftl/common/protos/xyz/block/ftl/schema/v1"
"github.com/block/ftl/common/reflect"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/channels"
Expand Down Expand Up @@ -40,6 +44,44 @@ func NewInMemorySchemaState(ctx context.Context) *statemachine.SingleQueryHandle
return statemachine.NewSingleQueryHandle(handle, struct{}{})
}

func (r *SchemaState) Marshal() ([]byte, error) {
state := &schema.SchemaState{
Modules: append(slices.Collect(maps.Values(r.deployments)), slices.Collect(maps.Values(r.provisioning))...),
}
stateProto := state.ToProto()
bytes, err := proto.Marshal(stateProto)
if err != nil {
return nil, fmt.Errorf("failed to marshal schema state: %w", err)
}
return bytes, nil
}

func (r *SchemaState) Unmarshal(data []byte) error {
stateProto := &schemapb.SchemaState{}
if err := proto.Unmarshal(data, stateProto); err != nil {
return fmt.Errorf("failed to unmarshal schema state: %w", err)
}

state, err := schema.SchemaStateFromProto(stateProto)
if err != nil {
return fmt.Errorf("failed to unmarshal schema state: %w", err)
}

for _, module := range state.Modules {
dkey := module.GetRuntime().GetDeployment().GetDeploymentKey()
if dkey.IsZero() {
r.provisioning[module.Name] = module
} else {
r.deployments[dkey] = module
if module.GetRuntime().GetDeployment().ActivatedAt.Ok() {
r.activeDeployments[dkey] = true
}
}
}

return nil
}

func (r *SchemaState) GetDeployment(deployment key.Deployment) (*schema.Module, error) {
d, ok := r.deployments[deployment]
if !ok {
Expand Down Expand Up @@ -83,6 +125,7 @@ type schemaStateMachine struct {
lock sync.Mutex
}

var _ statemachine.Snapshotting[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}
var _ statemachine.Listenable[struct{}, SchemaState, schema.Event] = &schemaStateMachine{}

func (c *schemaStateMachine) Lookup(key struct{}) (SchemaState, error) {
Expand All @@ -107,3 +150,30 @@ func (c *schemaStateMachine) Publish(msg schema.Event) error {
func (c *schemaStateMachine) Subscribe(ctx context.Context) (<-chan struct{}, error) {
return c.notifier.Subscribe(), nil
}

func (c *schemaStateMachine) Close() error {
return nil
}

func (c *schemaStateMachine) Recover(snapshot io.Reader) error {
snapshotBytes, err := io.ReadAll(snapshot)
if err != nil {
return fmt.Errorf("failed to read snapshot: %w", err)
}
if err := c.state.Unmarshal(snapshotBytes); err != nil {
return fmt.Errorf("failed to unmarshal snapshot: %w", err)
}
return nil
}

func (c *schemaStateMachine) Save(w io.Writer) error {
snapshotBytes, err := c.state.Marshal()
if err != nil {
return fmt.Errorf("failed to marshal snapshot: %w", err)
}
_, err = w.Write(snapshotBytes)
if err != nil {
return fmt.Errorf("failed to write snapshot: %w", err)
}
return nil
}
37 changes: 37 additions & 0 deletions backend/schemaservice/state_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package schemaservice

import (
"testing"

"github.com/alecthomas/assert/v2"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/internal/key"
)

func TestSchemaStateMarshalling(t *testing.T) {
k := key.NewDeploymentKey("test")
state := SchemaState{
deployments: map[key.Deployment]*schema.Module{
k: {
Name: "test",
Runtime: &schema.ModuleRuntime{
Deployment: &schema.ModuleRuntimeDeployment{
DeploymentKey: k,
},
},
},
},
}

bytes, err := state.Marshal()
if err != nil {
t.Fatalf("failed to marshal schema state: %v", err)
}

unmarshalledState := NewSchemaState()
if err := unmarshalledState.Unmarshal(bytes); err != nil {
t.Fatalf("failed to unmarshal schema state: %v", err)
}

assert.Equal(t, state, unmarshalledState)
}
Loading

0 comments on commit d577494

Please sign in to comment.