Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Filter replicated databases #407

Merged
merged 1 commit into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ type Client interface {
Commit(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error

// Stream starts a long-running connection to stream changes from another node.
Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (Stream, error)
// If filter is specified, only those databases will be replicated.
Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (Stream, error)
}

// Stream represents a stream of frames.
Expand Down
3 changes: 3 additions & 0 deletions cmd/litefs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ type LeaseConfig struct {
// become primary again.
DemoteDelay time.Duration `yaml:"demote-delay"`

// Specifies a subset of databases to replicate.
Databases []string `yaml:"databases"`

// Consul lease settings.
Consul struct {
URL string `yaml:"url"`
Expand Down
11 changes: 11 additions & 0 deletions cmd/litefs/mount_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type MountCommand struct {

// Used for generating the advertise URL for testing.
AdvertiseURLFn func() string

OnInitStore func()
}

// NewMountCommand returns a new instance of MountCommand.
Expand Down Expand Up @@ -158,6 +160,10 @@ func (c *MountCommand) Validate(ctx context.Context) (err error) {
return fmt.Errorf("invalid lease type, must be either 'consul' or 'static', got: '%v'", c.Config.Lease.Type)
}

if c.Config.Lease.Candidate && len(c.Config.Lease.Databases) > 0 {
return fmt.Errorf("cannot specify a database replication filter on candidate nodes")
}

return nil
}

Expand Down Expand Up @@ -362,8 +368,13 @@ func (c *MountCommand) initStore(ctx context.Context) error {
c.Store.ReconnectDelay = c.Config.Lease.ReconnectDelay
c.Store.DemoteDelay = c.Config.Lease.DemoteDelay
c.Store.Client = http.NewClient()
c.Store.DatabaseFilter = c.Config.Lease.Databases
c.initEnvironment(ctx)

if c.OnInitStore != nil {
c.OnInitStore()
}

if err := c.initStoreBackupClient(ctx); err != nil {
return err
}
Expand Down
34 changes: 34 additions & 0 deletions cmd/litefs/mount_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,40 @@ func TestMultiNode_Autopromotion(t *testing.T) {
}
}

// TestMultiNode_DatabaseFilter checks that a second node whose store has a
// database filter of "x.db" ends up with x.db on disk but not y.db, after both
// databases have been created through the first node's mount.
func TestMultiNode_DatabaseFilter(t *testing.T) {
	// Start the first node and block until waitForPrimary returns for it.
	primary := runMountCommand(t, newMountCommand(t, t.TempDir(), nil))
	waitForPrimary(t, primary)

	// The second node installs its filter from the OnInitStore hook so that
	// the store is already restricted when the command starts running.
	replica := newMountCommand(t, t.TempDir(), primary)
	replica.OnInitStore = func() { replica.Store.DatabaseFilter = []string{"x.db"} }
	runMountCommand(t, replica)

	// Create the database that is named in the filter.
	included := testingutil.OpenSQLDB(t, filepath.Join(primary.Config.FUSE.Dir, "x.db"))
	if _, err := included.Exec(`CREATE TABLE t (x)`); err != nil {
		t.Fatal(err)
	}
	if err := included.Close(); err != nil {
		t.Fatal(err)
	}

	// Create a second database, also on the first node, that is not in the filter.
	excluded := testingutil.OpenSQLDB(t, filepath.Join(primary.Config.FUSE.Dir, "y.db"))
	if _, err := excluded.Exec(`CREATE TABLE t (y)`); err != nil {
		t.Fatal(err)
	}
	if err := excluded.Close(); err != nil {
		t.Fatal(err)
	}

	waitForSync(t, "x.db", primary, replica)

	// Only the filtered database should be present in the second node's mount.
	replicaDir := replica.Config.FUSE.Dir
	if _, err := os.Stat(filepath.Join(replicaDir, "x.db")); err != nil {
		t.Fatal(err)
	}
	_, err := os.Stat(filepath.Join(replicaDir, "y.db"))
	if !os.IsNotExist(err) {
		t.Fatal("expected second database to not exist on replica")
	}
}

func TestMultiNode_StaticLeaser(t *testing.T) {
dir0, dir1 := t.TempDir(), t.TempDir()
cmd0 := newMountCommand(t, dir0, nil)
Expand Down
15 changes: 11 additions & 4 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"net/http"
"net/url"
"strconv"
"strings"

"github.com/superfly/litefs"
"github.com/superfly/litefs/internal/chunk"
Expand Down Expand Up @@ -354,7 +355,7 @@ func (c *Client) Commit(ctx context.Context, primaryURL string, nodeID uint64, n
}

// Stream returns a snapshot and continuous stream of WAL updates.
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
u, err := url.Parse(primaryURL)
if err != nil {
return nil, fmt.Errorf("invalid client URL: %w", err)
Expand All @@ -364,11 +365,17 @@ func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, p
return nil, fmt.Errorf("URL host required")
}

q := make(url.Values)
if len(filter) > 0 {
q.Set("filter", strings.Join(filter, ","))
}

// Strip off everything but the scheme & host.
*u = url.URL{
Scheme: u.Scheme,
Host: u.Host,
Path: "/stream",
Scheme: u.Scheme,
Host: u.Host,
Path: "/stream",
RawQuery: q.Encode(),
}

var buf bytes.Buffer
Expand Down
18 changes: 18 additions & 0 deletions http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,7 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Upgrade to HTTP/2 required", http.StatusUpgradeRequired)
return
}
q := r.URL.Query()

// Prevent nodes from connecting to themselves.
id, _ := litefs.ParseNodeID(r.Header.Get(HeaderNodeID))
Expand Down Expand Up @@ -536,6 +537,14 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
dirtySet[db.Name()] = struct{}{}
}

// Determine filtered set of databases, if any.
filterSet := make(map[string]struct{})
if filter := q.Get("filter"); filter != "" {
for _, name := range strings.Split(filter, ",") {
filterSet[name] = struct{}{}
}
}

// Flush header so client can resume control.
w.WriteHeader(http.StatusOK)
w.(http.Flusher).Flush()
Expand All @@ -554,6 +563,15 @@ func (s *Server) handlePostStream(w http.ResponseWriter, r *http.Request) {
var readySent bool
var handoffLeaseID string
for {
// Restrict dirty set to only databases in the filter set.
if len(filterSet) > 0 && len(dirtySet) > 0 {
for name := range dirtySet {
if _, ok := filterSet[name]; !ok {
delete(dirtySet, name)
}
}
}

// Send pending transactions for each database.
for name := range dirtySet {
if err := s.streamDB(r.Context(), w, name, posMap); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions mock/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Client struct {
AcquireHaltLockFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*litefs.HaltLock, error)
ReleaseHaltLockFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) error
CommitFunc func(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64, r io.Reader) error
StreamFunc func(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error)
StreamFunc func(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error)
}

func (c *Client) AcquireHaltLock(ctx context.Context, primaryURL string, nodeID uint64, name string, lockID int64) (*litefs.HaltLock, error) {
Expand All @@ -29,8 +29,8 @@ func (c *Client) Commit(ctx context.Context, primaryURL string, nodeID uint64, n
return c.CommitFunc(ctx, primaryURL, nodeID, name, lockID, r)
}

func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
return c.StreamFunc(ctx, primaryURL, nodeID, posMap)
// Stream forwards all of its arguments, including filter, to c.StreamFunc and
// returns whatever that function returns. StreamFunc is called unconditionally,
// so it must be set before Stream is used.
func (c *Client) Stream(ctx context.Context, primaryURL string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
return c.StreamFunc(ctx, primaryURL, nodeID, posMap, filter)
}

type Stream struct {
Expand Down
5 changes: 4 additions & 1 deletion store.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ type Store struct {
// Interface to interact with the host environment.
Environment Environment

// Specifies a subset of databases to replicate from the primary.
DatabaseFilter []string

// If true, computes and verifies the checksum of the entire database
// after every transaction. Should only be used during testing.
StrictVerify bool
Expand Down Expand Up @@ -1357,7 +1360,7 @@ func (s *Store) monitorLeaseAsReplica(ctx context.Context, info PrimaryInfo) (ha
}()

posMap := s.PosMap()
st, err := s.Client.Stream(ctx, info.AdvertiseURL, s.id, posMap)
st, err := s.Client.Stream(ctx, info.AdvertiseURL, s.id, posMap, s.DatabaseFilter)
if err != nil {
return "", fmt.Errorf("connect to primary: %s ('%s')", err, info.AdvertiseURL)
}
Expand Down
4 changes: 2 additions & 2 deletions store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ func TestStore_PrimaryCtx(t *testing.T) {
}

client := mock.Client{
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
return &mock.Stream{
ReadCloser: io.NopCloser(&bytes.Buffer{}),
ClusterIDFunc: func() string { return "" },
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestStore_PrimaryCtx(t *testing.T) {
t.Run("InitialReplica", func(t *testing.T) {
leaser := litefs.NewStaticLeaser(false, "localhost", "http://localhost:20202")
client := mock.Client{
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos) (litefs.Stream, error) {
StreamFunc: func(ctx context.Context, rawurl string, nodeID uint64, posMap map[string]ltx.Pos, filter []string) (litefs.Stream, error) {
var buf bytes.Buffer
if err := litefs.WriteStreamFrame(&buf, &litefs.ReadyStreamFrame{}); err != nil {
return nil, err
Expand Down
Loading