Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ics20 memo limit #1376

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -493,6 +493,7 @@ type GlobalConfig struct {
Memo string `yaml:"memo" json:"memo"`
LightCacheSize int `yaml:"light-cache-size" json:"light-cache-size"`
LogLevel string `yaml:"log-level" json:"log-level"`
ICS20MemoLimit int `yaml:"ics20-memo-limit" json:"ics20-memo-limit"`
}

// newDefaultGlobalConfig returns a global config with defaults set
Expand Down
1 change: 1 addition & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ $ %s start demo-path2 --max-tx-size 10`, appName, appName, appName, appName)),
chains,
paths,
maxMsgLength,
a.config.Global.ICS20MemoLimit,
a.config.memo(cmd),
clientUpdateThresholdTime,
flushInterval,
Expand Down
4 changes: 3 additions & 1 deletion cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"context"
"errors"
"fmt"
"github.com/avast/retry-go/v4"
"strings"
"time"

"github.com/avast/retry-go/v4"

sdk "github.com/cosmos/cosmos-sdk/types"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
"github.com/cosmos/relayer/v2/relayer"
Expand Down Expand Up @@ -933,6 +934,7 @@ $ %s tx flush demo-path channel-0`,
chains,
paths,
maxMsgLength,
a.config.Global.ICS20MemoLimit,
a.config.memo(cmd),
0,
0,
Expand Down
2 changes: 1 addition & 1 deletion relayer/chains/mock/mock_chain_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func TestMockChainAndPathProcessors(t *testing.T) {
flushInterval := 6 * time.Hour

pathProcessor := processor.NewPathProcessor(log, pathEnd1, pathEnd2, metrics, "",
clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength)
clientUpdateThresholdTime, flushInterval, relayer.DefaultMaxMsgLength, 0)

