Skip to main content

中控机挂掉以后

00:00/00:00
  • 单点服务都是异类分子, 都应该被烧死

背景

上周发生了两件小事, 都跟中控机有关, 一个是某新同学将我们的1号中控机重启了, 对于某些业务来说, 中控机上部署了较多的运维/管理/监控工具, 恢复起来比较麻烦, 另一个是我们的2号中控机因为机房搬迁要下线, 也耗费了很多同学不少力气做搬迁

我们知道, 在分布式的环境中, 任何依赖单点的服务都是不应该存在的, 为了避免类似的情况再导致不必要的人力消耗, 我们可以使用多节点部署加自动选主来应对这种情况

我们很多任务都是用 python 写的, 为了避免任务重复与单机故障, 可以这样写

from pyraft import pyraft
import time
import thread
import sys
class deploy:
    def __init__(self):
        self._stat = 0
    def start(self):
        if self._stat == 1:
            return
        else:
            self._stat = 1
    def stop(self):
        if self._stat == -1:
            return
        else:
            self._stat = -1
    def stat(self):
        return self._stat

if __name__ == "__main__":
    port = int(sys.argv[1])
    raft_obj = pyraft(port)
    thread.start_new_thread(raft_obj.start_server,())
    deploy_obj = deploy()
    while True:
        if raft_obj.role == raft_obj.leader:
            deploy_obj.start()
        else:
            deploy_obj.stop()
        time.sleep(1)
        print deploy_obj.stat()

启动之后, 通过发送 add 命令添加节点, 由多节点自动选主, 为避免一开始多主运行, 可以先启动, 再增加节点, 再初始化

实现pyraft 有很多种方法, 可以使用已有的类库, 可以借助已有的 raft 产品, 如 zookeeper, 也可以自己实现一个, 身为做存储的同学, 自己实现也是不错的练习机会

raft

raft 是一个保证多节点状态一致的协议, 一个实现完整的 raft 协议可以将集群由某一个状态一致性地转为另一个状态, 主要的原理是在一个给定的超时时间内, 通过先到先得与大多数选票来确定主节点, 同时引入了自增版本 id 来避免陈旧信息可能导致的不一致

我们利用 raft 协议规定的过程来实现一个主从选举, 实现过程中, 需要注意的几点有:

  1. 通讯协议: 可以使用 tcp 自己实现协议; 可以使用现有的任何基于 tcp 的内容协议, 如 http; 可以使用现有的 rpc 库. 引入 rpc 是一个好的设计, 但是在本次实现中, 为了更明晰地表述每个过程, 不再引入额外的库, 使用 http 协议进行交互
  2. 重新选主超时时间的选定: 超时时间应该远大于集群中节点的通讯时间, 我们不同机房的 ping 延时一般在2-500ms 之间, 取决于地理位置与网络情况, 加上丢包可能导致更长时间的无响应, 并为了区分网络暂时中断与网络长时间异常, 我们可以设定超时时间为15s
  3. 选举超时时间: 设定为15s
  4. 心跳间隔时间: 小于超时时间, 并大于3倍 ping延时即可, 我们选定为5s
  5. 随机睡眠时间的选定: 在选举时, 为了避免各自投票分散无人成为主节点的情况发生, 在首次投票失败后, 需要随机 sleep 一段时间再进行投票, 这个时间不宜过长, 否则会导致选主时间增长, 也不宜太短于节点通讯时间, 可能导致效果较差, 我们可以选择随机100-500ms 之间, 这样, 一般情况下选主所需要的时间应该在个位数秒级别
  6. 状态/配置/鉴权: 状态存内存, 配置通过 url 添加与删除, 无鉴权
  7. raft 协议里除了 termid 还有日志更新程度的区别, 我们这里因为没有接受用户请求的场景, 没有日志更新, 唯一能作为判据的只有 termid 与投票数
  8. 多 leader: 在发生网络分区时, 按照 raft 协议的实现, 是有可能出现多 leader 情况的, 两个分区的 leader 各自维护自己的follower, 小分区的 leader 在网络恢复后因为自身 termid 较低, 会降级为 follower, 但是只要网络一直不恢复, 就会一直保持多 leader 的状态, 我们不希望出现这种状态, 所以需要增加 leader 心跳包发送失败时的自动降级, 具体实现为: 心跳发送成功节点小于总节点数一半时, 自动降级; 在发生选举时, 按照自身 termid + n发送投票请求, 但是自身 termid 并不改变, 投票的节点也不会因为投票请求改变自身的 termid, 在选举成功后, 主节点更新自身的 termid, 并由心跳包广播, 这时接收心跳包的节点才会更新自身的 termid, 可以解决小分区的 leader 自身 termid 不停变高的问题

