22

使用 Raft 实现 VIP 功能 | Yiran's Blog

 4 years ago
source link: https://zdyxry.github.io/2020/01/17/%E4%BD%BF%E7%94%A8-Raft-%E5%AE%9E%E7%8E%B0-VIP-%E5%8A%9F%E8%83%BD/?
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在部署应用时想要应用是高可用,通常会在应用前放置一个 HAProxy,当任何一个 Server 故障,HAProxy 会自动切换,但是 HAProxy 也存在单点故障,因此需要多个 HAProxy 来保证业务不中断,这时候我们需要另一个软件配合:Keepalived。通常我用 Keepalived 仅用来提供 VIP,保证当一个 Keepalived 故障,VIP 自动在其他 Keepalived 节点配置。

Keepalived 有一个问题是 virtual route ID 必须是同一网段内唯一的,当我们想要在一个网段内部署多个集群时,就需要人为的介入去分配 virtual route ID,不方便。这次来使用 Raft 自己实现 VIP 逻辑。

hashicorp/raft

Raft 有很多开源实现,其中 Hashicorp 实现的 Raft 库 已经被 Consul 等软件使用,且接口友善,选择使用它来实现。在 Github 上有很多 Raft 的使用示例,比较简单且完整的是 otoolep/hraftd,我们来看看他是怎么使用的。

otoolep/hraftd

main.go

在 main.go 中主要做了 4 件事情:store.New, store.Open, http.New, http.Start,先来看看程序是如何启动的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func init() {
// 设置命令行参数
flag.BoolVar(&inmem, "inmem", false, "Use in-memory storage for Raft")
...
flag.Usage = func() {
fmt.Fprintf(os.Stderr, "Usage: %s [options] <raft-data-path> \n", os.Args[0])
flag.PrintDefaults()
}
}

func main() {
// 解析命令行参数
flag.Parse()
...
// 创建一个 Store 对象
s := store.New(inmem)
s.RaftDir = raftDir
s.RaftBind = raftAddr
// 运行 Store
if err := s.Open(joinAddr == "", nodeID); err != nil {
log.Fatalf("failed to open store: %s", err.Error())
}
// 新建一个 http 对象并运行
h := httpd.New(httpAddr, s)
if err := h.Start(); err != nil {
log.Fatalf("failed to start HTTP service: %s", err.Error())
}

// 如果 joinAddr 参数不为空,则处理 join 请求
if joinAddr != "" {
if err := join(joinAddr, raftAddr, nodeID); err != nil {
log.Fatalf("failed to join node at %s: %s", joinAddr, err.Error())
}
}

log.Println("hraftd started successfully")
// 监听系统信号,若接收到 os.Interrupt 则程序退出
terminate := make(chan os.Signal, 1)
signal.Notify(terminate, os.Interrupt)
<-terminate
log.Println("hraftd exiting")
}

看了 main.go 我们知道调用了 http.Start, 先不管 Store 是什么,先来看看 http 相关实现:

