Skip to content

Commit

Permalink
fan in writeFrameInternal
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Mar 24, 2019
1 parent f4f6ca3 commit b45c894
Showing 1 changed file with 14 additions and 41 deletions.
55 changes: 14 additions & 41 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,34 +98,24 @@ func (s *Stream) Write(b []byte) (n int, err error) {
default:
}

frames := s.split(b, cmdPSH, s.id)
// frame split and transmit
sent := 0
for k := range frames {
req := writeRequest{
frame: frames[k],
result: make(chan writeResult, 1),
frame := newFrame(cmdPSH, s.id)
bts := b
for len(bts) > 0 {
sz := len(bts)
if sz > s.frameSize {
sz = s.frameSize
}

select {
case s.sess.writes <- req:
case <-s.die:
return sent, errors.New(errBrokenPipe)
case <-deadline:
return sent, errTimeout
}

select {
case result := <-req.result:
sent += result.n
if result.err != nil {
return sent, result.err
}
case <-s.die:
return sent, errors.New(errBrokenPipe)
case <-deadline:
return sent, errTimeout
frame.data = bts[:sz]
bts = bts[sz:]
n, err := s.sess.writeFrameInternal(frame, deadline)
sent += n
if err != nil {
return sent, err
}
}

return sent, nil
}

Expand Down Expand Up @@ -229,23 +219,6 @@ func (s *Stream) recycleTokens() (n int) {
return
}

// split large byte buffer into smaller frames, reference only
func (s *Stream) split(bts []byte, cmd byte, sid uint32) []Frame {
frames := make([]Frame, 0, len(bts)/s.frameSize+1)
for len(bts) > s.frameSize {
frame := newFrame(cmd, sid)
frame.data = bts[:s.frameSize]
bts = bts[s.frameSize:]
frames = append(frames, frame)
}
if len(bts) > 0 {
frame := newFrame(cmd, sid)
frame.data = bts
frames = append(frames, frame)
}
return frames
}

// notify read event
func (s *Stream) notifyReadEvent() {
select {
Expand Down

0 comments on commit b45c894

Please sign in to comment.