V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
codehole
V2EX  ›  程序员

手把手教你使用 Go 基于 zookeeper 编写服务发现「原创」

  •  
  •   codehole ·
    pyloque · 2018-03-23 10:03:15 +08:00 · 4146 次点击
    这是一个创建于 2471 天前的主题,其中的信息可能已经有所发展或是发生改变。

    zookeeper 是一个强一致 [不严格] 的分布式数据库,由多个节点共同组成一个分布式集群,挂掉任意一个节点,数据库仍然可以正常工作,客户端无感知故障切换。客户端向任意一个节点写入数据,其它节点可以立即看到最新的数据。

    zookeeper 的内部是一个 key/value 存储引擎,key 是以树状的形式构成了一个多级的层次结构,每一个节点既可以存储数据,又可以作为一个目录存放下一级子节点。

    zookeeper 提供了创建/修改/删除节点的 api,如果父节点没有创建,字节点会创建失败。如果父节点还有子节点,父节点不可以被删除。

    zookeeper 和客户端之间以 socket 形式进行双向通讯,客户端可以主动调用服务器提供的 api,服务器可以主动向客户端推送事件。有多种事件可以 watch,比如节点的增删改,子节点的增删改,会话状态变更等。

    zookeeper 的事件有传递机制,字节点的增删改触发的事件会向上层依次传播,所有的父节点都可以收到字节点的数据变更事件,所以层次太深/子节点太多会给服务器的事件系统带来压力,节点分配要做好周密的规划。

    zookeeper 满足了 CAP 定理的分区容忍性 P 和强一致性 C,牺牲了高性能 A [可用性蕴含性能] 。zookeeper 的存储能力是有限的,当节点层次太深/子节点太多/节点数据太大,都会影响数据库的稳定性。所以 zookeeper 不是一个用来做高并发高性能的数据库,zookeeper 一般只用来存储配置信息。

    zookeeper 的读性能随着节点数量的提升能不断增加,但是写性能会随着节点数量的增加而降低,所以节点的数量不宜太多,一般配置成 3 个或者 5 个就可以了。

    图中可以看出当服务器节点增多时,复杂度会随之提升。因为每个节点和其它节点之间要进行 p2p 的连接。3 个节点可以容忍挂掉 1 个节点,5 个节点可以容忍挂掉 2 个节点。

    客户端连接 zookeeper 时会选择任意一个节点保持长链接,后续通信都是通过这个节点进行读写的。如果该节点挂了,客户端会尝试去连接其它节点。

    服务器会为每个客户端连接维持一个会话对象,会话的 ID 会保存在客户端。会话对象也是分布式的,意味着当一个节点挂掉了,客户端使用原有的会话 ID 去连接其它节点,服务器维持的会话对象还继续存在,并不需要重新创建一个新的会话。

    如果客户端主动发送会话关闭消息,服务器的会话对象会立即删除。如果客户端不小心奔溃了,没有发送关闭消息,服务器的会话对象还会继续存在一段时间。这个时间是会话的过期时间,在创建会话的时候客户端会提供这个参数,一般是 10 到 30 秒。

    也许你会问连接断开了,服务器是可以感知到的,为什么需要客户端主动发送关闭消息呢?

    因为服务器要考虑网络抖动的情况,连接可能只是临时断开了。为了避免这种情况下反复创建和销毁复杂的会话对象以及创建会话后要进行的一系列事件初始化操作,服务器会尽量延长会话的生存时间。

    zookeeper 的节点可以是持久化(Persistent)的,也可以是临时(Ephermeral)的。所谓临时的节点就是会话关闭后,会话期间创建的所有临时节点会立即消失。一般用于服务发现系统,将服务进程的生命期和 zookeeper 子节点的生命期绑定在一起,起到了实时监控服务进程的存活的效果。

    zookeeper 还提供了顺序节点。类似于 mysql 里面的 auto_increment 属性。服务器会在顺序节点名称后自动增加自增的唯一后缀,保持节点名称的唯一性和顺序性。

    还有一种节点叫着保护(Protected)节点。这个节点非常特殊,但是也非常常用。在应用服务发现的场合时,客户端创建了一个临时节点后,服务器节点挂了,连接断开了,然后客户端去重连到其它的节点。因为会话没有关闭,之前创建的临时节点还存在,但是这个时候客户端却无法识别去这个临时节点是不是自己创建的,因为节点内部并不存储会话 ID 字段。所以客户端会在节点名称上加上一个 GUID 前缀,这个前缀会保存在客户端,这样它就可以在重连后识别出哪个临时节点是自己之前创建的了。

    接下来我们使用 Go 语言实现一下服务发现的注册和发现功能。

    如图所示,我们要提供 api.user 这样的服务,这个服务有 3 个节点,每个节点有不一样的服务地址,这 3 个节点各自将自己的服务注册进 zk,然后消费者进行读取 zk 得到 api.user 的服务地址,任选一个节点地址进行服务调用。为了简单化,这里就没有提供权重参数了。在一个正式的服务发现里一般都有权重参数,用于调整服务节点之间的流量分配。

    go get github.com/samuel/go-zookeeper/zk
    

    首先我们定义一个 ServiceNode 结构,这个结构数据会存储在节点的 data 中,表示服务发现的地址信息。

    type ServiceNode struct {
    	Name string `json:"name"` // 服务名称,这里是 user
    	Host string `json:"host"`
    	Port int    `json:"port"`
    }
    在定义一个服务发现的客户端结构体 SdClient。
    
    type SdClient struct {
    	zkServers []string // 多个节点地址
    	zkRoot    string // 服务根节点,这里是 /api
    	conn      *zk.Conn // zk 的客户端连接
    }
    编写构造器,创建根节点
    
    
    func NewClient(zkServers []string, zkRoot string, timeout int) (*SdClient, error) {
    	client := new(SdClient)
    	client.zkServers = zkServers
    	client.zkRoot = zkRoot
    	// 连接服务器
    	conn, _, err := zk.Connect(zkServers, time.Duration(timeout)*time.Second)
    	if err != nil {
    		return nil, err
    	}
    	client.conn = conn
    	// 创建服务根节点
    	if err := client.ensureRoot(); err != nil {
    		client.Close()
    		return nil, err
    	}
    	return client, nil}// 关闭连接,释放临时节点 func (s *SdClient) Close() {
    	s.conn.Close()
    }
    
    func (s *SdClient) ensureRoot() error {
    	exists, _, err := s.conn.Exists(s.zkRoot)
    	if err != nil {
    		return err
    	}
    	if !exists {
    		_, err := s.conn.Create(s.zkRoot, []byte(""), 0, zk.WorldACL(zk.PermAll))
    		if err != nil && err != zk.ErrNodeExists {
    			return err
    		}
    	}
    	return nil
    }
    值得注意的是代码中的 Create 调用可能会返回节点已存在错误,这是正常现象,因为会存在多进程同时创建节点的可能。如果创建根节点出错,还需要及时关闭连接。我们不关心节点的权限控制,所以使用 zk.WorldACL(zk.PermAll)表示该节点没有权限限制。Create 参数中的 flag=0 表示这是一个持久化的普通节点。
    
    接下来我们编写服务注册方法
    
    func (s *SdClient) Register(node *ServiceNode) error {
    	if err := s.ensureName(node.Name); err != nil {
    		return err
    	}
    	path := s.zkRoot + "/" + node.Name + "/n"
    	data, err := json.Marshal(node)
    	if err != nil {
    		return err
    	}
    	_, err = s.conn.CreateProtectedEphemeralSequential(path, data, zk.WorldACL(zk.PermAll))
    	if err != nil {
    		return err
    	}
    	return nil}func (s *SdClient) ensureName(name string) error {
    	path := s.zkRoot + "/" + name
    	exists, _, err := s.conn.Exists(path)
    	if err != nil {
    		return err
    	}
    	if !exists {
    		_, err := s.conn.Create(path, []byte(""), 0, zk.WorldACL(zk.PermAll))
    		if err != nil && err != zk.ErrNodeExists {
    			return err
    		}
    	}
    	return nil
    }
    先要创建 /api/user 节点作为服务列表的父节点。然后创建一个保护顺序临时(ProtectedEphemeralSequential)子节点,同时将地址信息存储在节点中。什么叫保护顺序临时节点,首先它是一个临时节点,会话关闭后节点自动消失。其它它是个顺序节点,zookeeper 自动在名称后面增加自增后缀,确保节点名称的唯一性。同时还是个保护性节点,节点前缀增加了 GUID 字段,确保断开重连后临时节点可以和客户端状态对接上。
    
    接下来我们实现消费者获取服务列表方法
    
    func (s *SdClient) GetNodes(name string) ([]*ServiceNode, error) {
    	path := s.zkRoot + "/" + name
    	// 获取字节点名称
    	childs, _, err := s.conn.Children(path)
    	if err != nil {
    		if err == zk.ErrNoNode {
    			return []*ServiceNode{}, nil
    		}
    		return nil, err
    	}
    	nodes := []*ServiceNode{}
    	for _, child := range childs {
    		fullPath := path + "/" + child
    		data, _, err := s.conn.Get(fullPath)
    		if err != nil {
    			if err == zk.ErrNoNode {
    				continue
    			}
    			return nil, err
    		}
    		node := new(ServiceNode)
    		err = json.Unmarshal(data, node)
    		if err != nil {
    			return nil, err
    		}
    		nodes = append(nodes, node)
    	}
    	return nodes, nil
    }
    

    获取服务节点列表时,我们先获取字节点的名称列表,然后依次读取内容拿到服务地址。因为获取字节点名称和获取字节点内容不是一个原子操作,所以在调用 Get 获取内容时可能会出现节点不存在错误,这是正常现象。

    将以上代码凑在一起,一个简单的服务发现包装就实现了。

    最后我们看看如果使用以上代码,为了方便起见,我们将多个服务提供者和消费者写在一个 main 方法里。

    func main() {
            // 服务器地址列表
    	servers := []string{"192.168.0.101:2118", "192.168.0.102:2118", "192.168.0.103:2118"}
    	client, err := NewClient(servers, "/api", 10)
    	if err != nil {
    		panic(err)
    	}
    	defer client.Close()
    	node1 := &ServiceNode{"user", "127.0.0.1", 4000}
    	node2 := &ServiceNode{"user", "127.0.0.1", 4001}
    	node3 := &ServiceNode{"user", "127.0.0.1", 4002}
    	if err := client.Register(node1); err != nil {
    		panic(err)
    	}
    	if err := client.Register(node2); err != nil {
    		panic(err)
    	}
    	if err := client.Register(node3); err != nil {
    		panic(err)
    	}
    	nodes, err := client.GetNodes("user")
    	if err != nil {
    		panic(err)
    	}
    	for _, node := range nodes {
    		fmt.Println(node.Host, node.Port)
    	}
    }
    

    值得注意的是使用时一定要在进程退出前调用 Close 方法,否则 zookeeper 的会话不会立即关闭,服务器创建的临时节点也就不会立即消失,而是要等到 timeout 之后服务器才会清理。

    阅读相关文章,关注公众号 [码洞]

    2 条回复    2018-03-23 10:41:31 +08:00
    fyxtc
        1
    fyxtc  
       2018-03-23 10:37:10 +08:00
    最后一句厉害了。。。
    codehole
        2
    codehole  
    OP
       2018-03-23 10:41:31 +08:00 via Android
    @fyxtc 最后一句怎么了?
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   3543 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 04:39 · PVG 12:39 · LAX 20:39 · JFK 23:39
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.