1
2
3
4
5
6
7
8
// Start starts the service.
func (s *Service) Start() error {
// Service 实现了 ServeHTTP 方法
http.Handle("/", s)
go func() {
err := server.Serve(s.ln)
...
}
1
2
3
4
5
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if strings.HasPrefix(r.URL.Path, "/key") {
s.handleKeyRequest(w, r)
// 先忽略其他分支
}

主要处理请求的是 s.handleKeyRequest 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func (s *Service) handleKeyRequest(w http.ResponseWriter, r *http.Request) {
...
switch r.Method {
case "GET":
k := getKey()
if k == "" {
w.WriteHeader(http.StatusBadRequest)
}
v, err := s.store.Get(k)
...
io.WriteString(w, string(b))
case "POST":
// Read the value from the POST body.
m := map[string]string{}
if err := json.NewDecoder(r.Body).Decode(&m); err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
for k, v := range m {
if err := s.store.Set(k, v); err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
}
...

s.handleKeyRequest 中根据请求方法,去调用 store 对应的方法,那么 store 实现了哪些接口呢?这也是在 http 模块中定义的:

1
2
3
4
5
6
type Store interface {
Get(key string) (string, error)
Set(key, value string) error
Delete(key string) error
Join(nodeID string, addr string) error
}

除了 Join 看着比较奇怪,其他的都是一个 K/V 系统该有的接口,接下来就看看 Store 具体方法的实现。

store

这个模块的编写涉及到了 Raft 中的具体概念,建议先阅读 siddontang 写的 Raft 相关博客快速了解(链接在参考链接列出)。

以下以设置 Key 为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func (s *Store) Set(key, value string) error {
if s.raft.State() != raft.Leader {
return fmt.Errorf("not leader")
}

// 封装具体执行的动作
c := &command{
Op: "set",
Key: key,
Value: value,
}
b, err := json.Marshal(c)
...
// 将 command 应用于 FSM
f := s.raft.Apply(b, raftTimeout)
return f.Error()
}

查看 FSM Apply 方法实现:

1
2
3
4
5
6
7
8
9
10
// Apply applies a Raft log entry to the key-value store.
func (f *fsm) Apply(l *raft.Log) interface{} {
var c command
// 根据操作动作的不同,执行不同的方法,这里以设置 Key 为例
switch c.Op {
case "set":
return f.applySet(c.Key, c.Value)
...
}
}



1
2
3
4
5
6
7
8
func (f *fsm) applySet(key, value string) interface{} {
// 互斥锁
f.mu.Lock()
defer f.mu.Unlock()
// 设置 Map 中的 Key/Value
f.m[key] = value
return nil
}

看了上面的具体动作实现,接下来看看 Raft 具体启动:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
func (s *Store) Open(enableSingle bool, localID string) error {
// 设置 Raft 配置
config := raft.DefaultConfig()
config.LocalID = raft.ServerID(localID)
// 设置 Raft 通信
addr, err := net.ResolveTCPAddr("tcp", s.RaftBind)
...
transport, err := raft.NewTCPTransport(s.RaftBind, addr, 3, 10*time.Second, os.Stderr)
// 设置 Raft 存储对象
snapshots, err := raft.NewFileSnapshotStore(s.RaftDir, retainSnapshotCount, os.Stderr)
var logStore raft.LogStore
var stableStore raft.StableStore
if s.inmem {
logStore = raft.NewInmemStore()
stableStore = raft.NewInmemStore()
} else {
boltDB, err := raftboltdb.NewBoltStore(filepath.Join(s.RaftDir, "raft.db"))
if err != nil {
return fmt.Errorf("new bolt store: %s", err)
}
logStore = boltDB
stableStore = boltDB
}
// 创建 raft 示例,并使用该 raft 实例启动集群
ra, err := raft.NewRaft(config, (*fsm)(s), logStore, stableStore, snapshots, transport)
if err != nil {
return fmt.Errorf("new raft: %s", err)
}
s.raft = ra
if enableSingle {
...
ra.BootstrapCluster(configuration)
}
}

这里可以看到 Raft 所需接口为 FSM 和 Snapshot,具体的实现方式根据需求来实现,一般与 hraftd 相仿,大概了解了 hashicorp/raft 的使用,那么我们来实现具体的 VIP 功能。

network

既然是跟 IP 相关,那么肯定需要给对应网卡配置时间,在 Linux 中我们可以通过 ip 命令来设置,Golang 中使用 vishvananda/netlink 来实现。

netlink.AddrAdd 可以在指定的网络设备上添加 IP 地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (nc *NetworkConfig) addIP() error {
res, err := nc.IsSet()
if err != nil {
return errors.Wrap(err, "ip check in AddIP failed")

}
if res {
return nil
}
if err := netlink.AddrAdd(nc.link, nc.address); err != nil {
return errors.Wrap(err, "could not add ip")
}
return nil
}

netlink.AddrDel 可以将 IP 从指定网络设备上删除:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func (nc *NetworkConfig) delIP() error {
res, err := nc.IsSet()
if err != nil {
return errors.Wrap(err, "ip check in DelIP failed")
}

if !res {
return nil
}

if err := netlink.AddrDel(nc.link, nc.address); err != nil {
return errors.Wrap(err, "could not delete ip")
}

return nil
}

实现了 IP 设置相关,我们不需要提供 HTTP 接口,直接编写 Raft 相关实现,跟 hraftd 实现不同,在 hraftd 中需要进行信息写入读取,而我们的 VIP 仅依赖于 Raft 选举 Leader,所以只需要编写好对应的方法,不需要做额外操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type FSM struct {
}

func (fsm FSM) Apply(log *raft.Log) interface{} {
return nil
}

func (fsm FSM) Restore(snap io.ReadCloser) error {
return nil
}

func (fsm FSM) Snapshot() (raft.FSMSnapshot, error) {
return Snapshot{}, nil
}

type Snapshot struct {
}

func (snapshot Snapshot) Persist(sink raft.SnapshotSink) error {
return nil
}

func (snapshot Snapshot) Release() {
}

serve

基础方法都已经实现了,那么接下来编写集群启动逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
func (manager *VIPManager) Start() error {
// 初始化 raft 配置、存储对象、通信
for id, ip := range manager.peers {
configuration.Servers = append(configuration.Servers, raft.Server{ID: raft.ServerID(id), Address: raft.ServerAddress(ip)})
}

// 启动 Raft 集群,这里与 hraftd 不同,需要注意
if error := raft.BootstrapCluster(config, logStore, stableStore, snapshots, transport, configuration); error != nil {
return error
}

// 创建 raft 实例
raftServer, error := raft.NewRaft(config, manager.fsm, logStore, stableStore, snapshots, transport)
...
ticker := time.NewTicker(time.Second)
isLeader := false
// 服务启动时先删除 VIP,防止集群中同时存在节点都配置了 VIP
manager.deleteIP(false)
go func() {
for {
select {
// 如果 当前节点是 Leader 节点,则设置 VIP
case leader := <-raftServer.LeaderCh():
if leader {
isLeader = true
log.Info("Leading")
manager.addIP(true)

}
// 定时检测,如果是 Leader,则检测 VIP 是否正确设置,如果没有就再次配置 VIP
case <-ticker.C:
if isLeader {
result, error := manager.networkConfigurator.IsSet()
if error != nil {
log.WithFields(log.Fields{"error": error, "ip": manager.networkConfigurator.IP(), "interface": manager.networkConfigurator.Interface()}).Error("Could not check ip")
}

if result == false {
log.Error("Lost IP")

manager.addIP(true)
}
}
...
}
}
}()
}

这里与 hraftd 的实现不同,hraftd 先实例 raft,然后使用该 raft 实例启动集群,这样做的好处是哪怕集群只有一个节点,就是第一个节点,那么集群也是可以正常工作的,坏处是集群启动顺序是固定的,必须要先启动第一个节点,然后其他节点通过 Join 请求添加到 Raft 集群中(我们忽略了 Join 的走读)。

重新想一下我们的需求:集群、高可用、故障。当这几个词放在一起,我们就知道 hraftd 的方法不适合我们,有以下原因:

  1. 集群节点启动顺序要求
  2. 各个节点配置文件不同,有的需要 Join 参数

所以我们是直接使用 raft.BootstrapCluster 来启动集群,虽然只有一个节点集群无法正常工作,但是这个是可以容忍的。

在实现完上述功能后,我以为大功告成了,就开始自测,但是测试过程中发现了很诡异的现象,虽然我们通过 Raft 自身选举实现了 VIP 的故障自动漂移,但是实际测试中发现业务访问随着 VIP 的重建并没有立即恢复,,检查 ARP 记录发现集群中各个节点关于 VIP 的 ARP 记录各不相同,甚至是完全不同。

我们来重温下 ARP 协议内容:

地址解析协议(英語:Address Resolution Protocol,缩写:ARP)是一个通过解析网络层地址来找寻数据链路层地址的网络传输协议,它在IPv4中极其重要。ARP最初在1982年的RFC 826(征求意见稿)[1]中提出并纳入互联网标准 STD 37. ARP 也可能指是在多数操作系统中管理其相关地址的一个进程。

在以太网协议中规定,同一局域网中的一台主机要和另一台主机进行直接通信,必须要知道目标主机的MAC地址。而在TCP/IP协议中,网络层和传输层只关心目标主机的IP地址。这就导致在以太网中使用IP协议时,数据链路层的以太网协议接到上层IP协议提供的数据中,只包含目的主机的IP地址。于是需要一种方法,根据目的主机的IP地址,获得其MAC地址。这就是ARP协议要做的事情。所谓地址解析(address resolution)就是主机在发送帧前将目标IP地址转换成目标MAC地址的过程。

如果 VIP 与真实节点对应的 MAC 地址不同,就相当于 ARP 攻击了,所以我们的 Leader 节点设置完 VIP 后,还需要发送 ARP 请求广播,告诉广播域中的其他节点 VIP 正确的 MAC 地址。采用的方式是 gratuitous ARP(免费 ARP)。这里我们直接找一个开源的 ARP 实现来完成这个需求:

google/seesaw 是一个负载均衡器,里面实现了这个功能:

1
2
3
4
5
6
7
8
9
10
11
12
13
// ARPSendGratuitous sends a gratuitous ARP message via the specified interface.
func (ncc *SeesawNCC) ARPSendGratuitous(arp *ncctypes.ARPGratuitous, out *int) error {
iface, err := net.InterfaceByName(arp.IfaceName)
if err != nil {
return fmt.Errorf("failed to get interface %q: %v", arp.IfaceName, err)
}
log.V(2).Infof("Sending gratuitous ARP for %s (%s) via %s", arp.IP, iface.HardwareAddr, iface.Name)
m, err := gratuitousARPReply(arp.IP, iface.HardwareAddr)
if err != nil {
return err
}
return sendARP(iface, m)
}

使用 hashicorp/raft 可以很简单方便的实现需要选举 Leader 的分布式应用,虽然我司的 Slogan 是 Make IT Simple ,但是愈发感觉 Hashicorp 才是这句话的忠实体现,他们的 Terraform、Vault、Consul、Nomad、Vagrant 等软件,都是让基础设施的适用与管理更简单方便的,还是很爽的。

本文的具体实现在 sparrow 可以看到。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK