From f52b99ca0e0155b2f1684effaaf99cdd31a423fd Mon Sep 17 00:00:00 2001 From: geoknee Date: Thu, 19 Sep 2024 13:54:56 +0100 Subject: [PATCH 1/4] tidy up godoc --- op-batcher/batcher/channel.go | 11 ++++++----- op-batcher/batcher/channel_builder.go | 4 ++-- op-batcher/batcher/channel_config.go | 4 ++-- op-batcher/batcher/channel_config_provider.go | 6 +++++- op-batcher/batcher/driver.go | 9 +++++---- 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/op-batcher/batcher/channel.go b/op-batcher/batcher/channel.go index 9a5e6b4c6a2e..f18758d8582c 100644 --- a/op-batcher/batcher/channel.go +++ b/op-batcher/batcher/channel.go @@ -155,9 +155,9 @@ func (s *channel) ID() derive.ChannelID { return s.channelBuilder.ID() } -// NextTxData returns the next tx data packet. -// If cfg.MultiFrameTxs is false, it returns txData with a single frame. -// If cfg.MultiFrameTxs is true, it will read frames from its channel builder +// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet. +// If cfg.UseBlobs is false, it returns txData with a single frame. +// If cfg.UseBlobs is true, it will read frames from its channel builder // until it either doesn't have more frames or the target number of frames is reached. // // NextTxData should only be called after HasTxData returned true. @@ -177,10 +177,11 @@ func (s *channel) NextTxData() txData { } func (s *channel) HasTxData() bool { - if s.IsFull() || !s.cfg.UseBlobs { + if s.IsFull() || // If the channel is full, we should start to submit it + !s.cfg.UseBlobs { // If using calldata, we only send one frame per tx return s.channelBuilder.HasFrame() } - // collect enough frames if channel is not full yet + // Collect enough frames if channel is not full yet return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx()) } diff --git a/op-batcher/batcher/channel_builder.go b/op-batcher/batcher/channel_builder.go index 4cdcba8325c2..d0e47238fd5a 100644 --- a/op-batcher/batcher/channel_builder.go +++ b/op-batcher/batcher/channel_builder.go @@ -416,12 +416,12 @@ func (c *ChannelBuilder) HasFrame() bool { } // PendingFrames returns the number of pending frames in the frames queue. -// It is larger zero iff HasFrames() returns true. +// It is larger zero iff HasFrame() returns true. func (c *ChannelBuilder) PendingFrames() int { return len(c.frames) } -// NextFrame returns the next available frame. +// NextFrame dequeues the next available frame. // HasFrame must be called prior to check if there's a next frame available. // Panics if called when there's no next frame. func (c *ChannelBuilder) NextFrame() frameData { diff --git a/op-batcher/batcher/channel_config.go b/op-batcher/batcher/channel_config.go index 63e0d5d5deef..45dc1d4dcfa4 100644 --- a/op-batcher/batcher/channel_config.go +++ b/op-batcher/batcher/channel_config.go @@ -51,8 +51,8 @@ type ChannelConfig struct { UseBlobs bool } -// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static -// ChannelConfigProvider of itself. +// ChannelConfig returns a copy of the receiver. +// This allows the receiver to be a static ChannelConfigProvider of itself. func (cc ChannelConfig) ChannelConfig() ChannelConfig { return cc } diff --git a/op-batcher/batcher/channel_config_provider.go b/op-batcher/batcher/channel_config_provider.go index c65e83b8289f..171ccf5d9499 100644 --- a/op-batcher/batcher/channel_config_provider.go +++ b/op-batcher/batcher/channel_config_provider.go @@ -48,6 +48,10 @@ func NewDynamicEthChannelConfig(lgr log.Logger, return dec } +// ChannelConfig will perform an estimate of the cost per byte for +// calldata and for blobs, given current market conditions: it will return +// the appropriate ChannelConfig depending on which is cheaper. It makes +// assumptions about the typical makeup of channel data. func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig { ctx, cancel := context.WithTimeout(context.Background(), dec.timeout) defer cancel() @@ -89,7 +93,7 @@ func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig { "blob_data_bytes", blobDataBytes, "blob_cost", blobCost, "cost_ratio", costRatio) - if ay.Cmp(bx) == 1 { + if ay.Cmp(bx) > 0 { lgr.Info("Using calldata channel config") dec.lastConfig = &dec.calldataConfig return dec.calldataConfig diff --git a/op-batcher/batcher/driver.go b/op-batcher/batcher/driver.go index fd56a1cebc6b..392b2739620c 100644 --- a/op-batcher/batcher/driver.go +++ b/op-batcher/batcher/driver.go @@ -190,10 +190,11 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error { // loadBlocksIntoState loads all blocks since the previous stored block // It does the following: -// 1. Fetch the sync status of the sequencer -// 2. Check if the sync status is valid or if we are all the way up to date -// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?) -// 4. Load all new blocks into the local state. +// 1. Fetch the sync status of the sequencer +// 2. Check if the sync status is valid or if we are all the way up to date +// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?) +// 4. Load all new blocks into the local state. +// // If there is a reorg, it will reset the last stored block but not clear the internal state so // the state can be flushed to L1. func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error { From 855b15916a1d3a3aec3de512b64e27d22f7fe632 Mon Sep 17 00:00:00 2001 From: geoknee Date: Thu, 19 Sep 2024 14:06:49 +0100 Subject: [PATCH 2/4] move data availability config decision to channel submission time instead of channel creation time Also, cache the ChannelConfig whenever we switch DA type so it is used by default for new channels --- op-batcher/batcher/channel_manager.go | 81 +++++++++++++++++++++++++-- 1 file changed, 75 insertions(+), 6 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index 1f22565c94c5..c816d3d79866 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -35,6 +35,8 @@ type channelManager struct { blocks []*types.Block // The latest L1 block from all the L2 blocks in the most recently closed channel l1OriginLastClosedChannel eth.BlockID + // The default ChannelConfig to use for the next channel + defaultCfg ChannelConfig // last block hash - for reorg detection tip common.Hash @@ -54,6 +56,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe log: log, metr: metr, cfgProvider: cfgProvider, + defaultCfg: cfgProvider.ChannelConfig(), rollupCfg: rollupCfg, txChannels: make(map[string]*channel), } @@ -133,7 +136,8 @@ func (s *channelManager) removePendingChannel(channel *channel) { s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...) } -// nextTxData pops off s.datas & handles updating the internal state +// nextTxData dequeues frames from the channel and returns them encoded in a transaction. +// It also handles updating the internal state of the receiver. func (s *channelManager) nextTxData(channel *channel) (txData, error) { if channel == nil || !channel.HasTxData() { s.log.Trace("no next tx data") @@ -146,10 +150,44 @@ func (s *channelManager) nextTxData(channel *channel) (txData, error) { // TxData returns the next tx data that should be submitted to L1. // -// If the pending channel is +// If the current channel is // full, it only returns the remaining frames of this channel until it got // successfully fully sent to L1. It returns io.EOF if there's no pending tx data. +// +// It will generate the tx data internally, and decide whether to switch DA type +// automatically. When switching DA type, the channelManager state will be rebuilt +// with a new ChannelConfig. func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { + data, err := s.txData(l1Head) + if s.currentChannel == nil { + s.log.Trace("no current channel") + return data, err + } + assumedBlobs := s.currentChannel.cfg.UseBlobs + newCfg := s.cfgProvider.ChannelConfig() + if newCfg.UseBlobs == assumedBlobs { + s.log.Info("Recomputing optimal ChannelConfig: no need to switch DA type", "useBlobs", assumedBlobs) + return data, err + } + s.log.Info("Recomputing optimal ChannelConfig: changing DA type...", "useBlobsBefore", assumedBlobs, "useBlobsAfter", newCfg.UseBlobs) + // We have detected that our assumptions on DA + // type were wrong and we need to rebuild + // the channel manager state + err = s.Rebuild(newCfg) + if err != nil { + return data, err + } + // Finally, call the inner function to get txData + // with the new config + return s.txData(l1Head) +} + +// txData returns the next tx data that should be submitted to L1. +// +// If the current channel is +// full, it only returns the remaining frames of this channel until it got +// successfully fully sent to L1. It returns io.EOF if there's no pending tx data. +func (s *channelManager) txData(l1Head eth.BlockID) (txData, error) { s.mu.Lock() defer s.mu.Unlock() var firstWithTxData *channel @@ -160,10 +198,11 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { } } - dataPending := firstWithTxData != nil && firstWithTxData.HasTxData() + dataPending := firstWithTxData != nil s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks)) // Short circuit if there is pending tx data or the channel manager is closed. + if dataPending || s.closed { return s.nextTxData(firstWithTxData) } @@ -203,7 +242,10 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { return nil } - cfg := s.cfgProvider.ChannelConfig() + // We reuse the ChannelConfig from the last channel. + // This will be reassessed at channel submission-time, + // but this is our best guess at the appropriate values for now. + cfg := s.defaultCfg pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number) if err != nil { return fmt.Errorf("creating new channel: %w", err) @@ -228,7 +270,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error { return nil } -// registerL1Block registers the given block at the pending channel. +// registerL1Block registers the given block at the current channel. func (s *channelManager) registerL1Block(l1Head eth.BlockID) { s.currentChannel.CheckTimeout(l1Head.Number) s.log.Debug("new L1-block registered at channel builder", @@ -238,7 +280,7 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) { ) } -// processBlocks adds blocks from the blocks queue to the pending channel until +// processBlocks adds blocks from the blocks queue to the current channel until // either the queue got exhausted or the channel is full. func (s *channelManager) processBlocks() error { var ( @@ -288,6 +330,7 @@ func (s *channelManager) processBlocks() error { return nil } +// outputFrames generates frames for the current channel, and computes and logs the compression ratio func (s *channelManager) outputFrames() error { if err := s.currentChannel.OutputFrames(); err != nil { return fmt.Errorf("creating frames with channel builder: %w", err) @@ -339,6 +382,7 @@ func (s *channelManager) outputFrames() error { func (s *channelManager) AddL2Block(block *types.Block) error { s.mu.Lock() defer s.mu.Unlock() + if s.tip != (common.Hash{}) && s.tip != block.ParentHash() { return ErrReorg } @@ -414,3 +458,28 @@ func (s *channelManager) Close() error { } return nil } + +// Rebuild rebuilds the channel manager state by +// rewinding blocks back from the channel queue, and setting the defaultCfg. +func (s *channelManager) Rebuild(newCfg ChannelConfig) error { + s.mu.Lock() + defer s.mu.Unlock() + s.log.Info("Rebuilding channelManager state", "UseBlobs", newCfg.UseBlobs) + newChannelQueue := []*channel{} + blocksToRequeueInChannelManager := []*types.Block{} + for _, channel := range s.channelQueue { + if len(channel.pendingTransactions) > 0 { + newChannelQueue = append(newChannelQueue, channel) + continue + } + blocksToRequeueInChannelManager = append(blocksToRequeueInChannelManager, channel.channelBuilder.Blocks()...) + } + // We put the blocks back at the front of the queue: + s.blocks = append(blocksToRequeueInChannelManager, s.blocks...) + s.channelQueue = newChannelQueue + s.currentChannel = nil + // Setting the defaultCfg will cause new channels + // to pick up the new ChannelConfig + s.defaultCfg = newCfg + return nil +} From 2cd99dead3a08d24a35326bf289a9050587e3534 Mon Sep 17 00:00:00 2001 From: geoknee Date: Thu, 19 Sep 2024 14:06:56 +0100 Subject: [PATCH 3/4] fix test --- op-batcher/batcher/driver_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/op-batcher/batcher/driver_test.go b/op-batcher/batcher/driver_test.go index df72fa28d49a..802061673897 100644 --- a/op-batcher/batcher/driver_test.go +++ b/op-batcher/batcher/driver_test.go @@ -49,6 +49,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) { return NewBatchSubmitter(DriverSetup{ Log: testlog.Logger(t, log.LevelDebug), Metr: metrics.NoopMetrics, + ChannelConfig: defaultTestChannelConfig(), RollupConfig: &cfg, EndpointProvider: ep, }), ep From 99bc3851645b0024ff0ac8fb63c96208ec5d0cd9 Mon Sep 17 00:00:00 2001 From: geoknee Date: Thu, 19 Sep 2024 14:08:54 +0100 Subject: [PATCH 4/4] formatting changes --- op-batcher/batcher/channel_manager.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/op-batcher/batcher/channel_manager.go b/op-batcher/batcher/channel_manager.go index c816d3d79866..324a9ccbe859 100644 --- a/op-batcher/batcher/channel_manager.go +++ b/op-batcher/batcher/channel_manager.go @@ -166,10 +166,13 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) { assumedBlobs := s.currentChannel.cfg.UseBlobs newCfg := s.cfgProvider.ChannelConfig() if newCfg.UseBlobs == assumedBlobs { - s.log.Info("Recomputing optimal ChannelConfig: no need to switch DA type", "useBlobs", assumedBlobs) + s.log.Info("Recomputing optimal ChannelConfig: no need to switch DA type", + "useBlobs", assumedBlobs) return data, err } - s.log.Info("Recomputing optimal ChannelConfig: changing DA type...", "useBlobsBefore", assumedBlobs, "useBlobsAfter", newCfg.UseBlobs) + s.log.Info("Recomputing optimal ChannelConfig: changing DA type...", + "useBlobsBefore", assumedBlobs, + "useBlobsAfter", newCfg.UseBlobs) // We have detected that our assumptions on DA // type were wrong and we need to rebuild // the channel manager state @@ -202,7 +205,6 @@ func (s *channelManager) txData(l1Head eth.BlockID) (txData, error) { s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks)) // Short circuit if there is pending tx data or the channel manager is closed. - if dataPending || s.closed { return s.nextTxData(firstWithTxData) }