diff --git a/go.mod b/go.mod index 36d7c471e..5aa84d765 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( cosmossdk.io/math v1.2.0 cosmossdk.io/store v1.0.2 cosmossdk.io/x/feegrant v0.1.0 + cosmossdk.io/x/tx v0.13.0 cosmossdk.io/x/upgrade v0.1.0 github.com/avast/retry-go/v4 v4.5.1 github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd @@ -22,7 +23,6 @@ require ( github.com/cosmos/ics23/go v0.10.0 github.com/ethereum/go-ethereum v1.13.5 github.com/gofrs/flock v0.8.1 - github.com/google/go-cmp v0.6.0 github.com/google/go-github/v43 v43.0.0 github.com/grpc-ecosystem/grpc-gateway v1.16.0 github.com/jsternberg/zap-logfmt v1.3.0 @@ -51,7 +51,6 @@ require ( cosmossdk.io/core v0.11.0 // indirect cosmossdk.io/depinject v1.0.0-alpha.4 // indirect cosmossdk.io/log v1.3.0 // indirect - cosmossdk.io/x/tx v0.13.0 // indirect filippo.io/edwards25519 v1.0.0 // indirect github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect github.com/99designs/keyring v1.2.1 // indirect @@ -110,6 +109,7 @@ require ( github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect github.com/google/btree v1.1.2 // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/orderedcode v0.0.1 // indirect github.com/google/s2a-go v0.1.7 // indirect diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index d0f3e1d23..94aa1ac6c 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -154,6 +154,8 @@ func (pathEnd *pathEndRuntime) mergeMessageCache( channelHandshakeMessages := make(ChannelMessagesCache) clientICQMessages := make(ClientICQMessagesCache) + messageCache.PacketState.Prune(100) // Only keep most recent 100 packet states per channel + for ch, pmc := range messageCache.PacketFlow { if pathEnd.ShouldRelayChannel(ChainChannelKey{ ChainID: pathEnd.info.ChainID, @@ -194,6 +196,12 @@ func (pathEnd *pathEndRuntime) mergeMessageCache( } packetMessages[ch] = newPmc + + for eventType, pCache := range newPmc { + for seq := range pCache { + pathEnd.messageCache.PacketState.UpdateState(ch, seq, eventType) + } + } } } @@ -610,9 +618,13 @@ func (pathEnd *pathEndRuntime) removePacketRetention( case chantypes.EventTypeRecvPacket: toDelete[eventType] = []uint64{sequence} toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence} - case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket: + case chantypes.EventTypeAcknowledgePacket: toDelete[eventType] = []uint64{sequence} toDeleteCounterparty[chantypes.EventTypeRecvPacket] = []uint64{sequence} + toDeleteCounterparty[chantypes.EventTypeWriteAck] = []uint64{sequence} + toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence} + case chantypes.EventTypeTimeoutPacket: + toDelete[eventType] = []uint64{sequence} toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence} } // delete in progress send for this specific message diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index 9a5f0ad7c..c73126acb 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -352,6 +352,36 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun case <-pp.retryProcess: // No new data to merge in, just retry handling. case <-pp.flushTimer.C: + for len(pp.pathEnd1.incomingCacheData) > 0 { + d := <-pp.pathEnd1.incomingCacheData + // we have new data from ChainProcessor for pathEnd1 + pp.pathEnd1.mergeCacheData( + ctx, + cancel, + d, + pp.pathEnd2.info.ChainID, + pp.pathEnd2.inSync, + pp.messageLifecycle, + pp.pathEnd2, + pp.memoLimit, + pp.maxReceiverSize, + ) + } + for len(pp.pathEnd2.incomingCacheData) > 0 { + d := <-pp.pathEnd2.incomingCacheData + // we have new data from ChainProcessor for pathEnd2 + pp.pathEnd2.mergeCacheData( + ctx, + cancel, + d, + pp.pathEnd1.info.ChainID, + pp.pathEnd1.inSync, + pp.messageLifecycle, + pp.pathEnd1, + pp.memoLimit, + pp.maxReceiverSize, + ) + } // Periodic flush to clear out any old packets pp.handleFlush(ctx) } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index a85b19870..01f7c6d8a 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1261,6 +1261,10 @@ func (pp *PathProcessor) queuePendingRecvAndAcks( var skipped *skippedPackets for i, seq := range unrecv { + if state, ok := dst.messageCache.PacketState.State(k, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeRecvPacket) { + continue // already recv'd by path processor + } + srcMu.Lock() if srcCache.IsCached(chantypes.EventTypeSendPacket, k, seq) { continue // already cached @@ -1340,8 +1344,13 @@ SeqLoop: } for i, seq := range unacked { - dstMu.Lock() ck := k.Counterparty() + + if state, ok := dst.messageCache.PacketState.State(ck, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeAcknowledgePacket) { + continue // already acked by path processor + } + + dstMu.Lock() if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) && dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) { continue // already cached diff --git a/relayer/processor/types.go b/relayer/processor/types.go index 7cb2022af..a55bea4e5 100644 --- a/relayer/processor/types.go +++ b/relayer/processor/types.go @@ -91,6 +91,7 @@ func (t *ChannelCloseLifecycle) messageLifecycler() {} // which will retain relevant messages for each PathProcessor. type IBCMessagesCache struct { PacketFlow ChannelPacketMessagesCache + PacketState ChannelPacketStateCache ConnectionHandshake ConnectionMessagesCache ChannelHandshake ChannelMessagesCache ClientICQ ClientICQMessagesCache @@ -115,6 +116,7 @@ func (c IBCMessagesCache) Clone() IBCMessagesCache { func NewIBCMessagesCache() IBCMessagesCache { return IBCMessagesCache{ PacketFlow: make(ChannelPacketMessagesCache), + PacketState: make(ChannelPacketStateCache), ConnectionHandshake: make(ConnectionMessagesCache), ChannelHandshake: make(ChannelMessagesCache), ClientICQ: make(ClientICQMessagesCache), @@ -124,12 +126,18 @@ func NewIBCMessagesCache() IBCMessagesCache { // ChannelPacketMessagesCache is used for caching a PacketMessagesCache for a given IBC channel. type ChannelPacketMessagesCache map[ChannelKey]PacketMessagesCache +// ChannelPacketStateCache is used for caching a PacketSequenceStateCache for a given IBC channel. +type ChannelPacketStateCache map[ChannelKey]PacketSequenceStateCache + // PacketMessagesCache is used for caching a PacketSequenceCache for a given IBC message type. type PacketMessagesCache map[string]PacketSequenceCache // PacketSequenceCache is used for caching an IBC message for a given packet sequence. type PacketSequenceCache map[uint64]provider.PacketInfo +// PacketSequenceStateCache is used for caching the state of a packet sequence. +type PacketSequenceStateCache map[uint64]string + // ChannelMessagesCache is used for caching a ChannelMessageCache for a given IBC message type. type ChannelMessagesCache map[string]ChannelMessageCache @@ -344,6 +352,76 @@ func (c PacketMessagesCache) DeleteMessages(toDelete ...map[string][]uint64) { } } +func stateValue(state string) int { + switch state { + case chantypes.EventTypeSendPacket: + return 1 + case chantypes.EventTypeRecvPacket: + return 2 + case chantypes.EventTypeWriteAck: + return 3 + case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket: + return 4 + } + panic(fmt.Errorf("unexpected state: %s", state)) +} + +func (c ChannelPacketStateCache) UpdateState(k ChannelKey, sequence uint64, state string) { + minState := 0 + if sequenceCache, ok := c[k]; ok { + if currentState, ok := sequenceCache[sequence]; ok { + minState = stateValue(currentState) + } + } else { + c[k] = make(PacketSequenceStateCache) + } + + if stateValue(state) <= minState { + // can't downgrade state + return + } + + c[k][sequence] = state +} + +func (c ChannelPacketStateCache) State(k ChannelKey, sequence uint64) (string, bool) { + sequenceCache, ok := c[k] + if !ok { + return "", false + } + + state, ok := sequenceCache[sequence] + if !ok { + return "", false + } + + return state, true +} + +// Prune deletes all map entries except for the most recent (keep) for all channels. +func (c ChannelPacketStateCache) Prune(keep int) { + for _, pssc := range c { + pssc.Prune(keep) + } +} + +// Prune deletes all map entries except for the most recent (keep). +func (c PacketSequenceStateCache) Prune(keep int) { + if len(c) <= keep { + return + } + seqs := make([]uint64, 0, len(c)) + for seq := range c { + seqs = append(seqs, seq) + } + sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] }) + + // only keep recent packet states + for _, seq := range seqs[:len(seqs)-keep] { + delete(c, seq) + } +} + // IsCached returns true if a sequence for a channel key and event type is already cached. func (c ChannelPacketMessagesCache) IsCached(eventType string, k ChannelKey, sequence uint64) bool { if _, ok := c[k]; !ok { diff --git a/relayer/processor/types_test.go b/relayer/processor/types_test.go index 69d63dfa3..36278ed0d 100644 --- a/relayer/processor/types_test.go +++ b/relayer/processor/types_test.go @@ -3,6 +3,7 @@ package processor_test import ( "testing" + chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported" "github.com/cosmos/relayer/v2/relayer/processor" "github.com/stretchr/testify/require" @@ -41,3 +42,34 @@ func TestIBCHeaderCachePrune(t *testing.T) { require.Len(t, cache, 5) require.NotNil(t, cache[uint64(15)], cache[uint64(16)], cache[uint64(17)], cache[uint64(18)], cache[uint64(19)]) } + +func TestPacketSequenceStateCachePrune(t *testing.T) { + cache := make(processor.PacketSequenceStateCache) + + for i := uint64(0); i < 50; i++ { + cache[i] = chantypes.EventTypeSendPacket + } + + cache.Prune(100) + + require.Len(t, cache, 50) + + cache.Prune(25) + + require.Len(t, cache, 25) + + min := uint64(1000) + max := uint64(0) + + for seq := range cache { + if seq < min { + min = seq + } + if seq > max { + max = seq + } + } + + require.Equal(t, uint64(25), min) + require.Equal(t, uint64(49), max) +}