From 1e2726ccb1737b6302828f94f2ed2aa839d4db64 Mon Sep 17 00:00:00 2001 From: Franco Testagrossa Date: Mon, 23 Sep 2024 13:11:19 +0200 Subject: [PATCH] Try prune /acks on network discrepancy Remove acks persistency --- hydra-node/src/Hydra/Network/Reliability.hs | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/hydra-node/src/Hydra/Network/Reliability.hs b/hydra-node/src/Hydra/Network/Reliability.hs index 231906b7cf5..af4f34db86e 100644 --- a/hydra-node/src/Hydra/Network/Reliability.hs +++ b/hydra-node/src/Hydra/Network/Reliability.hs @@ -221,8 +221,8 @@ withReliability :: -- | Underlying network component providing consuming and sending channels. NetworkComponent m (Authenticated (ReliableMsg (Heartbeat inbound))) (ReliableMsg (Heartbeat outbound)) a -> NetworkComponent m (Authenticated (Heartbeat inbound)) (Heartbeat outbound) a -withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loadMessages} me otherParties withRawNetwork callback action = do - acksCache <- loadAcks >>= newTVarIO +withReliability tracer MessagePersistence{appendMessage, loadMessages} me otherParties withRawNetwork callback action = do + acksCache <- newTVarIO mempty sentMessages <- loadMessages >>= newTVarIO . Seq.fromList resendQ <- newTQueueIO let ourIndex = fromMaybe (error "This cannot happen because we constructed the list with our party inside.") (findPartyIndex me) @@ -242,13 +242,11 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa case msg of Data{} -> do localCounter <- atomically $ cacheMessage msg >> incrementAckCounter - saveAcks localCounter appendMessage msg traceWith tracer BroadcastCounter{ourIndex, localCounter} broadcast $ ReliableMsg localCounter msg Ping{} -> do localCounter <- readTVarIO acksCache - saveAcks localCounter traceWith tracer BroadcastPing{ourIndex, localCounter} broadcast $ ReliableMsg localCounter msg } @@ -288,8 +286,12 @@ withReliability tracer MessagePersistence{saveAcks, loadAcks, appendMessage, loa let newAcks = constructAcks loadedAcks partyIndex lift $ writeTVar acksCache newAcks return (True, partyIndex, newAcks) - | otherwise -> - -- other messages are dropped + | messageAckForParty <= knownAckForParty -> + -- old messages are dropped + return (False, partyIndex, loadedAcks) + | otherwise -> do + -- others + lift $ writeTVar acksCache mempty return (False, partyIndex, loadedAcks) case eShouldCallbackWithKnownAcks of