文档更新说明
- 最后更新 2018年12月26日
- 首次更新 2018年12月26日
前言
最近对接了EOS公链的充提币功能, 有感而发.
本文提出了一个优秀的解决方案, 用来解决出块速度较快的公链中区块检索同步无法跟上的问题. 当然也适合出块慢的比如BTC或者Ethereum, 在服务出问题之后追赶区块也有很大的帮助.
起因
不管是哪一种公链, 检索区块数据的时候都是按照区块编号从低到高检索, 所以我们经常会遇到几种情况.
第1是服务因某种原因停止运行, 比如更新代码, 遇到bug, 节点同步落后等;
第2是因为公链出块速度太快导致单线程追赶不上, 例如EOS 1秒出块2个, 但是API请求可能延迟就有1秒了. 这样一来我们的服务所检查到的区块就会落后链的最新高度, 从而导致各种用户体验问题, 比如无法提币, 用户充值到帐速度变慢, 可能用户充值要等上好几个小时甚至好几天才能到帐, 体验很差.
解决思路
为了解决这个问题, 我们很容易联想到并发. 是的, 本文的思路也是从并发入手, 那么我们怎么样用并发来加速区块检索呢? 并发代码比较难写, 因为容易出问题和不好调试的原因, 所以要设计合理的并发算法来减少这类问题的发生,简化代码.
这里我们可以采用并发同步算法来解决这个问题. 算法步骤如下:
- 根据check表已检索的高度H和区块最新高度, 确认并发数量N.
- 创建线程同步信号量.
- 当前线程M创建N个子线程, 并为每一个新线程分配一个顺序的不重复的块号, 交由子线程处理检索操作. 同时M进入等待状态.
- 子线程处理完成, 根据处理状态, 若失败则将失败的块号插入badcheck表 ,最后向同步信号量发起一个完成的信号.
- 当M收到N个完成的信号之后, 结束等待, 同时向数据库写入已检索记录, 高度为H+N. 回到步骤1继续循环处理.
- 用新的线程, 定时处理第4步中产生的badcheck记录, 保证区块完整检索不遗漏.
check表, 是一个用来记录已检索的高度的表; badcheck表, 是一个用来记录检索失败的块的表, 因为检索块是需要从区块链节点拉取数据的, 所以随时有可能因为网络问题导致拉取失败, 这里需要对失败的记录做重试处理.至于怎么处理这个有很多方法, 比如在badcheck表中为每一条记录设计一个status字段, 0表示没重试的, 1表示重试成功了.
代码怎么写
这里以Golang语言为例, 部分代码用伪代码表示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
|
func checkBlockTimer() { for { lastBlock, err := 获取最新不可逆的块()
var nextCheckBlock int64 nextCheckBlock, err = 获取check表的高度() if err == ErrNoRows { nextCheckBlock = lastBlock } else { nextCheckBlock++ }
if nextCheckBlock > lastBlock { time.Sleep(time.Second * 2) continue }
chanCount := 获取需要开启的协程数() endBlock := nextCheckBlock + int64(chanCount) if endBlock > lastBlock { endBlock = lastBlock } wg := sync.WaitGroup{} wg.Add(int(endBlock - nextCheckBlock)) for i := nextCheckBlock; i < endBlock; i++ { go checkBlockInBlock(i, &wg, false) } wg.Wait() 已检索最高块号插入check表(endBlock - 1) }
}
func recheckBlockTimer() { for { badCheckList, err := 获取badcheck需要重试的记录数组() if err == orm.ErrNoRows { time.Sleep(time.Second * 3) continue }
wg := sync.WaitGroup{} wg.Add(int(num)) for _, v := range badCheckList { checkBlockInBlock(v.BlockHeight, &wg, true) } wg.Wait() } }
func checkBlockInBlock(bnum int64, wg *sync.WaitGroup, recheck bool) { defer wg.Done()
data, err := 获取区块数据(bnum) if err != nil { if !recheck { 插入badcheck表(bnum) } return }
if recheck { 更新badcheck表状态为1(bnum) } }
|
结语
上面代码已经在实际项目中使用, 效果挺好的. 不止可以应对公链出块速度太快导致的区块检索落后的问题, 还可以应对程序停止服务之后恢复的检索追赶问题.