diff --git a/cmd/litefs/config.go b/cmd/litefs/config.go index 15a0727..7385d2d 100644 --- a/cmd/litefs/config.go +++ b/cmd/litefs/config.go @@ -58,6 +58,7 @@ func NewConfig() Config { config.Log.Format = "text" config.Proxy.MaxLag = http.DefaultMaxLag + config.Proxy.PrimaryRedirectTimeout = http.DefaultPrimaryRedirectTimeout config.Tracing.Enabled = true config.Tracing.MaxSize = DefaultTracingMaxSize @@ -113,12 +114,13 @@ type HTTPConfig struct { // ProxyConfig represents the configuration for the HTTP proxy server. type ProxyConfig struct { - Addr string `yaml:"addr"` - Target string `yaml:"target"` - DB string `yaml:"db"` - MaxLag time.Duration `yaml:"max-lag"` - Debug bool `yaml:"debug"` - Passthrough []string `yaml:"passthrough"` + Addr string `yaml:"addr"` + Target string `yaml:"target"` + DB string `yaml:"db"` + MaxLag time.Duration `yaml:"max-lag"` + Debug bool `yaml:"debug"` + Passthrough []string `yaml:"passthrough"` + PrimaryRedirectTimeout time.Duration `yaml:"primary-redirect-timeout"` } // LeaseConfig represents a generic configuration for all lease types. diff --git a/cmd/litefs/mount_linux.go b/cmd/litefs/mount_linux.go index 81f0811..811e061 100644 --- a/cmd/litefs/mount_linux.go +++ b/cmd/litefs/mount_linux.go @@ -529,6 +529,7 @@ func (c *MountCommand) runProxyServer(ctx context.Context) error { server.MaxLag = c.Config.Proxy.MaxLag server.Debug = c.Config.Proxy.Debug server.Passthroughs = passthroughs + server.PrimaryRedirectTimeout = c.Config.Proxy.PrimaryRedirectTimeout if err := server.Listen(); err != nil { return err } diff --git a/http/proxy_server.go b/http/proxy_server.go index 7f565f2..204e76d 100644 --- a/http/proxy_server.go +++ b/http/proxy_server.go @@ -24,6 +24,8 @@ const ( DefaultPollTXIDInterval = 1 * time.Millisecond DefaultPollTXIDTimeout = 5 * time.Second + DefaultPrimaryRedirectTimeout = 5 * time.Second + DefaultMaxLag = 10 * time.Second DefaultCookieExpiry = 5 * time.Minute @@ -63,6 +65,8 @@ type ProxyServer struct { PollTXIDInterval time.Duration PollTXIDTimeout time.Duration + PrimaryRedirectTimeout time.Duration + // Maximum allowable lag before the health endpoint returns an error code. MaxLag time.Duration @@ -77,10 +81,11 @@ func NewProxyServer(store *litefs.Store) *ProxyServer { s := &ProxyServer{ store: store, - PollTXIDInterval: DefaultPollTXIDInterval, - PollTXIDTimeout: DefaultPollTXIDTimeout, - MaxLag: DefaultMaxLag, - CookieExpiry: DefaultCookieExpiry, + PollTXIDInterval: DefaultPollTXIDInterval, + PollTXIDTimeout: DefaultPollTXIDTimeout, + MaxLag: DefaultMaxLag, + CookieExpiry: DefaultCookieExpiry, + PrimaryRedirectTimeout: DefaultPrimaryRedirectTimeout, } s.ctx, s.cancel = context.WithCancelCause(context.Background()) @@ -253,7 +258,9 @@ LOOP: } func (s *ProxyServer) serveNonRead(w http.ResponseWriter, r *http.Request) { - isPrimary, info := s.store.PrimaryInfo() + ctx, cancel := context.WithTimeout(r.Context(), s.PrimaryRedirectTimeout) + defer cancel() + isPrimary, info := s.store.PrimaryInfoWithContext(ctx) // If this is the primary, send the request to the target. if isPrimary { diff --git a/store.go b/store.go index 713e75c..b68e12f 100644 --- a/store.go +++ b/store.go @@ -485,6 +485,28 @@ func (s *Store) PrimaryInfo() (isPrimary bool, info *PrimaryInfo) { return s.isPrimary(), s.primaryInfo.Clone() } +// PrimaryInfoWithContext continually attempts to fetch the primary info until available. +// Returns when isPrimary is true, info is non-nil, or when ctx is done. +func (s *Store) PrimaryInfoWithContext(ctx context.Context) (isPrimary bool, info *PrimaryInfo) { + if isPrimary, info = s.PrimaryInfo(); isPrimary || info != nil { + return isPrimary, info + } + + ticker := time.NewTicker(100 * time.Microsecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return isPrimary, info + case <-ticker.C: + if isPrimary, info = s.PrimaryInfo(); isPrimary || info != nil { + return isPrimary, info + } + } + } +} + func (s *Store) setPrimaryInfo(info *PrimaryInfo) { s.primaryInfo = info s.notifyPrimaryChange()