跳到主要内容

etcd

信息

etcd 是一个分布式键值对存储系统,由coresos 开发,内部采用 raft 协议作为一致性算法,用于可靠、快速地保存关键数据,并提供访问。通过分布式锁、leader选举和写屏障(write barriers),来实现可靠的分布式协作。etcd集群是为高可用、持久化数据存储和检索而准备。

概念词汇

  1. Raft: etcd所采用的保证分布式系统强一致性的算法。
  2. Node: 一个Raft状态机实例。
  3. Member: 一个etcd实例。它管理着一个Node,并且可以为客户端请求提供服务。
  4. Cluster: 由多个Member构成、可以协同工作的etcd集群。
  5. Peer: 对同一个etcd集群中另外一个Member的称呼。
  6. Client: 向etcd集群发送HTTP请求的客户端。
  7. WAL: 预写式日志,etcd用于持久化存储的日志格式。
  8. snapshot: etcd防止WAL文件过多而设置的快照,存储etcd数据状态。
  9. Proxy: etcd的一种模式,为etcd集群提供反向代理服务。
  10. Leader: Raft算法中,通过竞选而产生的、处理所有数据提交的节点。
  11. Follower: 竞选失败的节点作为Raft中的从属节点,为算法提供强一致性保证。
  12. Term: 某个节点成为Leader到下一次竞选时间,称为一个Term。
  13. Index: 数据项编号。Raft中通过Term和Index来定位数据

应用场景

  • 服务发现
  • 消息发布与订阅
  • 负载均衡
  • 分布式通知与协调
  • 分布式锁、分布式队列
  • 集群监控与Leader竞选

ectd 与 redis

etcd: 用于共享配置和服务发现的分布式一致键值存储. etcd 是一种分布式键值存储, 它提供了一种跨机器集群存储数据的可靠方式. etcd 在网络分区期间优雅地处理 master 选举, 并且会容忍机器故障.

redis: 持久化在磁盘上的内存数据库, Redis 是一个开源、BSD 许可的高级键值存储. 它通常被称为数据结构服务器, 因为键可以包含字符串、散列、列表、集合和排序集合.

代码实现

服务注册


import (
"context"
"github.com/coreos/etcd/clientv3"
"log"
"time"
)

// ServiceRegister 创建租约注册服务
type ServiceRegister struct {
cli *clientv3.Client //etcd client
leaseID clientv3.leaseID //租约ID
keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
key string //key
val string //value
}
//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}

ser := &ServiceRegister{
cli: cli,
key: key,
val: val,
}

//申请租约设置时间keepalive并注册服务
if err := ser.putKeyWithLease(lease); err != nil {
return nil, err
}

return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
//设置租约时间
resp, err := s.cli.Grant(context.Background(), lease)
if err != nil {
return err
}
//注册服务并绑定租约
_, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
if err != nil {
return err
}
//设置续租 定期发送需求请求
leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

if err != nil {
return err
}
s.leaseID = resp.ID
log.Println(s.leaseID)
s.keepAliveChan = leaseRespChan
log.Printf("Put key:%s val:%s success!", s.key, s.val)
return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
for leaseKeepResp := range s.keepAliveChan {
log.Println("续约成功", leaseKeepResp)
}
log.Println("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
//撤销租约
if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
return err
}
log.Println("撤销租约")
return s.cli.Close()
}

func main() {
var endpoints = []string{"192.168.79.134:2379"}
ser, err := NewServiceRegister(endpoints, "/web", "192.168.1.51:8000", 5)
if err != nil {
log.Fatalln(err)
}
//监听续租相应chan
go ser.ListenLeaseRespChan()
select {
case <-time.After(20 * time.Second):
ser.Close()
}
}

    服务发现  


package main

import (
"context"
"log"
"sync"
"time"

"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
)

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
cli *clientv3.Client //etcd client
serverList map[string]string //服务列表
lock sync.Mutex
}

//NewServiceDiscovery 新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
cli, err := clientv3.New(clientv3.Config{
Endpoints: endpoints,
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}

return &ServiceDiscovery{
cli: cli,
serverList: make(map[string]string),
}
}

//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
//根据前缀获取现有的key
resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
if err != nil {
return err
}

for _, ev := range resp.Kvs {
s.SetServiceList(string(ev.Key), string(ev.Value))
}

//监视前缀,修改变更的server
go s.watcher(prefix)
return nil
}

//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
log.Printf("watching prefix:%s now...", prefix)
for wresp := range rch {
for _, ev := range wresp.Events {
switch ev.Type {
case mvccpb.PUT: //修改或者新增
s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
case mvccpb.DELETE: //删除
s.DelServiceList(string(ev.Kv.Key))
}
}
}
}

//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
s.lock.Lock()
defer s.lock.Unlock()
s.serverList[key] = string(val)
log.Println("put key :", key, "val:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
s.lock.Lock()
defer s.lock.Unlock()
delete(s.serverList, key)
log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
s.lock.Lock()
defer s.lock.Unlock()
addrs := make([]string, 0)

for _, v := range s.serverList {
addrs = append(addrs, v)
}
return addrs
}

//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
return s.cli.Close()
}

func main() {
var endpoints = []string{"192.168.79.134:2379"}
ser := NewServiceDiscovery(endpoints)
defer ser.Close()
_ = ser.WatchService("/web")
for {
select {
case <-time.Tick(10 * time.Second):
log.Println(ser.GetServices())
}
}
}