Skip to content

Commit

Permalink
refactor(share/availability): Move window and constants to share/avai…
Browse files Browse the repository at this point in the history
…lability pkg (#3906)
  • Loading branch information
renaynay authored Nov 14, 2024
1 parent 1e7c109 commit be75628
Show file tree
Hide file tree
Showing 32 changed files with 141 additions and 165 deletions.
10 changes: 5 additions & 5 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"context"
"fmt"
"time"

"github.com/tendermint/tendermint/types"

Expand All @@ -15,9 +16,8 @@ import (
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/store"
)

Expand Down Expand Up @@ -61,16 +61,16 @@ func storeEDS(
eh *header.ExtendedHeader,
eds *rsmt2d.ExtendedDataSquare,
store *store.Store,
window pruner.AvailabilityWindow,
window time.Duration,
) error {
if !pruner.IsWithinAvailabilityWindow(eh.Time(), window) {
if !availability.IsWithinWindow(eh.Time(), window) {
log.Debugw("skipping storage of historic block", "height", eh.Height())
return nil
}

var err error
// archival nodes should not store Q4 outside the availability window.
if pruner.IsWithinAvailabilityWindow(eh.Time(), full.Window) {
if availability.IsWithinWindow(eh.Time(), availability.StorageWindow) {
err = store.PutODSQ4(ctx, eh.DAH, eh.Height(), eds)
} else {
err = store.PutODS(ctx, eh.DAH, eh.Height(), eds)
Expand Down
3 changes: 1 addition & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/store"
)

Expand All @@ -23,7 +22,7 @@ type Exchange struct {
store *store.Store
construct header.ConstructFn

availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration

metrics *exchangeMetrics
}
Expand Down
3 changes: 1 addition & 2 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/celestiaorg/celestia-app/v3/test/util/testnode"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -82,7 +81,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {
fetcher,
store,
header.MakeExtendedHeader,
WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond)), // all blocks will be "historic"
WithAvailabilityWindow(time.Nanosecond), // all blocks will be "historic"
)
require.NoError(t, err)

Expand Down
3 changes: 1 addition & 2 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand All @@ -38,7 +37,7 @@ type Listener struct {

construct header.ConstructFn
store *store.Store
availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand Down
3 changes: 1 addition & 2 deletions core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/celestiaorg/celestia-node/header"
nodep2p "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
"github.com/celestiaorg/celestia-node/store"
)
Expand Down Expand Up @@ -118,7 +117,7 @@ func TestListener_DoesNotStoreHistoric(t *testing.T) {
require.NoError(t, err)

// create Listener and start listening
opt := WithAvailabilityWindow(pruner.AvailabilityWindow(time.Nanosecond))
opt := WithAvailabilityWindow(time.Nanosecond)
cl := createListener(ctx, t, fetcher, ps0, eds, store, testChainID, opt)

dataRoots := generateNonEmptyBlocks(t, ctx, fetcher, cfg, cctx)
Expand Down
10 changes: 5 additions & 5 deletions core/option.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
package core

import (
"time"

"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
)

type Option func(*params)

type params struct {
metrics bool
chainID string
availabilityWindow pruner.AvailabilityWindow
availabilityWindow time.Duration
}

func defaultParams() params {
return params{
availabilityWindow: archival.Window,
availabilityWindow: time.Duration(0),
}
}

Expand All @@ -34,7 +34,7 @@ func WithChainID(id p2p.Network) Option {
}
}

func WithAvailabilityWindow(window pruner.AvailabilityWindow) Option {
func WithAvailabilityWindow(window time.Duration) Option {
return func(p *params) {
p.availabilityWindow = window
}
Expand Down
4 changes: 2 additions & 2 deletions das/daser.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
libhead "github.com/celestiaorg/go-header"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
)
Expand Down Expand Up @@ -162,7 +162,7 @@ func (d *DASer) Stop(ctx context.Context) error {
func (d *DASer) sample(ctx context.Context, h *header.ExtendedHeader) error {
// short-circuit if pruning is enabled and the header is outside the
// availability window
if !pruner.IsWithinAvailabilityWindow(h.Time(), d.params.samplingWindow) {
if !availability.IsWithinWindow(h.Time(), d.params.samplingWindow) {
log.Debugw("skipping header outside sampling window", "height", h.Height(),
"time", h.Time())
return errOutsideSamplingWindow
Expand Down
6 changes: 3 additions & 3 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/mocks"
"github.com/celestiaorg/celestia-node/share/eds/edstest"
)
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestDASer_SamplingWindow(t *testing.T) {

// create and start DASer
daser, err := NewDASer(avail, sub, getter, ds, fserv, newBroadcastMock(1),
WithSamplingWindow(pruner.AvailabilityWindow(time.Second)))
WithSamplingWindow(time.Second))
require.NoError(t, err)

tests := []struct {
Expand All @@ -276,7 +276,7 @@ func TestDASer_SamplingWindow(t *testing.T) {
assert.Equal(
t,
tt.withinWindow,
pruner.IsWithinAvailabilityWindow(eh.Time(), daser.params.samplingWindow),
availability.IsWithinWindow(eh.Time(), daser.params.samplingWindow),
)
})
}
Expand Down
6 changes: 2 additions & 4 deletions das/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"time"

"github.com/celestiaorg/celestia-node/pruner"
)

// ErrInvalidOption is an error that is returned by Parameters.Validate
Expand Down Expand Up @@ -47,7 +45,7 @@ type Parameters struct {
// samplingWindow determines the time window that headers should fall into
// in order to be sampled. If set to 0, the sampling window will include
// all headers.
samplingWindow pruner.AvailabilityWindow
samplingWindow time.Duration
}

// DefaultParameters returns the default configuration values for the daser parameters
Expand Down Expand Up @@ -166,7 +164,7 @@ func WithSampleTimeout(sampleTimeout time.Duration) Option {

// WithSamplingWindow is a functional option to configure the DASer's
// `samplingWindow` parameter.
func WithSamplingWindow(samplingWindow pruner.AvailabilityWindow) Option {
func WithSamplingWindow(samplingWindow time.Duration) Option {
return func(d *DASer) {
d.params.samplingWindow = samplingWindow
}
Expand Down
6 changes: 3 additions & 3 deletions nodebuilder/das/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/celestiaorg/celestia-node/das"
"github.com/celestiaorg/celestia-node/header"
modfraud "github.com/celestiaorg/celestia-node/nodebuilder/fraud"
"github.com/celestiaorg/celestia-node/pruner"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/eds/byzantine"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexsub"
Expand Down Expand Up @@ -45,10 +45,10 @@ func newDASer(
batching datastore.Batching,
fraudServ fraud.Service[*header.ExtendedHeader],
bFn shrexsub.BroadcastFn,
availWindow pruner.AvailabilityWindow,
availWindow modshare.Window,
options ...das.Option,
) (*das.DASer, *modfraud.ServiceBreaker[*das.DASer, *header.ExtendedHeader], error) {
options = append(options, das.WithSamplingWindow(availWindow))
options = append(options, das.WithSamplingWindow(availWindow.Duration()))

ds, err := das.NewDASer(da, hsub, store, batching, fraudServ, bFn, options...)
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions nodebuilder/pruner/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,18 @@ import (

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
)

func newPrunerService(
p pruner.Pruner,
window pruner.AvailabilityWindow,
window modshare.Window,
getter libhead.Store[*header.ExtendedHeader],
ds datastore.Batching,
opts ...pruner.Option,
) (*pruner.Service, error) {
serv, err := pruner.NewService(p, window, getter, ds, p2p.BlockTime, opts...)
serv, err := pruner.NewService(p, window.Duration(), getter, ds, p2p.BlockTime, opts...)
if err != nil {
return nil, err
}
Expand Down
36 changes: 27 additions & 9 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -10,10 +11,12 @@ import (
"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/archival"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/pruner/light"
"github.com/celestiaorg/celestia-node/share/availability"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/discovery"
)

var log = logging.Logger("module/pruner")
Expand All @@ -22,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
Expand All @@ -45,6 +49,9 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
}
// We do not trigger DetectPreviousRun for Light nodes, to allow them to disable pruning at wish.
Expand Down Expand Up @@ -73,8 +80,8 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Provide(func(window pruner.AvailabilityWindow) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window)}
fx.Provide(func(window modshare.Window) []core.Option {
return []core.Option{core.WithAvailabilityWindow(window.Duration())}
}),
)
}
Expand All @@ -92,20 +99,31 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
}
}

func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
if (tp == node.Full || tp == node.Bridge) && !pruneCfg.EnableService {
return fx.Supply(discovery.WithAdvertise())
}
return fx.Provide(func() discovery.Option {
var opt discovery.Option
return opt
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() pruner.AvailabilityWindow {
return light.Window
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() pruner.AvailabilityWindow {
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return full.Window
return modshare.Window(availability.StorageWindow)
}
return archival.Window
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
Expand Down
5 changes: 2 additions & 3 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"go.uber.org/fx"

headerServ "github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/shwap"
"github.com/celestiaorg/celestia-node/share/shwap/getters"
Expand All @@ -23,9 +22,9 @@ func bitswapGetter(
lc fx.Lifecycle,
exchange exchange.SessionExchange,
bstore blockstore.Blockstore,
wndw pruner.AvailabilityWindow,
wndw Window,
) *bitswap.Getter {
getter := bitswap.NewGetter(exchange, bstore, wndw)
getter := bitswap.NewGetter(exchange, bstore, wndw.Duration())
lc.Append(fx.StartStopHook(getter.Start, getter.Stop))
return getter
}
Expand Down
Loading

0 comments on commit be75628

Please sign in to comment.