From b6ac8c2363706b542d83b3dd285ef4bd99d00b4f Mon Sep 17 00:00:00 2001 From: Ryan Date: Thu, 21 Sep 2023 11:28:56 +0200 Subject: [PATCH] refactoring(full/availability): removing TeeGetter, storing via SharesAvailable (#2726) --- das/daser_test.go | 2 +- nodebuilder/share/constructors.go | 8 +- nodebuilder/tests/nd_test.go | 2 +- share/availability/cache/testing.go | 2 +- share/availability/full/availability.go | 23 ++++-- share/availability/full/availability_test.go | 19 ++++- share/availability/full/testing.go | 14 +++- share/getters/getter_test.go | 55 ------------- share/getters/store.go | 12 +-- share/getters/tee.go | 85 -------------------- share/ipld/corrupted_data_test.go | 5 +- 11 files changed, 61 insertions(+), 166 deletions(-) delete mode 100644 share/getters/tee.go diff --git a/das/daser_test.go b/das/daser_test.go index a14da325f2..6a3378cdb5 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -155,7 +155,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) { ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0], pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) require.NoError(t, err) - avail := full.TestAvailability(getters.NewIPLDGetter(bServ)) + avail := full.TestAvailability(t, getters.NewIPLDGetter(bServ)) // 15 headers from the past and 15 future headers mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15) diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index 0e8d17f208..28e0997163 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -92,7 +92,6 @@ func lightGetter( // by shrex the next time the data is retrieved (meaning shard recovery is // manual after corruption is detected). func bridgeGetter( - store *eds.Store, storeGetter *getters.StoreGetter, shrexGetter *getters.ShrexGetter, cfg Config, @@ -100,13 +99,12 @@ func bridgeGetter( var cascade []share.Getter cascade = append(cascade, storeGetter) if cfg.UseShareExchange { - cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store)) + cascade = append(cascade, shrexGetter) } return getters.NewCascadeGetter(cascade) } func fullGetter( - store *eds.Store, storeGetter *getters.StoreGetter, shrexGetter *getters.ShrexGetter, ipldGetter *getters.IPLDGetter, @@ -115,8 +113,8 @@ func fullGetter( var cascade []share.Getter cascade = append(cascade, storeGetter) if cfg.UseShareExchange { - cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store)) + cascade = append(cascade, shrexGetter) } - cascade = append(cascade, getters.NewTeeGetter(ipldGetter, store)) + cascade = append(cascade, ipldGetter) return getters.NewCascadeGetter(cascade) } diff --git a/nodebuilder/tests/nd_test.go b/nodebuilder/tests/nd_test.go index 64d672cddc..8149ea9fe7 100644 --- a/nodebuilder/tests/nd_test.go +++ b/nodebuilder/tests/nd_test.go @@ -204,7 +204,7 @@ func replaceShareGetter() fx.Option { ) share.Getter { cascade := make([]share.Getter, 0, 2) cascade = append(cascade, storeGetter) - cascade = append(cascade, getters.NewTeeGetter(shrexGetter, store)) + cascade = append(cascade, shrexGetter) return getters.NewCascadeGetter(cascade) }, )) diff --git a/share/availability/cache/testing.go b/share/availability/cache/testing.go index 00a1520114..989bee522c 100644 --- a/share/availability/cache/testing.go +++ b/share/availability/cache/testing.go @@ -32,7 +32,7 @@ func FullAvailabilityWithLocalRandSquare(t *testing.T, n int) (share.Availabilit store := dssync.MutexWrap(ds.NewMapDatastore()) getter := getters.NewIPLDGetter(bServ) avail := NewShareAvailability( - full.TestAvailability(getter), + full.TestAvailability(t, getter), store, ) return avail, availability_test.RandFillBS(t, n, bServ) diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index fb8dead839..51d9fd8518 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -3,12 +3,15 @@ package full import ( "context" "errors" + "fmt" + "github.com/filecoin-project/dagstore" logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/eds/byzantine" + "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/p2p/discovery" ) @@ -63,13 +66,15 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro } // a hack to avoid loading the whole EDS in mem if we store it already. - if fa.store != nil { - if ok, _ := fa.store.Has(ctx, root.Hash()); ok { - return nil - } + if ok, _ := fa.store.Has(ctx, root.Hash()); ok { + return nil } - _, err := fa.getter.GetEDS(ctx, root) + adder := ipld.NewProofsAdder(len(root.RowRoots)) + ctx = ipld.CtxWithProofsAdder(ctx, adder) + defer adder.Purge() + + eds, err := fa.getter.GetEDS(ctx, root) if err != nil { if errors.Is(err, context.Canceled) { return err @@ -79,8 +84,14 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro if errors.Is(err, share.ErrNotFound) || errors.Is(err, context.DeadlineExceeded) && !errors.As(err, &byzantineErr) { return share.ErrNotAvailable } + return err } - return err + + err = fa.store.Put(ctx, root.Hash(), eds) + if err != nil && !errors.Is(err, dagstore.ErrShardExists) { + return fmt.Errorf("full availability: failed to store eds: %w", err) + } + return nil } func (fa *ShareAvailability) ProbabilityOfAvailability(context.Context) float64 { diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index b6d217cd6b..a769c981c4 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -35,11 +35,26 @@ func TestSharesAvailable_Full(t *testing.T) { // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it getter, dah := GetterWithRandSquare(t, 16) - avail := TestAvailability(getter) + avail := TestAvailability(t, getter) err := avail.SharesAvailable(ctx, dah) assert.NoError(t, err) } +func TestSharesAvailable_StoresToEDSStore(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it + getter, dah := GetterWithRandSquare(t, 16) + avail := TestAvailability(t, getter) + err := avail.SharesAvailable(ctx, dah) + assert.NoError(t, err) + + has, err := avail.store.Has(ctx, dah.Hash()) + assert.NoError(t, err) + assert.True(t, has) +} + func TestSharesAvailable_Full_ErrNotAvailable(t *testing.T) { ctrl := gomock.NewController(t) getter := mocks.NewMockGetter(ctrl) @@ -49,7 +64,7 @@ func TestSharesAvailable_Full_ErrNotAvailable(t *testing.T) { eds := edstest.RandEDS(t, 4) dah, err := da.NewDataAvailabilityHeader(eds) require.NoError(t, err) - avail := TestAvailability(getter) + avail := TestAvailability(t, getter) errors := []error{share.ErrNotFound, context.DeadlineExceeded} for _, getterErr := range errors { diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index 4638fde1bc..a636b26ea6 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -1,14 +1,18 @@ package full import ( + "context" "testing" "time" + "github.com/ipfs/go-datastore" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/p2p/discovery/routing" + "github.com/stretchr/testify/require" "github.com/celestiaorg/celestia-node/share" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" + "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/celestia-node/share/p2p/discovery" @@ -32,16 +36,20 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { nd := dn.NewTestNode() nd.Getter = getters.NewIPLDGetter(nd.BlockService) - nd.Availability = TestAvailability(nd.Getter) + nd.Availability = TestAvailability(dn.T, nd.Getter) return nd } -func TestAvailability(getter share.Getter) *ShareAvailability { +func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { disc := discovery.NewDiscovery( nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), discovery.WithAdvertiseInterval(time.Second), discovery.WithPeersLimit(10), ) - return NewShareAvailability(nil, getter, disc) + store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) + require.NoError(t, err) + err = store.Start(context.Background()) + require.NoError(t, err) + return NewShareAvailability(store, getter, disc) } diff --git a/share/getters/getter_test.go b/share/getters/getter_test.go index f8d1d0a5f0..bacb0a2c39 100644 --- a/share/getters/getter_test.go +++ b/share/getters/getter_test.go @@ -22,61 +22,6 @@ import ( "github.com/celestiaorg/celestia-node/share/sharetest" ) -func TestTeeGetter(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - storeCfg := eds.DefaultParameters() - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - edsStore, err := eds.NewStore(storeCfg, t.TempDir(), ds) - require.NoError(t, err) - - err = edsStore.Start(ctx) - require.NoError(t, err) - - bServ := ipld.NewMemBlockservice() - ig := NewIPLDGetter(bServ) - tg := NewTeeGetter(ig, edsStore) - - t.Run("TeesToEDSStore", func(t *testing.T) { - randEds, dah := randomEDS(t) - _, err := ipld.ImportShares(ctx, randEds.Flattened(), bServ) - require.NoError(t, err) - - // eds store doesn't have the EDS yet - ok, err := edsStore.Has(ctx, dah.Hash()) - assert.False(t, ok) - assert.NoError(t, err) - - retrievedEDS, err := tg.GetEDS(ctx, dah) - require.NoError(t, err) - require.True(t, randEds.Equals(retrievedEDS)) - - // eds store now has the EDS and it can be retrieved - ok, err = edsStore.Has(ctx, dah.Hash()) - assert.True(t, ok) - assert.NoError(t, err) - finalEDS, err := edsStore.Get(ctx, dah.Hash()) - assert.NoError(t, err) - require.True(t, randEds.Equals(finalEDS)) - }) - - t.Run("ShardAlreadyExistsDoesntError", func(t *testing.T) { - randEds, dah := randomEDS(t) - _, err := ipld.ImportShares(ctx, randEds.Flattened(), bServ) - require.NoError(t, err) - - retrievedEDS, err := tg.GetEDS(ctx, dah) - require.NoError(t, err) - require.True(t, randEds.Equals(retrievedEDS)) - - // no error should be returned, even though the EDS identified by the DAH already exists - retrievedEDS, err = tg.GetEDS(ctx, dah) - require.NoError(t, err) - require.True(t, randEds.Equals(retrievedEDS)) - }) -} - func TestStoreGetter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) diff --git a/share/getters/store.go b/share/getters/store.go index 415c9f047f..ad05cf8f37 100644 --- a/share/getters/store.go +++ b/share/getters/store.go @@ -132,11 +132,13 @@ func (sg *StoreGetter) GetSharesByNamespace( blockGetter := eds.NewBlockGetter(bs) shares, err = collectSharesByNamespace(ctx, blockGetter, root, namespace) if errors.Is(err, ipld.ErrNodeNotFound) { - // IPLD node not found after the index pointed to this shard and the CAR blockstore has been - // opened successfully is a strong indicator of corruption. We remove the block on bridges - // and fulls and return share.ErrNotFound to ensure the data is retrieved by the next - // getter. Note that this recovery is manual and will only be restored by an RPC call to - // fetch the same datahash that was removed. + // IPLD node not found after the index pointed to this shard and the CAR + // blockstore has been opened successfully is a strong indicator of + // corruption. We remove the block on bridges and fulls and return + // share.ErrNotFound to ensure the data is retrieved by the next getter. + // Note that this recovery is manual and will only be restored by an RPC + // call to SharesAvailable that fetches the same datahash that was + // removed. err = sg.store.Remove(ctx, root.Hash()) if err != nil { log.Errorf("getter/store: failed to remove CAR after detected corruption: %w", err) diff --git a/share/getters/tee.go b/share/getters/tee.go deleted file mode 100644 index 9c89b2dec5..0000000000 --- a/share/getters/tee.go +++ /dev/null @@ -1,85 +0,0 @@ -package getters - -import ( - "context" - "errors" - "fmt" - - "github.com/filecoin-project/dagstore" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" - - "github.com/celestiaorg/rsmt2d" - - "github.com/celestiaorg/celestia-node/libs/utils" - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" -) - -var _ share.Getter = (*TeeGetter)(nil) - -// TeeGetter is a share.Getter that wraps a getter and stores the results of GetEDS into an -// eds.Store. -type TeeGetter struct { - getter share.Getter - store *eds.Store -} - -// NewTeeGetter creates a new TeeGetter. -func NewTeeGetter(getter share.Getter, store *eds.Store) *TeeGetter { - return &TeeGetter{ - getter: getter, - store: store, - } -} - -func (tg *TeeGetter) GetShare(ctx context.Context, root *share.Root, row, col int) (share share.Share, err error) { - ctx, span := tracer.Start(ctx, "tee/get-share", trace.WithAttributes( - attribute.Int("row", row), - attribute.Int("col", col), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - return tg.getter.GetShare(ctx, root, row, col) -} - -func (tg *TeeGetter) GetEDS(ctx context.Context, root *share.Root) (eds *rsmt2d.ExtendedDataSquare, err error) { - ctx, span := tracer.Start(ctx, "tee/get-eds") - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - adder := ipld.NewProofsAdder(len(root.RowRoots)) - ctx = ipld.CtxWithProofsAdder(ctx, adder) - defer adder.Purge() - - eds, err = tg.getter.GetEDS(ctx, root) - if err != nil { - return nil, err - } - - err = tg.store.Put(ctx, root.Hash(), eds) - if err != nil && !errors.Is(err, dagstore.ErrShardExists) { - return nil, fmt.Errorf("getter/tee: failed to store eds: %w", err) - } - - return eds, nil -} - -func (tg *TeeGetter) GetSharesByNamespace( - ctx context.Context, - root *share.Root, - namespace share.Namespace, -) (shares share.NamespacedShares, err error) { - ctx, span := tracer.Start(ctx, "tee/get-shares-by-namespace", trace.WithAttributes( - attribute.String("namespace", namespace.String()), - )) - defer func() { - utils.SetStatusAndEnd(span, err) - }() - - return tg.getter.GetSharesByNamespace(ctx, root, namespace) -} diff --git a/share/ipld/corrupted_data_test.go b/share/ipld/corrupted_data_test.go index df1d0d7888..58ddf785fb 100644 --- a/share/ipld/corrupted_data_test.go +++ b/share/ipld/corrupted_data_test.go @@ -26,7 +26,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) { requestor := full.Node(net) provider, mockBS := availability_test.MockNode(t, net) - provider.Availability = full.TestAvailability(getters.NewIPLDGetter(provider.BlockService)) + provider.Availability = full.TestAvailability(t, getters.NewIPLDGetter(provider.BlockService)) net.ConnectAll() // before the provider starts attacking, we should be able to retrieve successfully. We pass a size @@ -38,7 +38,8 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) { require.NoError(t, err) // clear the storage of the requester so that it must retrieve again, then start attacking - requestor.ClearStorage() + // we reinitialize the node to clear the eds store + requestor = full.Node(net) mockBS.Attacking = true getCtx, cancelGet = context.WithTimeout(ctx, sharesAvailableTimeout) t.Cleanup(cancelGet)