实现

import time
import thread
import random
import pycurl
import StringIO
import urlparse
import json
import urllib

from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler

class http_server(BaseHTTPRequestHandler):
    pyraft_obj = None
    no_termid_cmds = ["init", "_init", "debug", "add_node"]

    def do_GET(self):
        parsed_path = urlparse.urlparse(self.path)
        action = parsed_path.path.split("/")[1]
        query = parsed_path.query
        data = {}
        if query is not None and query != "":
            data = {i.split("=")[0]: urllib.unquote(i.split("=")[1]) for i in query.split("&")}
        if action in self.pyraft_obj.cmds:
            if ("termid" in data and int(data["termid"]) >= self.pyraft_obj.termid) or action in self.no_termid_cmds:
                res = self.pyraft_obj.cmds[action](data)

                self.send_response(200)
                self.send_header('Content-type','text/html')
                self.end_headers()
                self.wfile.write(res)
                print "action is %s data is %s res is %s" % (action, data, res)

class pyraft(BaseHTTPRequestHandler):
    def __init__(self, port):
        self.vote_timeout = 15
        self.heart_beat_interval = 5
        self.refuse_vote_timeout = 15
        self.random_sleep = [0.1, 0.5]
        self.port = port

        self.leader = 1
        self.follower = 2
        self.candidate = 3
        self.down = 4

        self.votes = 0
        self.last_voted_time = 0

        self.termid = 0
        self._inited = 0

        self.role = self.follower
        self.i = "0.0.0.0:%s" % port
        self.i_leader = None
        self.i_leader_last_heart_beat = 0

        self.members = ["0.0.0.0:%s" % port]

        self.cmds = {
            "heart_beat": self.heart_beat,
            "add_node": self.add_node,
            "vote": self.vote,
            "init": self.init,
            "_init": self._init,
            "debug": self.debug
        }
    def start_server(self):
        port = self.port
        http_server.pyraft_obj = self
        s = HTTPServer(("0.0.0.0", port), http_server)
        s.serve_forever()

    def __str__(self):
        s = "_inited:%s\r\nrole:%s\r\ntermid:%s\r\ni:%s\r\nleader:%s\r\nleader_last_heart_beat:%s\r\nmembers:%s\r\n" % (self._inited, self.role, self.termid, self.i, self.i_leader, time.time() - self.i_leader_last_heart_beat, json.dumps(self.members))
        return s
    def debug(self, data):
        return "%s" % self

    def run(self):
        time.sleep(random.uniform(self.random_sleep[0], self.random_sleep[1]))
        while True:
            while (self.i_leader is None or self.i_leader_last_heart_beat < time.time() - self.vote_timeout) and self.role == self.follower:
                self.role = self.candidate
                print "i %s start vote" % self.i
                self.start_vote()
            if self.role == self.leader:
                success = self.send_heart_beat()
                if success < len(self.members) / 2 + 1:
                    print "success is %s, down to follower" % success
                    self.role = self.follower
                time.sleep(random.uniform(self.random_sleep[0], self.random_sleep[1]))
        while True:
            time.sleep(self.heart_beat_interval)

    def heart_beat(self, data):
        termid = int(data["termid"])
        if self.termid > termid:
            return -1
        if self.termid == termid:
            self.i_leader = data["i"]
            self.i_leader_last_heart_beat = time.time()
            self.members = json.loads(data["members"])
            if self.role == self.candidate:
                print "vote done, i become follower now"
                self.role = self.follower

        if self.termid < termid:
            if self.role == self.leader or self.role == self.candidate:
                print "receive bigger termid %s:%s, down to follower" % (self.termid, termid)
                self.role = self.follower
            self.termid = termid
            members = json.loads(data["members"])
            self.members = members
            self.i_leader = data["i"]
            self.i_leader_last_heart_beat = time.time()
        return 1
    def add_node(self, data):
        node = data["node"]
        if self._inited == 0 or (self._inited == 1 and self.role == self.leader):
            if node not in self.members:
                self.members.append(node)
            return 1
        else:
            return -1

    def init(self, data):
        if self._inited == 0:
            for m in self.members:
                url = "http://%s/_init?members=%s" % (m, urllib.quote(json.dumps(self.members)))
                print json.dumps(self.members)
                print urllib.quote(json.dumps(self.members))
                print url
                thread.start_new_thread(self.send_request,(url,))
            return 1
        return -1
    def _init(self, data):
        if self._inited == 0:
            members = json.loads(data["members"])
            self.members = members
            self._inited = 1
            thread.start_new_thread(self.run, ())
            return 1
        else:
            return -1
    def send_vote(self, url):
        res = self.send_request(url)
        if res is None:
            return
        votes = int(res)
        self.votes += votes

    def send_request(self, url):
        b = StringIO.StringIO()
        c = pycurl.Curl()
        c.setopt(pycurl.URL, url)
        c.setopt(pycurl.HTTPHEADER, ["Accept:"])
        c.setopt(pycurl.WRITEFUNCTION, b.write)
        c.setopt(pycurl.FOLLOWLOCATION, 1)
        c.setopt(pycurl.MAXREDIRS, 5)
        c.setopt(pycurl.CONNECTTIMEOUT, 1)
        c.setopt(pycurl.TIMEOUT, 1)
        try:
            c.perform()
            return b.getvalue()
        except Exception as e:
            return None

    def start_vote(self):
        termid = self.termid
        while True:
            termid += 1
            start_time = time.time()
            self.votes = 0

            for m in self.members:
                url = "http://%s/vote?members=%s&termid=%s" % (m, urllib.quote(self.i), termid)
                print url
                thread.start_new_thread(self.send_vote, (url,))
            while True:
                if time.time() - start_time < self.vote_timeout:
                    print "self.votes is %s" % self.votes
                    if self.role != self.candidate:
                        return False
                    if self.votes >= len(self.members) / 2 + 1:
                        self.termid = termid
                        self.role = self.leader
                        self.send_heart_beat()
                        self.i_leader = self.i
                        self.i_leader_last_heart_beat = time.time()
                        print "i %s was leader now" % self.i
                        print self.role
                        return True
                else:
                    break
                time.sleep(0.1)

    def send_heart_beat(self):
        success = 0
        for m in self.members:
            url = "http://%s/heart_beat?i=%s&members=%s&termid=%s" % (m, urllib.quote(self.i), urllib.quote(json.dumps(self.members)), self.termid)
            res = self.send_request(url)
            if res is not None:
                success += 1
        return success

    def vote(self, data):
        termid = int(data["termid"])
        if time.time() - self.last_voted_time < self.refuse_vote_timeout:
            return 0
        if self.termid > termid:
            return 0
        if time.time() - self.i_leader_last_heart_beat < self.vote_timeout * 0.8:
            return 0

        self.last_voted_time = time.time()
        return 1

小结

实现过程与标准 raft 不同, 修改点有两个:

  1. 节点属性的termid 在选举成功之后才增加, 选举过程中发送的请求中 termid 增加, leader节点在选举成功后自己增加termid, follower节点依靠心跳信息改变 termid
  2. leader 在联系不上半数以上节点时自动降级为 follower

http 协议处理的部分不太好看, 本来是希望 pyraft 继承BaseHTTPRequestHandler类并自己处理 do_GET 方法的, 但是父类的__init__不知道怎么执行, 单分出一个方法处理了

代码不好看, 下周再写好看点

打赏
微信扫一扫支付
微信logo微信扫一扫, 打赏作者吧~