Skip to content

Commit

Permalink
Fix flush race (#1363)
Browse files Browse the repository at this point in the history
* Remove writeack too when removing packet retention

* Add additional state tracking to prevent loops

* simplify

* fix
  • Loading branch information
agouin committed Jan 30, 2024
1 parent ef1578e commit 287354b
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 4 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
cosmossdk.io/math v1.2.0
cosmossdk.io/store v1.0.2
cosmossdk.io/x/feegrant v0.1.0
cosmossdk.io/x/tx v0.13.0
cosmossdk.io/x/upgrade v0.1.0
github.com/avast/retry-go/v4 v4.5.1
github.com/btcsuite/btcd v0.23.5-0.20231215221805-96c9fd8078fd
Expand All @@ -22,7 +23,6 @@ require (
github.com/cosmos/ics23/go v0.10.0
github.com/ethereum/go-ethereum v1.13.5
github.com/gofrs/flock v0.8.1
github.com/google/go-cmp v0.6.0
github.com/google/go-github/v43 v43.0.0
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/jsternberg/zap-logfmt v1.3.0
Expand Down Expand Up @@ -51,7 +51,6 @@ require (
cosmossdk.io/core v0.11.0 // indirect
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/log v1.3.0 // indirect
cosmossdk.io/x/tx v0.13.0 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
Expand Down Expand Up @@ -110,6 +109,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/orderedcode v0.0.1 // indirect
github.com/google/s2a-go v0.1.7 // indirect
Expand Down
14 changes: 13 additions & 1 deletion relayer/processor/path_end_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(
channelHandshakeMessages := make(ChannelMessagesCache)
clientICQMessages := make(ClientICQMessagesCache)

messageCache.PacketState.Prune(100) // Only keep most recent 100 packet states per channel

for ch, pmc := range messageCache.PacketFlow {
if pathEnd.ShouldRelayChannel(ChainChannelKey{
ChainID: pathEnd.info.ChainID,
Expand Down Expand Up @@ -194,6 +196,12 @@ func (pathEnd *pathEndRuntime) mergeMessageCache(
}

packetMessages[ch] = newPmc

for eventType, pCache := range newPmc {
for seq := range pCache {
pathEnd.messageCache.PacketState.UpdateState(ch, seq, eventType)
}
}
}
}

Expand Down Expand Up @@ -610,9 +618,13 @@ func (pathEnd *pathEndRuntime) removePacketRetention(
case chantypes.EventTypeRecvPacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket:
case chantypes.EventTypeAcknowledgePacket:
toDelete[eventType] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeRecvPacket] = []uint64{sequence}
toDeleteCounterparty[chantypes.EventTypeWriteAck] = []uint64{sequence}
toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence}
case chantypes.EventTypeTimeoutPacket:
toDelete[eventType] = []uint64{sequence}
toDelete[chantypes.EventTypeSendPacket] = []uint64{sequence}
}
// delete in progress send for this specific message
Expand Down
30 changes: 30 additions & 0 deletions relayer/processor/path_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,36 @@ func (pp *PathProcessor) processAvailableSignals(ctx context.Context, cancel fun
case <-pp.retryProcess:
// No new data to merge in, just retry handling.
case <-pp.flushTimer.C:
for len(pp.pathEnd1.incomingCacheData) > 0 {
d := <-pp.pathEnd1.incomingCacheData
// we have new data from ChainProcessor for pathEnd1
pp.pathEnd1.mergeCacheData(
ctx,
cancel,
d,
pp.pathEnd2.info.ChainID,
pp.pathEnd2.inSync,
pp.messageLifecycle,
pp.pathEnd2,
pp.memoLimit,
pp.maxReceiverSize,
)
}
for len(pp.pathEnd2.incomingCacheData) > 0 {
d := <-pp.pathEnd2.incomingCacheData
// we have new data from ChainProcessor for pathEnd2
pp.pathEnd2.mergeCacheData(
ctx,
cancel,
d,
pp.pathEnd1.info.ChainID,
pp.pathEnd1.inSync,
pp.messageLifecycle,
pp.pathEnd1,
pp.memoLimit,
pp.maxReceiverSize,
)
}
// Periodic flush to clear out any old packets
pp.handleFlush(ctx)
}
Expand Down
11 changes: 10 additions & 1 deletion relayer/processor/path_processor_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -1261,6 +1261,10 @@ func (pp *PathProcessor) queuePendingRecvAndAcks(
var skipped *skippedPackets

for i, seq := range unrecv {
if state, ok := dst.messageCache.PacketState.State(k, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeRecvPacket) {
continue // already recv'd by path processor
}

srcMu.Lock()
if srcCache.IsCached(chantypes.EventTypeSendPacket, k, seq) {
continue // already cached
Expand Down Expand Up @@ -1340,8 +1344,13 @@ SeqLoop:
}

for i, seq := range unacked {
dstMu.Lock()
ck := k.Counterparty()

if state, ok := dst.messageCache.PacketState.State(ck, seq); ok && stateValue(state) >= stateValue(chantypes.EventTypeAcknowledgePacket) {
continue // already acked by path processor
}

dstMu.Lock()
if dstCache.IsCached(chantypes.EventTypeRecvPacket, ck, seq) &&
dstCache.IsCached(chantypes.EventTypeWriteAck, ck, seq) {
continue // already cached
Expand Down
78 changes: 78 additions & 0 deletions relayer/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (t *ChannelCloseLifecycle) messageLifecycler() {}
// which will retain relevant messages for each PathProcessor.
type IBCMessagesCache struct {
PacketFlow ChannelPacketMessagesCache
PacketState ChannelPacketStateCache
ConnectionHandshake ConnectionMessagesCache
ChannelHandshake ChannelMessagesCache
ClientICQ ClientICQMessagesCache
Expand All @@ -115,6 +116,7 @@ func (c IBCMessagesCache) Clone() IBCMessagesCache {
func NewIBCMessagesCache() IBCMessagesCache {
return IBCMessagesCache{
PacketFlow: make(ChannelPacketMessagesCache),
PacketState: make(ChannelPacketStateCache),
ConnectionHandshake: make(ConnectionMessagesCache),
ChannelHandshake: make(ChannelMessagesCache),
ClientICQ: make(ClientICQMessagesCache),
Expand All @@ -124,12 +126,18 @@ func NewIBCMessagesCache() IBCMessagesCache {
// ChannelPacketMessagesCache is used for caching a PacketMessagesCache for a given IBC channel.
type ChannelPacketMessagesCache map[ChannelKey]PacketMessagesCache

// ChannelPacketStateCache is used for caching a PacketSequenceStateCache for a given IBC channel.
type ChannelPacketStateCache map[ChannelKey]PacketSequenceStateCache

// PacketMessagesCache is used for caching a PacketSequenceCache for a given IBC message type.
type PacketMessagesCache map[string]PacketSequenceCache

// PacketSequenceCache is used for caching an IBC message for a given packet sequence.
type PacketSequenceCache map[uint64]provider.PacketInfo

// PacketSequenceStateCache is used for caching the state of a packet sequence.
type PacketSequenceStateCache map[uint64]string

// ChannelMessagesCache is used for caching a ChannelMessageCache for a given IBC message type.
type ChannelMessagesCache map[string]ChannelMessageCache

Expand Down Expand Up @@ -344,6 +352,76 @@ func (c PacketMessagesCache) DeleteMessages(toDelete ...map[string][]uint64) {
}
}

func stateValue(state string) int {
switch state {
case chantypes.EventTypeSendPacket:
return 1
case chantypes.EventTypeRecvPacket:
return 2
case chantypes.EventTypeWriteAck:
return 3
case chantypes.EventTypeAcknowledgePacket, chantypes.EventTypeTimeoutPacket:
return 4
}
panic(fmt.Errorf("unexpected state: %s", state))
}

func (c ChannelPacketStateCache) UpdateState(k ChannelKey, sequence uint64, state string) {
minState := 0
if sequenceCache, ok := c[k]; ok {
if currentState, ok := sequenceCache[sequence]; ok {
minState = stateValue(currentState)
}
} else {
c[k] = make(PacketSequenceStateCache)
}

if stateValue(state) <= minState {
// can't downgrade state
return
}

c[k][sequence] = state
}

func (c ChannelPacketStateCache) State(k ChannelKey, sequence uint64) (string, bool) {
sequenceCache, ok := c[k]
if !ok {
return "", false
}

state, ok := sequenceCache[sequence]
if !ok {
return "", false
}

return state, true
}

// Prune deletes all map entries except for the most recent (keep) for all channels.
func (c ChannelPacketStateCache) Prune(keep int) {
for _, pssc := range c {
pssc.Prune(keep)
}
}

// Prune deletes all map entries except for the most recent (keep).
func (c PacketSequenceStateCache) Prune(keep int) {
if len(c) <= keep {
return
}
seqs := make([]uint64, 0, len(c))
for seq := range c {
seqs = append(seqs, seq)
}
sort.Slice(seqs, func(i, j int) bool { return seqs[i] < seqs[j] })

// only keep recent packet states
for _, seq := range seqs[:len(seqs)-keep] {
delete(c, seq)
}
}

// IsCached returns true if a sequence for a channel key and event type is already cached.
func (c ChannelPacketMessagesCache) IsCached(eventType string, k ChannelKey, sequence uint64) bool {
if _, ok := c[k]; !ok {
Expand Down
32 changes: 32 additions & 0 deletions relayer/processor/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package processor_test
import (
"testing"

chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -41,3 +42,34 @@ func TestIBCHeaderCachePrune(t *testing.T) {
require.Len(t, cache, 5)
require.NotNil(t, cache[uint64(15)], cache[uint64(16)], cache[uint64(17)], cache[uint64(18)], cache[uint64(19)])
}

func TestPacketSequenceStateCachePrune(t *testing.T) {
cache := make(processor.PacketSequenceStateCache)

for i := uint64(0); i < 50; i++ {
cache[i] = chantypes.EventTypeSendPacket
}

cache.Prune(100)

require.Len(t, cache, 50)

cache.Prune(25)

require.Len(t, cache, 25)

min := uint64(1000)
max := uint64(0)

for seq := range cache {
if seq < min {
min = seq
}
if seq > max {
max = seq
}
}

require.Equal(t, uint64(25), min)
require.Equal(t, uint64(49), max)
}

0 comments on commit 287354b

Please sign in to comment.