diff --git a/cmd/config.go b/cmd/config.go index a6337ed94..2c7c17330 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -493,6 +493,7 @@ type GlobalConfig struct { Memo string `yaml:"memo" json:"memo"` LightCacheSize int `yaml:"light-cache-size" json:"light-cache-size"` LogLevel string `yaml:"log-level" json:"log-level"` + ICS20MemoLimit int `yaml:"ics20-memo-limit" json:"ics20-memo-limit"` } // newDefaultGlobalConfig returns a global config with defaults set diff --git a/cmd/start.go b/cmd/start.go index 3527de993..16375dbfa 100644 --- a/cmd/start.go +++ b/cmd/start.go @@ -160,6 +160,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)), chains, paths, maxMsgLength, + a.config.Global.ICS20MemoLimit, a.config.memo(cmd), clientUpdateThresholdTime, flushInterval, diff --git a/cmd/tx.go b/cmd/tx.go index fba474708..4012e4ac5 100644 --- a/cmd/tx.go +++ b/cmd/tx.go @@ -4,10 +4,11 @@ import ( "context" "errors" "fmt" - "github.com/avast/retry-go/v4" "strings" "time" + "github.com/avast/retry-go/v4" + sdk "github.com/cosmos/cosmos-sdk/types" chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer" @@ -933,6 +934,7 @@ $ %s tx flush demo-path channel-0`, chains, paths, maxMsgLength, + a.config.Global.ICS20MemoLimit, a.config.memo(cmd), 0, 0, diff --git a/relayer/chains/mock/mock_chain_processor_test.go b/relayer/chains/mock/mock_chain_processor_test.go index 99440eee7..fa6099cac 100644 --- a/relayer/chains/mock/mock_chain_processor_test.go +++ b/relayer/chains/mock/mock_chain_processor_test.go @@ -63,7 +63,7 @@ func TestMockChainAndPathProcessors(t *testing.T) { flushInterval := 6 * time.Hour pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "", - clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength) + clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength, 0) eventProcessor := processor.NewEventProcessor(). WithChainProcessors( diff --git a/relayer/channel.go b/relayer/channel.go index f8d0ba8f7..5f1a16fa8 100644 --- a/relayer/channel.go +++ b/relayer/channel.go @@ -60,6 +60,7 @@ func (c *Chain) CreateOpenChannels( DefaultClientUpdateThreshold, DefaultFlushInterval, DefaultMaxMsgLength, + 0, ) c.log.Info("Starting event processor for channel handshake", @@ -133,6 +134,7 @@ func (c *Chain) CloseChannel( DefaultClientUpdateThreshold, DefaultFlushInterval, DefaultMaxMsgLength, + 0, )). WithInitialBlockHistory(0). WithMessageLifecycle(&processor.FlushLifecycle{}). @@ -171,6 +173,7 @@ func (c *Chain) CloseChannel( DefaultClientUpdateThreshold, DefaultFlushInterval, DefaultMaxMsgLength, + 0, )). WithInitialBlockHistory(0). WithMessageLifecycle(&processor.ChannelCloseLifecycle{ diff --git a/relayer/connection.go b/relayer/connection.go index ba51d3939..823d60053 100644 --- a/relayer/connection.go +++ b/relayer/connection.go @@ -41,6 +41,7 @@ func (c *Chain) CreateOpenConnections( DefaultClientUpdateThreshold, DefaultFlushInterval, DefaultMaxMsgLength, + 0, ) var connectionSrc, connectionDst string diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index c6c21ee2d..762e8834c 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -2,9 +2,11 @@ package processor import ( "context" + "fmt" "sync" "time" + transfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types" clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types" conntypes "github.com/cosmos/ibc-go/v8/modules/core/03-connection/types" chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types" @@ -98,10 +100,35 @@ func (pathEnd *pathEndRuntime) isRelevantChannel(channelID string) bool { return false } +// checkMemoLimit returns an error if the packet memo exceeds the configured limit. +func checkMemoLimit(packetData []byte, memoLimit int) error { + if memoLimit == 0 { + // no limit + return nil + } + + var packet transfertypes.FungibleTokenPacketData + if err := transfertypes.ModuleCdc.Unmarshal(packetData, &packet); err != nil { + // not an ICS-20 packet + return nil + } + + if len(packet.Memo) > int(memoLimit) { + return fmt.Errorf("packet memo size: %d exceeds limit: %d", len(packet.Memo), memoLimit) + } + + return nil +} + // mergeMessageCache merges relevant IBC messages for packet flows, connection handshakes, and channel handshakes. // inSync indicates whether both involved ChainProcessors are in sync or not. When true, the observed packets // metrics will be counted so that observed vs relayed packets can be compared. -func (pathEnd *pathEndRuntime) mergeMessageCache(messageCache IBCMessagesCache, counterpartyChainID string, inSync bool) { +func (pathEnd *pathEndRuntime) mergeMessageCache( + messageCache IBCMessagesCache, + counterpartyChainID string, + inSync bool, + memoLimit int, +) { packetMessages := make(ChannelPacketMessagesCache) connectionHandshakeMessages := make(ConnectionMessagesCache) channelHandshakeMessages := make(ChannelMessagesCache) @@ -109,14 +136,28 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(messageCache IBCMessagesCache, for ch, pmc := range messageCache.PacketFlow { if pathEnd.ShouldRelayChannel(ChainChannelKey{ChainID: pathEnd.info.ChainID, CounterpartyChainID: counterpartyChainID, ChannelKey: ch}) { - if inSync && pathEnd.metrics != nil { - for eventType, pCache := range pmc { + newPmc := make(PacketMessagesCache) + for eventType, pCache := range pmc { + if inSync && pathEnd.metrics != nil { pathEnd.metrics.AddPacketsObserved(pathEnd.info.PathName, pathEnd.info.ChainID, ch.ChannelID, ch.PortID, eventType, len(pCache)) } + newPc := make(PacketSequenceCache) + for seq, p := range pCache { + if err := checkMemoLimit(p.Data, memoLimit); err != nil { + pathEnd.log.Warn("Ignoring packet", zap.Error(err)) + continue + } + + newPc[seq] = p + } + if len(newPc) > 0 { + newPmc[eventType] = newPc + } } - packetMessages[ch] = pmc + packetMessages[ch] = newPmc } } + pathEnd.messageCache.PacketFlow.Merge(packetMessages) for eventType, cmc := range messageCache.ConnectionHandshake { @@ -370,7 +411,16 @@ func (pathEnd *pathEndRuntime) checkForMisbehaviour( return true, nil } -func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) { +func (pathEnd *pathEndRuntime) mergeCacheData( + ctx context.Context, + cancel func(), + d ChainProcessorCacheData, + counterpartyChainID string, + counterpartyInSync bool, + messageLifecycle MessageLifecycle, + counterParty *pathEndRuntime, + memoLimit int, +) { pathEnd.lastClientUpdateHeightMu.Lock() pathEnd.latestBlock = d.LatestBlock pathEnd.lastClientUpdateHeightMu.Unlock() @@ -409,7 +459,7 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func() pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain pathEnd.channelStateCacheMu.Unlock() - pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog + pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync, memoLimit) // Merge incoming packet IBC messages into the backlog pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache) // Update latest IBC header state pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers diff --git a/relayer/processor/path_processor.go b/relayer/processor/path_processor.go index c6a284bf7..0d43511be 100644 --- a/relayer/processor/path_processor.go +++ b/relayer/processor/path_processor.go @@ -79,7 +79,8 @@ type PathProcessor struct { // true if this is a localhost IBC connection isLocalhost bool - maxMsgs uint64 + maxMsgs uint64 + memoLimit int metrics *PrometheusMetrics } @@ -105,6 +106,7 @@ func NewPathProcessor( clientUpdateThresholdTime time.Duration, flushInterval time.Duration, maxMsgs uint64, + memoLimit int, ) *PathProcessor { isLocalhost := pathEnd1.ClientID == ibcexported.LocalhostClientID @@ -119,6 +121,7 @@ func NewPathProcessor( metrics: metrics, isLocalhost: isLocalhost, maxMsgs: maxMsgs, + memoLimit: memoLimit, } if flushInterval == 0 { pp.disablePeriodicFlush() @@ -319,11 +322,11 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun return true case d := <-pp.pathEnd1.incomingCacheData: // we have new data from ChainProcessor for pathEnd1 - pp.pathEnd1.mergeCacheData(ctx, cancel, d, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.messageLifecycle, pp.pathEnd2) + pp.pathEnd1.mergeCacheData(ctx, cancel, d, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.messageLifecycle, pp.pathEnd2, pp.memoLimit) case d := <-pp.pathEnd2.incomingCacheData: // we have new data from ChainProcessor for pathEnd2 - pp.pathEnd2.mergeCacheData(ctx, cancel, d, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.messageLifecycle, pp.pathEnd1) + pp.pathEnd2.mergeCacheData(ctx, cancel, d, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.messageLifecycle, pp.pathEnd1, pp.memoLimit) case <-pp.retryProcess: // No new data to merge in, just retry handling. diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index 1f2c627bb..4ec0dcf94 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1493,8 +1493,8 @@ func (pp *PathProcessor) flush(ctx context.Context) error { return fmt.Errorf("failed to enqueue pending messages for flush: %w", err) } - pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync) - pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync) + pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.memoLimit) + pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.memoLimit) if len(skipped) > 0 { skippedPacketsString := "" diff --git a/relayer/strategies.go b/relayer/strategies.go index 704e2aa25..9363f2997 100644 --- a/relayer/strategies.go +++ b/relayer/strategies.go @@ -39,6 +39,7 @@ func StartRelayer( chains map[string]*Chain, paths []NamedPath, maxMsgLength uint64, + memoLimit int, memo string, clientUpdateThresholdTime time.Duration, flushInterval time.Duration, @@ -99,6 +100,7 @@ func StartRelayer( ePaths, initialBlockHistory, maxMsgLength, + memoLimit, memo, messageLifecycle, clientUpdateThresholdTime, @@ -154,6 +156,7 @@ func relayerStartEventProcessor( paths []path, initialBlockHistory uint64, maxMsgLength uint64, + memoLimit int, memo string, messageLifecycle processor.MessageLifecycle, clientUpdateThresholdTime time.Duration, @@ -179,6 +182,7 @@ func relayerStartEventProcessor( clientUpdateThresholdTime, flushInterval, maxMsgLength, + memoLimit, )) }