diff --git a/pkg/services/blockfetcher/blockfetcher.go b/pkg/services/blockfetcher/blockfetcher.go index 308edeb9c8..daccec2998 100644 --- a/pkg/services/blockfetcher/blockfetcher.go +++ b/pkg/services/blockfetcher/blockfetcher.go @@ -62,6 +62,11 @@ func (p poolWrapper) Close() error { return nil } +type indexedOID struct { + Index int + OID oid.ID +} + // Service is a service that fetches blocks from NeoFS. type Service struct { // isActive denotes whether the service is working or in the process of shutdown. @@ -72,15 +77,15 @@ type Service struct { operationMode OperationMode stateRootInHeader bool - // headerSize is the size of the header in bytes. - headerSize int + // headerSizeMap is a map of height to expected header size. + headerSizeMap map[int]int chain Ledger pool poolWrapper enqueue func(obj bqueue.Indexable) error account *wallet.Account - oidsCh chan oid.ID + oidsCh chan indexedOID // wg is a wait group for block downloaders. wg sync.WaitGroup @@ -98,7 +103,7 @@ type Service struct { shutdownCallback func() // Depends on the OperationMode, the following functions are set to the appropriate functions. - getFunc func(ctx context.Context, oid string) (io.ReadCloser, error) + getFunc func(ctx context.Context, oid string, index int) (io.ReadCloser, error) readFunc func(rc io.ReadCloser) (any, error) heightFunc func() uint32 } @@ -165,7 +170,7 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun log: logger, cfg: cfg, operationMode: opt, - headerSize: getHeaderSize(chain.GetConfig()), + headerSizeMap: getHeaderSizeMap(chain.GetConfig()), enqueue: put, account: account, @@ -181,12 +186,17 @@ func New(chain Ledger, cfg config.NeoFSBlockFetcher, logger *zap.Logger, put fun // * first full block of OIDs is processing by Downloader // * second full block of OIDs is available to be fetched by Downloader immediately // * third half-filled block of OIDs is being collected by OIDsFetcher. - oidsCh: make(chan oid.ID, 2*cfg.OIDBatchSize), + oidsCh: make(chan indexedOID, 2*cfg.OIDBatchSize), }, nil } -func getHeaderSize(chain config.Blockchain) int { - return block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0)) +func getHeaderSizeMap(chain config.Blockchain) map[int]int { + headerSizeMap := make(map[int]int) + headerSizeMap[0] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(0)) + for height := range chain.CommitteeHistory { + headerSizeMap[int(height)] = block.GetExpectedHeaderSize(chain.StateRootInHeader, chain.GetNumOfCNs(height)) + } + return headerSizeMap } // Start runs the NeoFS BlockFetcher service. @@ -277,11 +287,13 @@ func (bfs *Service) oidDownloader() { func (bfs *Service) blockDownloader() { defer bfs.wg.Done() - for blkOid := range bfs.oidsCh { + for indexedOid := range bfs.oidsCh { + index := indexedOid.Index + blkOid := indexedOid.OID ctx, cancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer cancel() - rc, err := bfs.getFunc(ctx, blkOid.String()) + rc, err := bfs.getFunc(ctx, blkOid.String(), index) if err != nil { if isContextCanceledErr(err) { return @@ -347,7 +359,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { blockCtx, blockCancel := context.WithTimeout(bfs.ctx, bfs.cfg.Timeout) defer blockCancel() - oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String()) + oidsRC, err := bfs.objectGet(blockCtx, blockOidsObject[0].String(), -1) if err != nil { if isContextCanceledErr(err) { return nil @@ -355,7 +367,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { return fmt.Errorf("failed to fetch '%s' object with index %d: %w", bfs.cfg.IndexFileAttribute, startIndex, err) } - err = bfs.streamBlockOIDs(oidsRC, int(skip)) + err = bfs.streamBlockOIDs(oidsRC, int(startIndex), int(skip)) if err != nil { if isContextCanceledErr(err) { return nil @@ -370,7 +382,7 @@ func (bfs *Service) fetchOIDsFromIndexFiles() error { } // streamBlockOIDs reads block OIDs from the read closer and sends them to the OIDs channel. -func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { +func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, startIndex, skip int) error { defer rc.Close() oidBytes := make([]byte, oid.Size) oidsProcessed := 0 @@ -397,7 +409,7 @@ func (bfs *Service) streamBlockOIDs(rc io.ReadCloser, skip int) error { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oidBlock: + case bfs.oidsCh <- indexedOID{Index: startIndex*int(bfs.cfg.IndexFileSize) + oidsProcessed, OID: oidBlock}: } oidsProcessed++ @@ -442,12 +454,14 @@ func (bfs *Service) fetchOIDsBySearch() error { bfs.log.Info(fmt.Sprintf("NeoFS BlockFetcher service: no block found with index %d, stopping", startIndex)) return nil } + index := int(startIndex) for _, oid := range blockOids { select { case <-bfs.exiterToOIDDownloader: return nil - case bfs.oidsCh <- oid: + case bfs.oidsCh <- indexedOID{Index: index, OID: oid}: } + index++ //Won't work properly if neofs.ObjectSearch results are not ordered. } startIndex += batchSize } @@ -581,7 +595,7 @@ func (bfs *Service) retry(action func() error) error { return err } -func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, error) { +func (bfs *Service) objectGet(ctx context.Context, oid string, index int) (io.ReadCloser, error) { u, err := url.Parse(fmt.Sprintf("%s:%s/%s", neofs.URIScheme, bfs.cfg.ContainerID, oid)) if err != nil { return nil, err @@ -594,8 +608,19 @@ func (bfs *Service) objectGet(ctx context.Context, oid string) (io.ReadCloser, e return rc, err } -func (bfs *Service) objectGetRange(ctx context.Context, oid string) (io.ReadCloser, error) { - u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, bfs.headerSize)) +func (bfs *Service) objectGetRange(ctx context.Context, oid string, height int) (io.ReadCloser, error) { + nearestHeight := 0 + for h := range bfs.headerSizeMap { + if h <= height && h > nearestHeight { + nearestHeight = h + } + if nearestHeight >= height { + break + } + } + + size := bfs.headerSizeMap[nearestHeight] + u, err := url.Parse(fmt.Sprintf("%s:%s/%s/%s/%d|%d", neofs.URIScheme, bfs.cfg.ContainerID, oid, "range", 0, size)) if err != nil { return nil, err }