op-batcher: Move decision about data availability type to channel submission time #12002

Open · wants to merge 4 commits into base: develop
11 changes: 6 additions & 5 deletions op-batcher/batcher/channel.go
@@ -155,9 +155,9 @@ func (s *channel) ID() derive.ChannelID {
return s.channelBuilder.ID()
}

-// NextTxData returns the next tx data packet.
-// If cfg.MultiFrameTxs is false, it returns txData with a single frame.
-// If cfg.MultiFrameTxs is true, it will read frames from its channel builder
+// NextTxData dequeues the next frames from the channel and returns them encoded in a tx data packet.
+// If cfg.UseBlobs is false, it returns txData with a single frame.
+// If cfg.UseBlobs is true, it will read frames from its channel builder
// until it either doesn't have more frames or the target number of frames is reached.
//
// NextTxData should only be called after HasTxData returned true.
@@ -177,10 +177,11 @@ func (s *channel) NextTxData() txData {
}

func (s *channel) HasTxData() bool {
-if s.IsFull() || !s.cfg.UseBlobs {
+if s.IsFull() || // If the channel is full, we should start to submit it
+!s.cfg.UseBlobs { // If using calldata, we only send one frame per tx
return s.channelBuilder.HasFrame()
}
-// collect enough frames if channel is not full yet
+// Collect enough frames if channel is not full yet
return s.channelBuilder.PendingFrames() >= int(s.cfg.MaxFramesPerTx())
}

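Not part of the diff: a minimal standalone sketch of the new HasTxData decision, with the channel's fields and methods flattened into plain parameters (all names below are stand-ins for the receiver's state):

// hasTxData mirrors channel.HasTxData above: a full channel, or any
// calldata channel, submits as soon as a single frame is available;
// a non-full blob channel waits for a complete batch of frames.
func hasTxData(isFull, useBlobs, hasFrame bool, pendingFrames, maxFramesPerTx int) bool {
	if isFull || !useBlobs {
		return hasFrame
	}
	return pendingFrames >= maxFramesPerTx
}

The practical effect is that blob transactions only go out once enough frames have accumulated to fill the target number of blobs, while calldata transactions carry one frame each.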
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_builder.go
@@ -416,12 +416,12 @@ func (c *ChannelBuilder) HasFrame() bool {
}

// PendingFrames returns the number of pending frames in the frames queue.
-// It is larger zero iff HasFrames() returns true.
+// It is larger than zero iff HasFrame() returns true.
func (c *ChannelBuilder) PendingFrames() int {
return len(c.frames)
}

-// NextFrame returns the next available frame.
+// NextFrame dequeues the next available frame.
// HasFrame must be called prior to check if there's a next frame available.
// Panics if called when there's no next frame.
func (c *ChannelBuilder) NextFrame() frameData {
4 changes: 2 additions & 2 deletions op-batcher/batcher/channel_config.go
@@ -51,8 +51,8 @@ type ChannelConfig struct {
UseBlobs bool
}

-// ChannelConfig returns a copy of itself. This makes a ChannelConfig a static
-// ChannelConfigProvider of itself.
+// ChannelConfig returns a copy of the receiver.
+// This allows the receiver to be a static ChannelConfigProvider of itself.
func (cc ChannelConfig) ChannelConfig() ChannelConfig {
return cc
}
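Not part of the diff: the reworded comment describes the static-provider pattern, where a plain value satisfies a provider interface of itself. A minimal sketch, assuming the interface has the shape suggested by channel_config_provider.go:

// Assumed shape of the provider interface that a plain
// ChannelConfig value satisfies by returning a copy of itself.
type ChannelConfigProvider interface {
	ChannelConfig() ChannelConfig
}

// Compile-time check: a ChannelConfig value is its own static provider,
// so it can be passed wherever a dynamic provider is expected and will
// always yield the same configuration.
var _ ChannelConfigProvider = ChannelConfig{}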
6 changes: 5 additions & 1 deletion op-batcher/batcher/channel_config_provider.go
@@ -48,6 +48,10 @@ func NewDynamicEthChannelConfig(lgr log.Logger,
return dec
}

+// ChannelConfig will perform an estimate of the cost per byte for
+// calldata and for blobs, given current market conditions: it will return
+// the appropriate ChannelConfig depending on which is cheaper. It makes
+// assumptions about the typical makeup of channel data.
func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
ctx, cancel := context.WithTimeout(context.Background(), dec.timeout)
defer cancel()
@@ -89,7 +93,7 @@ func (dec *DynamicEthChannelConfig) ChannelConfig() ChannelConfig {
"blob_data_bytes", blobDataBytes, "blob_cost", blobCost,
"cost_ratio", costRatio)

-if ay.Cmp(bx) == 1 {
+if ay.Cmp(bx) > 0 {
lgr.Info("Using calldata channel config")
dec.lastConfig = &dec.calldataConfig
return dec.calldataConfig
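Not part of the diff: the ay.Cmp(bx) > 0 check compares per-byte costs without division by cross-multiplying, since a/x > b/y iff a*y > b*x for positive byte counts x and y. big.Int's Cmp is documented to return exactly -1, 0, or +1, so == 1 was already equivalent; > 0 simply states the intent directly. A runnable sketch, with the variable roles assumed from the surrounding log fields:

package main

import (
	"fmt"
	"math/big"
)

// calldataIsMoreExpensive reports whether calldataCost/calldataBytes
// exceeds blobCost/blobBytes, using cross-multiplication to stay in
// integer arithmetic: a/x > b/y  <=>  a*y > b*x  (for x, y > 0).
func calldataIsMoreExpensive(calldataCost, calldataBytes, blobCost, blobBytes *big.Int) bool {
	ay := new(big.Int).Mul(calldataCost, blobBytes)
	bx := new(big.Int).Mul(blobCost, calldataBytes)
	return ay.Cmp(bx) > 0
}

func main() {
	// Hypothetical market snapshot: 100 wei for 10 calldata bytes (10 wei/byte)
	// versus 50 wei for 20 blob bytes (2.5 wei/byte), so blobs win.
	fmt.Println(calldataIsMoreExpensive(
		big.NewInt(100), big.NewInt(10), big.NewInt(50), big.NewInt(20))) // true
}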
83 changes: 77 additions & 6 deletions op-batcher/batcher/channel_manager.go
@@ -35,6 +35,8 @@ type channelManager struct {
blocks []*types.Block
// The latest L1 block from all the L2 blocks in the most recently closed channel
l1OriginLastClosedChannel eth.BlockID
+// The default ChannelConfig to use for the next channel
+defaultCfg ChannelConfig
// last block hash - for reorg detection
tip common.Hash

@@ -54,6 +56,7 @@ func NewChannelManager(log log.Logger, metr metrics.Metricer, cfgProvider Channe
log: log,
metr: metr,
cfgProvider: cfgProvider,
+defaultCfg: cfgProvider.ChannelConfig(),
rollupCfg: rollupCfg,
txChannels: make(map[string]*channel),
}
@@ -133,7 +136,8 @@ func (s *channelManager) removePendingChannel(channel *channel) {
s.channelQueue = append(s.channelQueue[:index], s.channelQueue[index+1:]...)
}

-// nextTxData pops off s.datas & handles updating the internal state
+// nextTxData dequeues frames from the channel and returns them encoded in a transaction.
+// It also handles updating the internal state of the receiver.
func (s *channelManager) nextTxData(channel *channel) (txData, error) {
if channel == nil || !channel.HasTxData() {
s.log.Trace("no next tx data")
@@ -146,10 +150,47 @@

// TxData returns the next tx data that should be submitted to L1.
//
-// If the pending channel is
+// If the current channel is
// full, it only returns the remaining frames of this channel until it got
// successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
+//
+// It will generate the tx data internally, and decide whether to switch DA type
+// automatically. When switching DA type, the channelManager state will be rebuilt
+// with a new ChannelConfig.
func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
+data, err := s.txData(l1Head)
+if s.currentChannel == nil {
+s.log.Trace("no current channel")
+return data, err
+}
+assumedBlobs := s.currentChannel.cfg.UseBlobs
+newCfg := s.cfgProvider.ChannelConfig()
+if newCfg.UseBlobs == assumedBlobs {
+s.log.Info("Recomputing optimal ChannelConfig: no need to switch DA type",
+"useBlobs", assumedBlobs)
+return data, err
+}
+s.log.Info("Recomputing optimal ChannelConfig: changing DA type...",
+"useBlobsBefore", assumedBlobs,
+"useBlobsAfter", newCfg.UseBlobs)
+// We have detected that our assumptions on DA
+// type were wrong and we need to rebuild
+// the channel manager state
+err = s.Rebuild(newCfg)
+if err != nil {
+return data, err
+}
+// Finally, call the inner function to get txData
+// with the new config
+return s.txData(l1Head)
+}
+
+// txData returns the next tx data that should be submitted to L1.
+//
+// If the current channel is
+// full, it only returns the remaining frames of this channel until it got
+// successfully fully sent to L1. It returns io.EOF if there's no pending tx data.
+func (s *channelManager) txData(l1Head eth.BlockID) (txData, error) {
s.mu.Lock()
defer s.mu.Unlock()
var firstWithTxData *channel
@@ -160,7 +201,7 @@ func (s *channelManager) TxData(l1Head eth.BlockID) (txData, error) {
}
}

-dataPending := firstWithTxData != nil && firstWithTxData.HasTxData()
+dataPending := firstWithTxData != nil
s.log.Debug("Requested tx data", "l1Head", l1Head, "txdata_pending", dataPending, "blocks_pending", len(s.blocks))

// Short circuit if there is pending tx data or the channel manager is closed.
@@ -203,7 +244,10 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

-cfg := s.cfgProvider.ChannelConfig()
+// We reuse the ChannelConfig from the last channel.
+// This will be reassessed at channel submission-time,
+// but this is our best guess at the appropriate values for now.
+cfg := s.defaultCfg
pc, err := newChannel(s.log, s.metr, cfg, s.rollupCfg, s.l1OriginLastClosedChannel.Number)
if err != nil {
return fmt.Errorf("creating new channel: %w", err)
@@ -228,7 +272,7 @@ func (s *channelManager) ensureChannelWithSpace(l1Head eth.BlockID) error {
return nil
}

-// registerL1Block registers the given block at the pending channel.
+// registerL1Block registers the given block at the current channel.
func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
s.currentChannel.CheckTimeout(l1Head.Number)
s.log.Debug("new L1-block registered at channel builder",
Expand All @@ -238,7 +282,7 @@ func (s *channelManager) registerL1Block(l1Head eth.BlockID) {
)
}

-// processBlocks adds blocks from the blocks queue to the pending channel until
+// processBlocks adds blocks from the blocks queue to the current channel until
// either the queue got exhausted or the channel is full.
func (s *channelManager) processBlocks() error {
var (
@@ -288,6 +332,7 @@ func (s *channelManager) processBlocks() error {
return nil
}

+// outputFrames generates frames for the current channel, and computes and logs the compression ratio
func (s *channelManager) outputFrames() error {
if err := s.currentChannel.OutputFrames(); err != nil {
return fmt.Errorf("creating frames with channel builder: %w", err)
@@ -339,6 +384,7 @@ func (s *channelManager) outputFrames() error {
func (s *channelManager) AddL2Block(block *types.Block) error {
s.mu.Lock()
defer s.mu.Unlock()
+
if s.tip != (common.Hash{}) && s.tip != block.ParentHash() {
return ErrReorg
}
@@ -414,3 +460,28 @@ func (s *channelManager) Close() error {
}
return nil
}

+// Rebuild rebuilds the channel manager state by
+// rewinding blocks back from the channel queue, and setting the defaultCfg.
+func (s *channelManager) Rebuild(newCfg ChannelConfig) error {
+s.mu.Lock()
+defer s.mu.Unlock()
+s.log.Info("Rebuilding channelManager state", "UseBlobs", newCfg.UseBlobs)
+newChannelQueue := []*channel{}
+blocksToRequeueInChannelManager := []*types.Block{}
+for _, channel := range s.channelQueue {
+if len(channel.pendingTransactions) > 0 {
+newChannelQueue = append(newChannelQueue, channel)
+continue
+}
+blocksToRequeueInChannelManager = append(blocksToRequeueInChannelManager, channel.channelBuilder.Blocks()...)
+}
+// We put the blocks back at the front of the queue:
+s.blocks = append(blocksToRequeueInChannelManager, s.blocks...)
+s.channelQueue = newChannelQueue
+s.currentChannel = nil
+// Setting the defaultCfg will cause new channels
+// to pick up the new ChannelConfig
+s.defaultCfg = newCfg
+return nil
+}
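
Not part of the diff: taken together, TxData and Rebuild defer the DA decision to submission time. A channel is built under the current default config, and only when its data is about to be submitted does the manager re-check market conditions and, if needed, requeue the blocks under the other DA type (channels with in-flight transactions are kept as-is). A sketch of how a driver loop might consume this, where publish is a stand-in and the usual errors/io imports are assumed; the real loop lives in op-batcher/batcher/driver.go:

// drainTxData submits everything the manager currently has pending,
// letting TxData transparently switch DA type and rebuild channel
// state between calls when market conditions change.
func drainTxData(m *channelManager, l1Head eth.BlockID, publish func(txData) error) error {
	for {
		data, err := m.TxData(l1Head)
		if errors.Is(err, io.EOF) {
			return nil // no pending tx data right now
		}
		if err != nil {
			return err
		}
		if err := publish(data); err != nil {
			return err
		}
	}
}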
9 changes: 5 additions & 4 deletions op-batcher/batcher/driver.go
@@ -190,10 +190,11 @@ func (l *BatchSubmitter) StopBatchSubmitting(ctx context.Context) error {

// loadBlocksIntoState loads all blocks since the previous stored block
// It does the following:
-// 1. Fetch the sync status of the sequencer
-// 2. Check if the sync status is valid or if we are all the way up to date
-// 3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
-// 4. Load all new blocks into the local state.
+//  1. Fetch the sync status of the sequencer
+//  2. Check if the sync status is valid or if we are all the way up to date
+//  3. Check if it needs to initialize state OR it is lagging (todo: lagging just means race condition?)
+//  4. Load all new blocks into the local state.
+//
// If there is a reorg, it will reset the last stored block but not clear the internal state so
// the state can be flushed to L1.
func (l *BatchSubmitter) loadBlocksIntoState(ctx context.Context) error {
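Not part of the diff: a rough sketch of the four steps listed in the doc comment above, with hypothetical helper names (getSyncStatus, blockRangeToLoad, and loadBlockIntoState are stand-ins; only the overall shape is taken from the comment):

func loadBlocksIntoStateSketch(ctx context.Context, l *BatchSubmitter) error {
	status, err := getSyncStatus(ctx) // 1. fetch the sequencer's sync status
	if err != nil {
		return err // 2. bail on an invalid status (or return early if already up to date)
	}
	start, end, err := blockRangeToLoad(status) // 3. initialize state or resume from the last stored block
	if err != nil {
		return err
	}
	for n := start + 1; n <= end; n++ { // 4. load each new block into local state
		if err := loadBlockIntoState(ctx, l, n); err != nil {
			return err
		}
	}
	return nil
}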
1 change: 1 addition & 0 deletions op-batcher/batcher/driver_test.go
@@ -49,6 +49,7 @@ func setup(t *testing.T) (*BatchSubmitter, *mockL2EndpointProvider) {
return NewBatchSubmitter(DriverSetup{
Log: testlog.Logger(t, log.LevelDebug),
Metr: metrics.NoopMetrics,
+ChannelConfig: defaultTestChannelConfig(),
RollupConfig: &cfg,
EndpointProvider: ep,
}), ep