diff --git a/core/exchange.go b/core/exchange.go index f8e1606a3e..06f648edad 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -42,7 +42,29 @@ func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.Ext return ce.getExtendedHeaderByHeight(ctx, &intHeight) } -func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ([]*header.ExtendedHeader, error) { +func (ce *Exchange) GetRangeByHeight( + ctx context.Context, + from *header.ExtendedHeader, + to uint64, +) ([]*header.ExtendedHeader, error) { + amount := to - (from.Height() + 1) + headers, err := ce.getRangeByHeight(ctx, from.Height()+1, amount) + if err != nil { + return nil, err + } + + for _, h := range headers { + err := from.Verify(h) + if err != nil { + return nil, fmt.Errorf("verifying next header against last verified height: %d: %w", + from.Height(), err) + } + from = h + } + return headers, nil +} + +func (ce *Exchange) getRangeByHeight(ctx context.Context, from, amount uint64) ([]*header.ExtendedHeader, error) { if amount == 0 { return nil, nil } @@ -73,27 +95,6 @@ func (ce *Exchange) GetRangeByHeight(ctx context.Context, from, amount uint64) ( return headers, nil } -func (ce *Exchange) GetVerifiedRange( - ctx context.Context, - from *header.ExtendedHeader, - amount uint64, -) ([]*header.ExtendedHeader, error) { - headers, err := ce.GetRangeByHeight(ctx, from.Height()+1, amount) - if err != nil { - return nil, err - } - - for _, h := range headers { - err := from.Verify(h) - if err != nil { - return nil, fmt.Errorf("verifying next header against last verified height: %d: %w", - from.Height(), err) - } - from = h - } - return headers, nil -} - func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error) { log.Debugw("requesting header", "hash", hash.String()) block, err := ce.fetcher.GetBlockByHash(ctx, hash) diff --git a/core/exchange_test.go b/core/exchange_test.go index 8aa62593c8..853b5a8dc6 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -17,6 +17,9 @@ import ( ) func TestCoreExchange_RequestHeaders(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + fetcher, _ := createCoreFetcher(t, DefaultTestConfig()) // generate 10 blocks @@ -25,10 +28,26 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { store := createStore(t) ce := NewExchange(fetcher, store, header.MakeExtendedHeader) - headers, err := ce.GetRangeByHeight(context.Background(), 1, 10) + + // initialize store with genesis block + genHeight := int64(1) + genBlock, err := fetcher.GetBlock(ctx, &genHeight) + require.NoError(t, err) + genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) + require.NoError(t, err) + + to := uint64(10) + expectedFirstHeightInRange := genHeader.Height() + 1 + expectedLastHeightInRange := to - 1 + expectedLenHeaders := to - expectedFirstHeightInRange + + // request headers from height 1 to 10 [2:10) + headers, err := ce.GetRangeByHeight(context.Background(), genHeader, to) require.NoError(t, err) - assert.Equal(t, 10, len(headers)) + assert.Len(t, headers, int(expectedLenHeaders)) + assert.Equal(t, expectedFirstHeightInRange, headers[0].Height()) + assert.Equal(t, expectedLastHeightInRange, headers[len(headers)-1].Height()) } func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testnode.Context) { diff --git a/core/listener_test.go b/core/listener_test.go index 90bdefb72f..eab634a330 100644 --- a/core/listener_test.go +++ b/core/listener_test.go @@ -31,8 +31,13 @@ func TestListener(t *testing.T) { // create mocknet with two pubsub endpoints ps0, ps1 := createMocknetWithTwoPubsubEndpoints(ctx, t) - subscriber := p2p.NewSubscriber[*header.ExtendedHeader](ps1, header.MsgID, networkID) - err := subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error { + subscriber, err := p2p.NewSubscriber[*header.ExtendedHeader]( + ps1, + header.MsgID, + p2p.WithSubscriberNetworkID(networkID), + ) + require.NoError(t, err) + err = subscriber.SetVerifier(func(context.Context, *header.ExtendedHeader) error { return nil }) require.NoError(t, err) @@ -162,8 +167,9 @@ func createListener( edsSub *shrexsub.PubSub, store *eds.Store, ) *Listener { - p2pSub := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, networkID) - err := p2pSub.Start(ctx) + p2pSub, err := p2p.NewSubscriber[*header.ExtendedHeader](ps, header.MsgID, p2p.WithSubscriberNetworkID(networkID)) + require.NoError(t, err) + err = p2pSub.Start(ctx) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, p2pSub.Stop(ctx)) diff --git a/das/daser_test.go b/das/daser_test.go index ca553b0f09..1ec160e224 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -384,11 +384,7 @@ func (m getterStub) GetByHeight(_ context.Context, height uint64) (*header.Exten DAH: &share.Root{RowRoots: make([][]byte, 0)}}, nil } -func (m getterStub) GetRangeByHeight(context.Context, uint64, uint64) ([]*header.ExtendedHeader, error) { - return nil, nil -} - -func (m getterStub) GetVerifiedRange( +func (m getterStub) GetRangeByHeight( context.Context, *header.ExtendedHeader, uint64, diff --git a/docs/adr/adr-009-public-api.md b/docs/adr/adr-009-public-api.md index d2bf6ee817..be9e8827eb 100644 --- a/docs/adr/adr-009-public-api.md +++ b/docs/adr/adr-009-public-api.md @@ -100,10 +100,10 @@ GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) // WaitForHeight blocks until the header at the given height has been processed // by the node's header store or until context deadline is exceeded. WaitForHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) -// GetVerifiedRangeByHeight returns the given range (from:to) of ExtendedHeaders +// GetRangeByHeight returns the given range (from:to) of ExtendedHeaders // from the node's header store and verifies that the returned headers are // adjacent to each other. -GetVerifiedRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) +GetRangeByHeight(ctx context.Context, from, to uint64) ([]*ExtendedHeader, error) // Subscribe creates long-living Subscription for newly validated // ExtendedHeaders. Multiple Subscriptions can be created. Subscribe(context.Context) (<-chan *header.ExtendedHeader, error) diff --git a/go.mod b/go.mod index 1c76383ffa..dfae47fb29 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/celestiaorg/celestia-app v1.0.0 github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 github.com/celestiaorg/go-fraud v0.2.0 - github.com/celestiaorg/go-header v0.3.3 + github.com/celestiaorg/go-header v0.4.0 github.com/celestiaorg/go-libp2p-messenger v0.2.0 github.com/celestiaorg/nmt v0.20.0 github.com/celestiaorg/rsmt2d v0.11.0 diff --git a/go.sum b/go.sum index 4e31203f91..0c46be987a 100644 --- a/go.sum +++ b/go.sum @@ -370,8 +370,8 @@ github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5 h1:MJgXv github.com/celestiaorg/go-ds-badger4 v0.0.0-20230712104058-7ede1c814ac5/go.mod h1:r6xB3nvGotmlTACpAr3SunxtoXeesbqb57elgMJqflY= github.com/celestiaorg/go-fraud v0.2.0 h1:aaq2JiW0gTnhEdac3l51UCqSyJ4+VjFGTTpN83V4q7I= github.com/celestiaorg/go-fraud v0.2.0/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= -github.com/celestiaorg/go-header v0.3.3 h1:Y04hdJIJfD5hapyqK0ZQMgMTH5PQGV9YpcIf56LGc4E= -github.com/celestiaorg/go-header v0.3.3/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c= +github.com/celestiaorg/go-header v0.4.0 h1:Ine/xpvFx8o9p6fXW+h2RSPp68rn7VUxTkW1okJxcEY= +github.com/celestiaorg/go-header v0.4.0/go.mod h1:H8xhnDLDLbkpwmWPhCaZyTnIV3dlVxBHPnxNXS2Qu6c= github.com/celestiaorg/go-libp2p-messenger v0.2.0 h1:/0MuPDcFamQMbw9xTZ73yImqgTO3jHV7wKHvWD/Irao= github.com/celestiaorg/go-libp2p-messenger v0.2.0/go.mod h1:s9PIhMi7ApOauIsfBcQwbr7m+HBzmVfDIS+QLdgzDSo= github.com/celestiaorg/merkletree v0.0.0-20210714075610-a84dc3ddbbe4 h1:CJdIpo8n5MFP2MwK0gSRcOVlDlFdQJO1p+FqdxYzmvc= diff --git a/nodebuilder/header/config.go b/nodebuilder/header/config.go index 238fb39afd..d7265373ce 100644 --- a/nodebuilder/header/config.go +++ b/nodebuilder/header/config.go @@ -16,6 +16,9 @@ import ( "github.com/celestiaorg/celestia-node/nodebuilder/p2p" ) +// MetricsEnabled will be set during runtime if metrics are enabled on the node. +var MetricsEnabled = false + // Config contains configuration parameters for header retrieval and management. type Config struct { // TrustedHash is the Block/Header hash that Nodes use as starting point for header synchronization. diff --git a/nodebuilder/header/constructors.go b/nodebuilder/header/constructors.go index be4bcbd427..15d2da09b1 100644 --- a/nodebuilder/header/constructors.go +++ b/nodebuilder/header/constructors.go @@ -40,12 +40,18 @@ func newP2PExchange[H libhead.Header[H]]( ids[index] = peer.ID host.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL) } - exchange, err := p2p.NewExchange[H](host, ids, conngater, + + opts := []p2p.Option[p2p.ClientParameters]{ p2p.WithParams(cfg.Client), p2p.WithNetworkID[p2p.ClientParameters](network.String()), p2p.WithChainID(network.String()), p2p.WithPeerIDStore[p2p.ClientParameters](pidstore), - ) + } + if MetricsEnabled { + opts = append(opts, p2p.WithMetrics[p2p.ClientParameters]()) + } + + exchange, err := p2p.NewExchange[H](host, ids, conngater, opts...) if err != nil { return nil, err } @@ -68,10 +74,12 @@ func newSyncer[H libhead.Header[H]]( sub libhead.Subscriber[H], cfg Config, ) (*sync.Syncer[H], *modfraud.ServiceBreaker[*sync.Syncer[H], H], error) { - syncer, err := sync.NewSyncer[H](ex, store, sub, - sync.WithParams(cfg.Syncer), - sync.WithBlockTime(modp2p.BlockTime), - ) + opts := []sync.Option{sync.WithParams(cfg.Syncer), sync.WithBlockTime(modp2p.BlockTime)} + if MetricsEnabled { + opts = append(opts, sync.WithMetrics()) + } + + syncer, err := sync.NewSyncer[H](ex, store, sub, opts...) if err != nil { return nil, nil, err } @@ -96,6 +104,13 @@ func newInitStore[H libhead.Header[H]]( return nil, err } + if MetricsEnabled { + err = libhead.WithMetrics[H](s) + if err != nil { + return nil, err + } + } + trustedHash, err := cfg.trustedHash(net) if err != nil { return nil, err diff --git a/nodebuilder/header/header.go b/nodebuilder/header/header.go index 43b5646a77..f807796eb6 100644 --- a/nodebuilder/header/header.go +++ b/nodebuilder/header/header.go @@ -19,10 +19,10 @@ type Module interface { // GetByHash returns the header of the given hash from the node's header store. GetByHash(ctx context.Context, hash libhead.Hash) (*header.ExtendedHeader, error) - // GetVerifiedRangeByHeight returns the given range (from:to) of ExtendedHeaders + // GetRangeByHeight returns the given range (from:to) of ExtendedHeaders // from the node's header store and verifies that the returned headers are // adjacent to each other. - GetVerifiedRangeByHeight( + GetRangeByHeight( ctx context.Context, from *header.ExtendedHeader, to uint64, @@ -54,7 +54,7 @@ type API struct { ctx context.Context, hash libhead.Hash, ) (*header.ExtendedHeader, error) `perm:"read"` - GetVerifiedRangeByHeight func( + GetRangeByHeight func( context.Context, *header.ExtendedHeader, uint64, @@ -72,12 +72,12 @@ func (api *API) GetByHash(ctx context.Context, hash libhead.Hash) (*header.Exten return api.Internal.GetByHash(ctx, hash) } -func (api *API) GetVerifiedRangeByHeight( +func (api *API) GetRangeByHeight( ctx context.Context, from *header.ExtendedHeader, to uint64, ) ([]*header.ExtendedHeader, error) { - return api.Internal.GetVerifiedRangeByHeight(ctx, from, to) + return api.Internal.GetRangeByHeight(ctx, from, to) } func (api *API) GetByHeight(ctx context.Context, u uint64) (*header.ExtendedHeader, error) { diff --git a/nodebuilder/header/mocks/api.go b/nodebuilder/header/mocks/api.go index 7d6661ff5d..b0d2b961d9 100644 --- a/nodebuilder/header/mocks/api.go +++ b/nodebuilder/header/mocks/api.go @@ -8,11 +8,10 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - header "github.com/celestiaorg/celestia-node/header" header0 "github.com/celestiaorg/go-header" sync "github.com/celestiaorg/go-header/sync" + gomock "github.com/golang/mock/gomock" ) // MockModule is a mock of Module interface. @@ -68,19 +67,19 @@ func (mr *MockModuleMockRecorder) GetByHeight(arg0, arg1 interface{}) *gomock.Ca return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetByHeight", reflect.TypeOf((*MockModule)(nil).GetByHeight), arg0, arg1) } -// GetVerifiedRangeByHeight mocks base method. -func (m *MockModule) GetVerifiedRangeByHeight(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 uint64) ([]*header.ExtendedHeader, error) { +// GetRangeByHeight mocks base method. +func (m *MockModule) GetRangeByHeight(arg0 context.Context, arg1 *header.ExtendedHeader, arg2 uint64) ([]*header.ExtendedHeader, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetVerifiedRangeByHeight", arg0, arg1, arg2) + ret := m.ctrl.Call(m, "GetRangeByHeight", arg0, arg1, arg2) ret0, _ := ret[0].([]*header.ExtendedHeader) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetVerifiedRangeByHeight indicates an expected call of GetVerifiedRangeByHeight. -func (mr *MockModuleMockRecorder) GetVerifiedRangeByHeight(arg0, arg1, arg2 interface{}) *gomock.Call { +// GetRangeByHeight indicates an expected call of GetRangeByHeight. +func (mr *MockModuleMockRecorder) GetRangeByHeight(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetVerifiedRangeByHeight", reflect.TypeOf((*MockModule)(nil).GetVerifiedRangeByHeight), arg0, arg1, arg2) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetRangeByHeight", reflect.TypeOf((*MockModule)(nil).GetRangeByHeight), arg0, arg1, arg2) } // LocalHead mocks base method. diff --git a/nodebuilder/header/module.go b/nodebuilder/header/module.go index 557cbcfab7..4be25f7125 100644 --- a/nodebuilder/header/module.go +++ b/nodebuilder/header/module.go @@ -50,8 +50,12 @@ func ConstructModule[H libhead.Header[H]](tp node.Type, cfg *Config) fx.Option { }), )), fx.Provide(fx.Annotate( - func(ps *pubsub.PubSub, network modp2p.Network) *p2p.Subscriber[H] { - return p2p.NewSubscriber[H](ps, header.MsgID, network.String()) + func(ps *pubsub.PubSub, network modp2p.Network) (*p2p.Subscriber[H], error) { + opts := []p2p.SubscriberOption{p2p.WithSubscriberNetworkID(network.String())} + if MetricsEnabled { + opts = append(opts, p2p.WithSubscriberMetrics()) + } + return p2p.NewSubscriber[H](ps, header.MsgID, opts...) }, fx.OnStart(func(ctx context.Context, sub *p2p.Subscriber[H]) error { return sub.Start(ctx) @@ -62,14 +66,20 @@ func ConstructModule[H libhead.Header[H]](tp node.Type, cfg *Config) fx.Option { )), fx.Provide(fx.Annotate( func( + cfg Config, host host.Host, store libhead.Store[H], network modp2p.Network, ) (*p2p.ExchangeServer[H], error) { - return p2p.NewExchangeServer[H](host, store, + opts := []p2p.Option[p2p.ServerParameters]{ p2p.WithParams(cfg.Server), p2p.WithNetworkID[p2p.ServerParameters](network.String()), - ) + } + if MetricsEnabled { + opts = append(opts, p2p.WithMetrics[p2p.ServerParameters]()) + } + + return p2p.NewExchangeServer[H](host, store, opts...) }, fx.OnStart(func(ctx context.Context, server *p2p.ExchangeServer[H]) error { return server.Start(ctx) diff --git a/nodebuilder/header/opts.go b/nodebuilder/header/opts.go deleted file mode 100644 index decedcad85..0000000000 --- a/nodebuilder/header/opts.go +++ /dev/null @@ -1,28 +0,0 @@ -package header - -import ( - libhead "github.com/celestiaorg/go-header" - "github.com/celestiaorg/go-header/p2p" - "github.com/celestiaorg/go-header/sync" - - "github.com/celestiaorg/celestia-node/header" -) - -// WithMetrics provides sets `MetricsEnabled` to true on ClientParameters for the header exchange -func WithMetrics( - store libhead.Store[*header.ExtendedHeader], - ex libhead.Exchange[*header.ExtendedHeader], - sync *sync.Syncer[*header.ExtendedHeader], -) error { - if p2pex, ok := ex.(*p2p.Exchange[*header.ExtendedHeader]); ok { - if err := p2pex.InitMetrics(); err != nil { - return err - } - } - - if err := sync.InitMetrics(); err != nil { - return err - } - - return libhead.WithMetrics[*header.ExtendedHeader](store) -} diff --git a/nodebuilder/header/service.go b/nodebuilder/header/service.go index 2b208cb88d..072ef070c6 100644 --- a/nodebuilder/header/service.go +++ b/nodebuilder/header/service.go @@ -52,12 +52,12 @@ func (s *Service) GetByHash(ctx context.Context, hash libhead.Hash) (*header.Ext return s.store.Get(ctx, hash) } -func (s *Service) GetVerifiedRangeByHeight( +func (s *Service) GetRangeByHeight( ctx context.Context, from *header.ExtendedHeader, to uint64, ) ([]*header.ExtendedHeader, error) { - return s.store.GetVerifiedRange(ctx, from, to) + return s.store.GetRangeByHeight(ctx, from, to) } func (s *Service) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { diff --git a/nodebuilder/settings.go b/nodebuilder/settings.go index d56125209c..9767081971 100644 --- a/nodebuilder/settings.go +++ b/nodebuilder/settings.go @@ -24,7 +24,7 @@ import ( "github.com/celestiaorg/celestia-node/header" "github.com/celestiaorg/celestia-node/nodebuilder/das" - modheader "github.com/celestiaorg/celestia-node/nodebuilder/header" + modhead "github.com/celestiaorg/celestia-node/nodebuilder/header" "github.com/celestiaorg/celestia-node/nodebuilder/node" "github.com/celestiaorg/celestia-node/nodebuilder/p2p" "github.com/celestiaorg/celestia-node/nodebuilder/share" @@ -72,6 +72,10 @@ func WithPyroscope(endpoint string, nodeType node.Type) fx.Option { // WithMetrics enables metrics exporting for the node. func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Option { + // TODO @renaynay: this will be refactored when there is more granular + // control over which module to enable metrics for + modhead.MetricsEnabled = true + baseComponents := fx.Options( fx.Supply(metricOpts), fx.Invoke(initializeMetrics), @@ -83,7 +87,6 @@ func WithMetrics(metricOpts []otlpmetrichttp.Option, nodeType node.Type) fx.Opti }), fx.Invoke(fraud.WithMetrics[*header.ExtendedHeader]), fx.Invoke(node.WithMetrics), - fx.Invoke(modheader.WithMetrics), fx.Invoke(share.WithDiscoveryMetrics), )