eventProcessor := processor.NewEventProcessor().
WithChainProcessors(
Expand Down
3 changes: 3 additions & 0 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (c *Chain) CreateOpenChannels(
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
0,
)

c.log.Info("Starting event processor for channel handshake",
Expand Down Expand Up @@ -133,6 +134,7 @@ func (c *Chain) CloseChannel(
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
0,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.FlushLifecycle{}).
Expand Down Expand Up @@ -171,6 +173,7 @@ func (c *Chain) CloseChannel(
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
0,
)).
WithInitialBlockHistory(0).
WithMessageLifecycle(&processor.ChannelCloseLifecycle{
Expand Down
1 change: 1 addition & 0 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func (c *Chain) CreateOpenConnections(
DefaultClientUpdateThreshold,
DefaultFlushInterval,
DefaultMaxMsgLength,
0,
)

var connectionSrc, connectionDst string
Expand Down
62 changes: 56 additions & 6 deletions relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package processor

import (
"context"
"fmt"
"sync"
"time"

transfertypes "github.com/cosmos/ibc-go/v8/modules/apps/transfer/types"
clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types"
conntypes "github.com/cosmos/ibc-go/v8/modules/core/03-connection/types"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
Expand Down Expand Up @@ -98,25 +100,64 @@ func (pathEnd *pathEndRuntime) isRelevantChannel(channelID string) bool {
return false
}

// checkMemoLimit returns an error if the packet memo exceeds the configured limit.
func checkMemoLimit(packetData []byte, memoLimit int) error {
if memoLimit == 0 {
// no limit
return nil
}

var packet transfertypes.FungibleTokenPacketData
if err := transfertypes.ModuleCdc.Unmarshal(packetData, &packet); err != nil {
// not an ICS-20 packet
return nil
}

if len(packet.Memo) > int(memoLimit) {
return fmt.Errorf("packet memo size: %d exceeds limit: %d", len(packet.Memo), memoLimit)
}

return nil
}

// mergeMessageCache merges relevant IBC messages for packet flows, connection handshakes, and channel handshakes.
// inSync indicates whether both involved ChainProcessors are in sync or not. When true, the observed packets
// metrics will be counted so that observed vs relayed packets can be compared.
func (pathEnd *pathEndRuntime) mergeMessageCache(messageCache IBCMessagesCache, counterpartyChainID string, inSync bool) {
func (pathEnd *pathEndRuntime) mergeMessageCache(
messageCache IBCMessagesCache,
counterpartyChainID string,
inSync bool,
memoLimit int,
) {
packetMessages := make(ChannelPacketMessagesCache)
connectionHandshakeMessages := make(ConnectionMessagesCache)
channelHandshakeMessages := make(ChannelMessagesCache)
clientICQMessages := make(ClientICQMessagesCache)

for ch, pmc := range messageCache.PacketFlow {
if pathEnd.ShouldRelayChannel(ChainChannelKey{ChainID: pathEnd.info.ChainID, CounterpartyChainID: counterpartyChainID, ChannelKey: ch}) {
if inSync && pathEnd.metrics != nil {
for eventType, pCache := range pmc {
newPmc := make(PacketMessagesCache)
for eventType, pCache := range pmc {
if inSync && pathEnd.metrics != nil {
pathEnd.metrics.AddPacketsObserved(pathEnd.info.PathName, pathEnd.info.ChainID, ch.ChannelID, ch.PortID, eventType, len(pCache))
}
newPc := make(PacketSequenceCache)
for seq, p := range pCache {
if err := checkMemoLimit(p.Data, memoLimit); err != nil {
pathEnd.log.Warn("Ignoring packet", zap.Error(err))
continue
}

newPc[seq] = p
}
if len(newPc) > 0 {
newPmc[eventType] = newPc
}
}
packetMessages[ch] = pmc
packetMessages[ch] = newPmc
}
}

pathEnd.messageCache.PacketFlow.Merge(packetMessages)

for eventType, cmc := range messageCache.ConnectionHandshake {
Expand Down Expand Up @@ -370,7 +411,16 @@ func (pathEnd *pathEndRuntime) checkForMisbehaviour(
return true, nil
}

func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) {
func (pathEnd *pathEndRuntime) mergeCacheData(
ctx context.Context,
cancel func(),
d ChainProcessorCacheData,
counterpartyChainID string,
counterpartyInSync bool,
messageLifecycle MessageLifecycle,
counterParty *pathEndRuntime,
memoLimit int,
) {
pathEnd.lastClientUpdateHeightMu.Lock()
pathEnd.latestBlock = d.LatestBlock
pathEnd.lastClientUpdateHeightMu.Unlock()
Expand Down Expand Up @@ -409,7 +459,7 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
pathEnd.channelStateCache = d.ChannelStateCache // Update latest channel open state for chain
pathEnd.channelStateCacheMu.Unlock()

pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog
pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync, memoLimit) // Merge incoming packet IBC messages into the backlog

pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache) // Update latest IBC header state
pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers
Expand Down
9 changes: 6 additions & 3 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ type PathProcessor struct {
// true if this is a localhost IBC connection
isLocalhost bool

maxMsgs uint64
maxMsgs uint64
memoLimit int

metrics *PrometheusMetrics
}
Expand All @@ -105,6 +106,7 @@ func NewPathProcessor(
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
maxMsgs uint64,
memoLimit int,
) *PathProcessor {
isLocalhost := pathEnd1.ClientID == ibcexported.LocalhostClientID

Expand All @@ -119,6 +121,7 @@ func NewPathProcessor(
metrics: metrics,
isLocalhost: isLocalhost,
maxMsgs: maxMsgs,
memoLimit: memoLimit,
}
if flushInterval == 0 {
pp.disablePeriodicFlush()
Expand Down Expand Up @@ -319,11 +322,11 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
return true
case d := <-pp.pathEnd1.incomingCacheData:
// we have new data from ChainProcessor for pathEnd1
pp.pathEnd1.mergeCacheData(ctx, cancel, d, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.messageLifecycle, pp.pathEnd2)
pp.pathEnd1.mergeCacheData(ctx, cancel, d, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.messageLifecycle, pp.pathEnd2, pp.memoLimit)

case d := <-pp.pathEnd2.incomingCacheData:
// we have new data from ChainProcessor for pathEnd2
pp.pathEnd2.mergeCacheData(ctx, cancel, d, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.messageLifecycle, pp.pathEnd1)
pp.pathEnd2.mergeCacheData(ctx, cancel, d, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.messageLifecycle, pp.pathEnd1, pp.memoLimit)

case <-pp.retryProcess:
// No new data to merge in, just retry handling.
Expand Down
4 changes: 2 additions & 2 deletions relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1493,8 +1493,8 @@ func (pp *PathProcessor) flush(ctx context.Context) error {
return fmt.Errorf("failed to enqueue pending messages for flush: %w", err)
}

pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync)
pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync)
pp.pathEnd1.mergeMessageCache(pathEnd1Cache, pp.pathEnd2.info.ChainID, pp.pathEnd2.inSync, pp.memoLimit)
pp.pathEnd2.mergeMessageCache(pathEnd2Cache, pp.pathEnd1.info.ChainID, pp.pathEnd1.inSync, pp.memoLimit)

if len(skipped) > 0 {
skippedPacketsString := ""
Expand Down
4 changes: 4 additions & 0 deletions relayer/strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func StartRelayer(
chains map[string]*Chain,
paths []NamedPath,
maxMsgLength uint64,
memoLimit int,
memo string,
clientUpdateThresholdTime time.Duration,
flushInterval time.Duration,
Expand Down Expand Up @@ -99,6 +100,7 @@ func StartRelayer(
ePaths,
initialBlockHistory,
maxMsgLength,
memoLimit,
memo,
messageLifecycle,
clientUpdateThresholdTime,
Expand Down Expand Up @@ -154,6 +156,7 @@ func relayerStartEventProcessor(
paths []path,
initialBlockHistory uint64,
maxMsgLength uint64,
memoLimit int,
memo string,
messageLifecycle processor.MessageLifecycle,
clientUpdateThresholdTime time.Duration,
Expand All @@ -179,6 +182,7 @@ func relayerStartEventProcessor(
clientUpdateThresholdTime,
flushInterval,
maxMsgLength,
memoLimit,
))
}

Expand Down
Loading