文章目录
  1. 1. 文档更新说明
  2. 2. 前言
  3. 3. 起因
  4. 4. 解决思路
  5. 5. 代码怎么写
  6. 6. 结语

文档更新说明

  • 最后更新 2018年12月26日
  • 首次更新 2018年12月26日

前言

  最近对接了EOS公链的充提币功能, 有感而发.
  本文提出了一个优秀的解决方案, 用来解决出块速度较快的公链中区块检索同步无法跟上的问题. 当然也适合出块慢的比如BTC或者Ethereum, 在服务出问题之后追赶区块也有很大的帮助.   

起因

  不管是哪一种公链, 检索区块数据的时候都是按照区块编号从低到高检索, 所以我们经常会遇到几种情况.
  第1是服务因某种原因停止运行, 比如更新代码, 遇到bug, 节点同步落后等;
  第2是因为公链出块速度太快导致单线程追赶不上, 例如EOS 1秒出块2个, 但是API请求可能延迟就有1秒了. 这样一来我们的服务所检查到的区块就会落后链的最新高度, 从而导致各种用户体验问题, 比如无法提币, 用户充值到帐速度变慢, 可能用户充值要等上好几个小时甚至好几天才能到帐, 体验很差.

解决思路

  为了解决这个问题, 我们很容易联想到并发. 是的, 本文的思路也是从并发入手, 那么我们怎么样用并发来加速区块检索呢? 并发代码比较难写, 因为容易出问题和不好调试的原因, 所以要设计合理的并发算法来减少这类问题的发生,简化代码.
  这里我们可以采用并发同步算法来解决这个问题. 算法步骤如下:

  1. 根据check表已检索的高度H和区块最新高度, 确认并发数量N.
  2. 创建线程同步信号量.
  3. 当前线程M创建N个子线程, 并为每一个新线程分配一个顺序的不重复的块号, 交由子线程处理检索操作. 同时M进入等待状态.
  4. 子线程处理完成, 根据处理状态, 若失败则将失败的块号插入badcheck表 ,最后向同步信号量发起一个完成的信号.
  5. 当M收到N个完成的信号之后, 结束等待, 同时向数据库写入已检索记录, 高度为H+N. 回到步骤1继续循环处理.
  6. 用新的线程, 定时处理第4步中产生的badcheck记录, 保证区块完整检索不遗漏.

  check表, 是一个用来记录已检索的高度的表; badcheck表, 是一个用来记录检索失败的块的表, 因为检索块是需要从区块链节点拉取数据的, 所以随时有可能因为网络问题导致拉取失败, 这里需要对失败的记录做重试处理.至于怎么处理这个有很多方法, 比如在badcheck表中为每一条记录设计一个status字段, 0表示没重试的, 1表示重试成功了.

代码怎么写

  这里以Golang语言为例, 部分代码用伪代码表示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

// 区块检查定时器
func checkBlockTimer() {
for {
lastBlock, err := 获取最新不可逆的块() //以太坊是最新高度减12, EOS可以直接从接口得到不可逆块号

var nextCheckBlock int64
nextCheckBlock, err = 获取check表的高度()
if err == ErrNoRows {
nextCheckBlock = lastBlock
} else {
nextCheckBlock++
}

if nextCheckBlock > lastBlock {
time.Sleep(time.Second * 2)
continue
}

// 根据配置的协程数量, 并发检查区块
chanCount := 获取需要开启的协程数()
// endBlock 为本轮循环的结束块号的后一个块
endBlock := nextCheckBlock + int64(chanCount)
// 动态确认协程数, 不要获取超过最新高度
if endBlock > lastBlock {
endBlock = lastBlock
}
//创建线程同步组(同步信号量)
wg := sync.WaitGroup{}
wg.Add(int(endBlock - nextCheckBlock))
//在M线程里, 分别开启N个协程, 这里N = endBlock - nextCheckBlock
for i := nextCheckBlock; i < endBlock; i++ {
go checkBlockInBlock(i, &wg, false)
}
wg.Wait()
已检索最高块号插入check表(endBlock - 1)
}

}
// 坏块重试定时器
func recheckBlockTimer() {
for {
badCheckList, err := 获取badcheck需要重试的记录数组()
if err == orm.ErrNoRows {
time.Sleep(time.Second * 3)
continue
}

wg := sync.WaitGroup{}
wg.Add(int(num))
for _, v := range badCheckList {
checkBlockInBlock(v.BlockHeight, &wg, true)
}
wg.Wait()
}
}

// 检查失败的块, 将自动保存, 等待重试
// recheck 参数用于标识是否为重查块, 是的话, 需要更新badcheck表状态
func checkBlockInBlock(bnum int64, wg *sync.WaitGroup, recheck bool) {
defer wg.Done()

data, err := 获取区块数据(bnum)
if err != nil {
//区块数据获取失败, 将bnum插入badcheck表
if !recheck {
插入badcheck表(bnum)
}
return
}
//区块数据获取成功, 根据业务处理相关逻辑

// Code here ...
// 处理结束

if recheck {
更新badcheck表状态为1(bnum)
}
}

结语

  上面代码已经在实际项目中使用, 效果挺好的. 不止可以应对公链出块速度太快导致的区块检索落后的问题, 还可以应对程序停止服务之后恢复的检索追赶问题.

文章目录
  1. 1. 文档更新说明
  2. 2. 前言
  3. 3. 起因
  4. 4. 解决思路
  5. 5. 代码怎么写
  6. 6. 结语