Skip to content

Commit

Permalink
Merge branch 'master' into dependabot/go_modules/google.golang.org/gr…
Browse files Browse the repository at this point in the history
…pc-1.53.0
  • Loading branch information
vipwzw committed Aug 2, 2023
2 parents 6d54514 + c909f4d commit a43bb9c
Show file tree
Hide file tree
Showing 8 changed files with 177 additions and 32 deletions.
2 changes: 1 addition & 1 deletion blockchain/blockstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,7 @@ func (bs *BlockStore) GetDbVersion() int64 {
return ver.Data
}

//SetDbVersion 获取blockchain的数据库版本号
//SetDbVersion 设置blockchain的数据库版本号
func (bs *BlockStore) SetDbVersion(versionNo int64) error {
ver := types.Int64{Data: versionNo}
verByte := types.Encode(&ver)
Expand Down
4 changes: 2 additions & 2 deletions executor/execenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,7 +477,7 @@ func (e *executor) execTxOne(feelog *types.Receipt, tx *types.Transaction, index
e.startTx()
receipt, err := e.Exec(tx, index)
if err != nil {
elog.Error("exec tx error = ", "err", err, "exec", string(tx.Execer), "action", tx.ActionName())
elog.Error("execTxOne", "exec tx error", err, "exec", string(tx.Execer), "action", tx.ActionName())
//add error log
errlog := &types.ReceiptLog{Ty: types.TyLogErr, Log: []byte(err.Error())}
feelog.Logs = append(feelog.Logs, errlog)
Expand Down Expand Up @@ -632,7 +632,7 @@ func (e *executor) execTx(exec *Executor, tx *types.Transaction, index int) (*ty
feelog, err = e.execTxOne(feelog, tx, index)
if err != nil {
e.rollback()
elog.Error("exec tx = ", "index", index, "execer", string(tx.Execer), "err", err)
elog.Error("execTx", "index", index, "execer", string(tx.Execer), "err", err)
} else {
err := e.commit()
if err != nil {
Expand Down
117 changes: 117 additions & 0 deletions system/p2p/dht/protocol/download/counter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package download

import (
"fmt"
"sync"
"time"
)

func peersCounterKey(taskID, pid string) string {
return fmt.Sprintf("%s-%s", taskID, pid)
}

type heightCostTime struct {
height int64
costTime int64
}

// PeerTaskCounter ...
type PeerTaskCounter struct {
pid string
latencies []time.Duration
taskID string
heightCostTimes []heightCostTime
}

// NewPeerTaskCounter ...
func NewPeerTaskCounter(pid, taskID string) *PeerTaskCounter {
return &PeerTaskCounter{
pid: pid,
taskID: taskID,
latencies: []time.Duration{},
heightCostTimes: []heightCostTime{},
}
}

// Pretty print information
func (p *PeerTaskCounter) Pretty() string {
return fmt.Sprintf("pid = %s, taskID = %s, latencys = %v, counter = %d, heightCostTimes = %v", p.pid, p.taskID, p.latencies, len(p.heightCostTimes), p.heightCostTimes)
}

// Append add counter info
func (p *PeerTaskCounter) Append(height, costTime int64) {
p.heightCostTimes = append(p.heightCostTimes, heightCostTime{
height: height,
costTime: costTime,
})
}

// AppendLatency ...
func (p *PeerTaskCounter) AppendLatency(latency time.Duration) {
p.latencies = append(p.latencies, latency)
}

// Counter ..
func (p *PeerTaskCounter) Counter() int64 {
return int64(len(p.heightCostTimes))
}

// Counter ...
type Counter struct {
taskCounter map[string][]string //taskID:pid
peerCounter map[string]*PeerTaskCounter //taskID-pid:PeerTaskCounter
rw sync.Mutex
}

// NewCounter ...
func NewCounter() *Counter {
return &Counter{
taskCounter: map[string][]string{},
peerCounter: map[string]*PeerTaskCounter{},
rw: sync.Mutex{},
}
}

// UpdateTaskInfo ...
func (c *Counter) UpdateTaskInfo(taskID, pid string, height, costTime int64) {
c.rw.Lock()
if counter, ok := c.peerCounter[peersCounterKey(taskID, pid)]; ok {
counter.Append(height, costTime)
}
c.rw.Unlock()
}

// AddTaskInfo ...
func (c *Counter) AddTaskInfo(taskID, pid string, latency time.Duration) {
c.rw.Lock()
if ps, ok := c.taskCounter[taskID]; ok {
c.taskCounter[taskID] = append(ps, pid)
} else {
c.taskCounter[taskID] = []string{pid}
}
key := peersCounterKey(taskID, pid)
if counter, ok := c.peerCounter[key]; ok {
counter.AppendLatency(latency)
} else {
counter = NewPeerTaskCounter(pid, taskID)
counter.AppendLatency(latency)
c.peerCounter[key] = counter
}
c.rw.Unlock()
}

// Release task by id
func (c *Counter) Release(tasksID string) {
c.rw.Lock()
defer c.rw.Unlock()
if pids, ok := c.taskCounter[tasksID]; ok {
for _, pid := range pids {
key := peersCounterKey(tasksID, pid)
if counter, ok := c.peerCounter[key]; ok {
log.Info("Release", "Counter ", counter.Pretty())
delete(c.peerCounter, key)
}
}
delete(c.taskCounter, tasksID)
}
}
17 changes: 12 additions & 5 deletions system/p2p/dht/protocol/download/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ const (
// Protocol ...
type Protocol struct {
*protocol.P2PEnv
counter *Counter
}

// InitProtocol initials protocol
func InitProtocol(env *protocol.P2PEnv) {
p := &Protocol{
P2PEnv: env,
P2PEnv: env,
counter: NewCounter(),
}
//注册p2p通信协议,用于处理节点之间请求
protocol.RegisterStreamHandler(p.Host, downloadBlockOld, p.handleStreamDownloadBlockOld)
Expand All @@ -46,7 +48,7 @@ func InitProtocol(env *protocol.P2PEnv) {
func (p *Protocol) downloadBlock(height int64, tasks tasks) error {

var retryCount uint
tasks.Sort() //对任务节点时延进行排序,优先选择时延低的节点进行下载
tasks.Sort() //TODO bug 对任务节点时延进行排序,优先选择时延低的节点进行下载
ReDownload:
select {
case <-p.Ctx.Done():
Expand All @@ -71,19 +73,23 @@ ReDownload:
goto ReDownload
}

var downloadStart = time.Now().UnixNano()
var downloadStart = time.Now()
//一个高度在一个pid上请求。
block, err := p.downloadBlockFromPeerOld(height, task.Pid)
if err != nil {
//发生EOF,剔除无用节点。
//EROR[06-16|17:09:26] handleEventDownloadBlock module=p2p.download SendRecvPeer="stream reset" pid=16Uiu2HAkzNiDx1mN6muuBRgPpDRaUG5NGs8HMHmp1HND968Y6Kho
log.Error("handleEventDownloadBlock", "SendRecvPeer", err, "pid", task.Pid)
p.releaseJob(task)
tasks = tasks.Remove(task)
goto ReDownload
}
remotePid := task.Pid.Pretty()
costTime := (time.Now().UnixNano() - downloadStart) / 1e6
costTime := time.Since(downloadStart)
p.counter.UpdateTaskInfo(task.ID, remotePid, height, costTime.Milliseconds())

log.Debug("download+++++", "from", remotePid, "height", block.GetHeight(),
"blockSize (bytes)", block.Size(), "costTime ms", costTime)
"blockSize (bytes)", block.Size(), "costTime ms", costTime.Milliseconds())

msg := p.QueueClient.NewMessage("blockchain", types.EventSyncBlock, &types.BlockPid{Pid: remotePid, Block: block}) //加入到输出通道)
_ = p.QueueClient.Send(msg, false)
Expand All @@ -92,6 +98,7 @@ ReDownload:
return nil
}

// TODO unused
func (p *Protocol) downloadBlockFromPeer(height int64, pid peer.ID) (*types.Block, error) {
ctx, cancel := context.WithTimeout(p.Ctx, time.Second*10)
defer cancel()
Expand Down
3 changes: 2 additions & 1 deletion system/p2p/dht/protocol/download/download_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ func initEnv(t *testing.T, q queue.Queue) *Protocol {
PeerInfoManager: &peerInfoManager{},
}
p2 := &Protocol{
P2PEnv: &env2,
P2PEnv: &env2,
counter: NewCounter(),
}

client1.Sub("p2p")
Expand Down
32 changes: 19 additions & 13 deletions system/p2p/dht/protocol/download/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package download
import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/33cn/chain33/queue"
Expand Down Expand Up @@ -100,6 +99,7 @@ func (p *Protocol) handleStreamDownloadBlockOld(stream network.Stream) {
}

func (p *Protocol) handleEventDownloadBlock(msg *queue.Message) {
var startTime = time.Now()
req := msg.GetData().(*types.ReqBlocks)
if req.GetStart() > req.GetEnd() {
log.Error("handleEventDownloadBlock", "download start", req.GetStart(), "download end", req.GetEnd())
Expand All @@ -123,18 +123,21 @@ func (p *Protocol) handleEventDownloadBlock(msg *queue.Message) {
log.Debug("handleEventDownloadBlock", "jobs", jobS)
var wg sync.WaitGroup
var mutex sync.Mutex
var maxGoroutine int32
//var maxGoroutine int32
var reDownload = make(map[string]interface{})
var startTime = time.Now().UnixNano()

for height := req.GetStart(); height <= req.GetEnd(); height++ {
wg.Add(1)
Wait:
if atomic.LoadInt32(&maxGoroutine) > 50 {
time.Sleep(time.Millisecond * 200)
goto Wait
}
atomic.AddInt32(&maxGoroutine, 1)

//Wait:
// //TODO bug 50这个值,参考标准是什么?
// if atomic.LoadInt32(&maxGoroutine) > 50 {
// time.Sleep(time.Millisecond * 200)
// goto Wait
// }
// atomic.AddInt32(&maxGoroutine, 1)

//一个高度对应一个任务
go func(blockheight int64, tasks tasks) {
err := p.downloadBlock(blockheight, tasks)
if err != nil {
Expand All @@ -161,7 +164,7 @@ func (p *Protocol) handleEventDownloadBlock(msg *queue.Message) {
}
}
wg.Done()
atomic.AddInt32(&maxGoroutine, -1)
//atomic.AddInt32(&maxGoroutine, -1)

}(height, jobS)

Expand All @@ -170,7 +173,10 @@ func (p *Protocol) handleEventDownloadBlock(msg *queue.Message) {
wg.Wait()
p.checkTask(taskID, pids, reDownload)
log.Debug("Download Job Complete!", "TaskID++++++++++++++", taskID,
"cost time", fmt.Sprintf("cost time:%d ms", (time.Now().UnixNano()-startTime)/1e6),
"from", pids)

"height diff", req.GetEnd()-req.GetStart()+1,
"cost time", fmt.Sprintf("cost time:%d ms", time.Since(startTime).Milliseconds()),
"from", pids,
"pids count", len(pids),
)
p.counter.Release(taskID)
}
28 changes: 21 additions & 7 deletions system/p2p/dht/protocol/download/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func (t tasks) Len() int {
return len(t)
}

//TODO bug
//Less Sort from low to high
func (t tasks) Less(a, b int) bool {
return t[a].Latency < t[b].Latency
Expand Down Expand Up @@ -55,7 +56,7 @@ func (t tasks) Size() int {
return len(t)
}

func (p *Protocol) initJob(pids []string, jobID string) tasks {
func (p *Protocol) initJob(pids []string, taskID string) tasks {
var JobPeerIds tasks
var pIDs []peer.ID
for _, pid := range pids {
Expand All @@ -77,13 +78,16 @@ func (p *Protocol) initJob(pids []string, jobID string) tasks {
}
var job taskInfo
job.Pid = pID
job.ID = jobID
job.ID = taskID
//时延,并不能代表吞吐量
job.Latency = p.Host.Peerstore().LatencyEWMA(pID)
log.Debug("initJob", "pid", pID, "jobID", taskID, "Latency", job.Latency)
if job.Latency == 0 { //如果查询不到节点对应的时延,就设置非常大
job.Latency = time.Second
}
job.TaskNum = 0
JobPeerIds = append(JobPeerIds, &job)
p.counter.AddTaskInfo(job.ID, job.Pid.Pretty(), job.Latency)
}
return JobPeerIds
}
Expand Down Expand Up @@ -114,12 +118,22 @@ func (p *Protocol) checkTask(taskID string, pids []string, faildJobs map[string]

func (p *Protocol) availbTask(ts tasks, blockheight int64) *taskInfo {

var limit int
if len(ts) > 10 {
limit = 20 //节点数大于10,每个节点限制最大下载任务数为20个
} else {
limit = 50 //节点数较少,每个节点节点最大下载任务数位50个
//TODO bug
//var limit int
//if len(ts) > 10 {
// limit = 20 //节点数大于10,每个节点限制最大下载任务数为20个
//} else {
// limit = 50 //节点数较少,每个节点节点最大下载任务数位50个
//}
limit := 128 / len(ts)
if limit < 20 {
limit = 20
}
if limit > 50 {
limit = 50
}
log.Debug("availbTask", " len(ts)", len(ts), "limit", limit)

for i, task := range ts {
//check blockHeight
peerHeight := p.PeerInfoManager.PeerHeight(task.Pid)
Expand Down
6 changes: 3 additions & 3 deletions util/cli/chain33.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,9 +254,9 @@ func createFile(filename string) (*os.File, error) {
func watching() {
var m runtime.MemStats
runtime.ReadMemStats(&m)
log.Info("info:", "NumGoroutine:", runtime.NumGoroutine())
log.Info("info:", "Mem:", m.Sys/(1024*1024))
log.Info("info:", "HeapAlloc:", m.HeapAlloc/(1024*1024))
log.Info("GC runtime info:", "NumGoroutine:", runtime.NumGoroutine())
log.Info("GC runtime info:", "Mem:", m.Sys/(1024*1024))
log.Info("GC runtime info:", "HeapAlloc:", m.HeapAlloc/(1024*1024))
}

func pwd() string {
Expand Down

0 comments on commit a43bb9c

Please sign in to comment.