etcd go 手册

备忘录

ETCD 手册

KV 操作

WithIgnoreLease() 使用租约时可以用这个,当key 不存在时会返回错误 WithPrevKV() 可以返回更新前的KV值 WithIgnoreValue() 普通put 使用这个key不存在时会返回错误 WithSort(clientv3.SortByKey, clientv3.SortDescend) 可以让在查询的时候使用特定的排序方式 WithPrefix() 这可以可以按照key,查找前缀是key字符串的所有值; Get() 使用的WithRev(presp.Header.Revision) ,中的版本号可以时某次put操作返回的版本号,我觉得get的其实也是可以的;

看了下源码 ResponseHeader 这东西里面塞了: ClusterId 和这个消息交互的集群的id
MemberId 节点id Revision 消息版本 RaftTerm 选举的周期

1
2
3
4
5
6
    // 这里获取版本后,该版本之前的历史数据存储开始进行合并压缩
    // 这里会生成快照吗? 按照文档上说这个操作应该是要定时进行的
    compRev := resp.Header.Revision // specify compact revision of your choice

    ctx, cancel = context.WithTimeout(context.Background(), requestTimeout)
    _, err = cli.Compact(ctx, compRev)

func (Maintenance).Status(ctx Context, endpoint string) 可以获取集群的状态 func (Maintenance).Defragment(ctx Context, endpoint string) 这可以开启etcd 的碎片整理

授权管理

先是简单的通过用户名密码来验证

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// 这部分可以手动来进行的 
// etcdctl --user root role add r
    if _, err = cli.RoleAdd(context.TODO(), "r"); err != nil {
            log.Fatal(err)
    }
// etcdctl --user root role grant-permission r   foo zoo
// 使用 -prefix=true 可以仅指定开头前缀
    if _, err = cli.RoleGrantPermission(context.TODO(),"r",   "foo", "zoo", clientv3.PermissionType(clientv3.PermReadWrite),); err != nil {
            log.Fatal(err)
    }
// etcdctl --user root user add  u --new-user-password 123
    if _, err = cli.UserAdd(context.TODO(), "u", "123"); err != nil {
            log.Fatal(err)
    }
// etcdctl --user root user grant-role u r
    if _, err = cli.UserGrantRole(context.TODO(), "u", "r"); err != nil {
            log.Fatal(err)
    }
// etcdctl auth enable
    if _, err = cli.AuthEnable(context.TODO()); err != nil {
            log.Fatal(err)
    }

    // 这里使用 root 角色的用户来登录
    rootCli, err := clientv3.New(clientv3.Config{
        Endpoints:   exampleEndpoints(),
        DialTimeout: dialTimeout,
        Username:    "root",
        Password:    "123",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer rootCli.Close()
    // root 用户可以获取别的 用户或者角色的数据 etcdctl --user root role get r
    resp, err := rootCli.RoleGet(context.TODO(), "r")
    if err != nil {
        log.Fatal(err)
    }
    // 可以获得 角色权限的信息
    fmt.Printf("user u permission: key %q, range end %q\n", resp.Perm[0].Key, resp.Perm[0].RangeEnd)
    // 这里关闭身份校验 etcdctl auth disable
    if _, err = rootCli.AuthDisable(context.TODO()); err != nil {
        log.Fatal(err)
    }

建立客户端连接时使用的证书

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
	tlsInfo := transport.TLSInfo{
			CertFile:      "/tmp/test-certs/test-name-1.pem",
			KeyFile:       "/tmp/test-certs/test-name-1-key.pem",
			TrustedCAFile: "/tmp/test-certs/trusted-ca.pem",
		}
	tlsConfig, err := tlsInfo.ClientConfig()
		if err != nil {
			log.Fatal(err)
		}
		cli, err := clientv3.New(clientv3.Config{
			Endpoints:   exampleEndpoints(),
			DialTimeout: dialTimeout,
			TLS:         tlsConfig,
		})

事务

STM is an interface for software transactional memory. 事务使用 MVCC多版本控制,在事务执行的函数类使用 STM 来读写键值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// Txn 这个简单的事务接口,还是基于客户端连接来的
    kvc := clientv3.NewKV(cli)

		_, err = kvc.Put(context.TODO(), "key", "xyz")
		if err != nil {
			log.Fatal(err)
		}

    ctx, cancel := context.WithTimeout(context.Background(), requestTimeout)
    // if 条件成立 会执行 then 分支的修改,否则会执行else 分支的操作
    _, err = kvc.Txn(ctx).
        // txn value comparisons are lexical
        If(clientv3.Compare(clientv3.Value("key"), ">", "abc")).
        // the "Then" runs, since "xyz" > "abc"
        Then(clientv3.OpPut("key", "XYZ")).
        // the "Else" does not run
        Else(clientv3.OpPut("key", "ABC")).
        Commit()
//
    exchange := func(stm concurrency.STM) {
            from, to := rand.Intn(totalAccounts), rand.Intn(totalAccounts)
            if from == to {
                // nothing to do
                return
            }
            // read values
            fromK, toK := fmt.Sprintf("accts/%d", from), fmt.Sprintf("accts/%d", to)
            fromV, toV := stm.Get(fromK), stm.Get(toK)
            fromInt, toInt := 0, 0
            fmt.Sscanf(fromV, "%d", &fromInt)
            fmt.Sscanf(toV, "%d", &toInt)

            // transfer amount
            xfer := fromInt / 2
            fromInt, toInt = fromInt-xfer, toInt+xfer

            // write back
            stm.Put(fromK, fmt.Sprintf("%d", fromInt))
            stm.Put(toK, fmt.Sprintf("%d", toInt))
            return
        }

    // concurrently exchange values between accounts
    var wg sync.WaitGroup
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func() {
            defer wg.Done()
            if _, serr := concurrency.NewSTM(cli, func(stm concurrency.STM) error {
                exchange(stm)
                return nil
            }); serr != nil {
                log.Fatal(serr)
            }
        }()
    }
    wg.Wait()

普通的 kv api 其实也有一个Txn ,但是同一个key 只能修改一次

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    orderingKv := ordering.NewKV(cli.KV,
            func(op clientv3.Op, resp clientv3.OpResponse, prevRev int64) error {
                return errOrderViolation
            })
	orderingTxn := orderingKv.Txn(ctx)
	_, err = orderingTxn.If(
		clientv3.Compare(clientv3.Value("b"), ">", "a"),
	).Then(
		clientv3.OpGet("foo"),
	).Commit()
	if err != nil {
		t.Fatal(err)
	}

租约

租约有点像 go 里面的上下文,租约过期时会撤销掉这期间的更改;同时在func (Lease).Revoke(ctx Context, id LeaseID) 释放租约的时候,之前修改会被视作失效了;func (Lease).KeepAliveOnce(ctx Context, id LeaseID) 可以手动续约,避免租约超期被取消了;

key 和 Lease 是多对一的关系。一个 key 最多只能挂绑定一个 Lease ,但是一个 Lease 上能挂多个 key 。租约在申请下来后,关联的操作,我觉得全是修改,会被关联到这个租约的 map 里面,这段事件应该是独占这些个 key 的所有权,所以加进来的key修改,在租约失效的时候,反向调用Txn 来删除这些key,就能把之前的版本恢复

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
	lease, err := cli.Grant(context.Background(), 100)
	if err != nil {
		t.Fatal(err)
	}
    // 每个会话会有一个唯一的ID 和TTL 存活时间
	s, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()
	assert.Equal(t, s.Lease(), lease.ID)

	go s.Orphan()
	select {
	case <-s.Done():
	case <-time.After(time.Millisecond * 100):
		t.Fatal("session did not get orphaned as expected")
	}

使用租约来控制的会话会比租约更早结束,以免出现并发控制的问题?这个和上面的互斥锁连用就可以实现租约时长来控制的互斥锁,超时会退出,并撤销操作? 另外可以给租约设置 TTL 也就是生存时间

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
	s, err := concurrency.NewSession(cli, concurrency.WithTTL(setTTL))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()

	leaseID := s.Lease()
	// TTL retrieved should be less than the set TTL, but not equal to default:60 or exprired:-1
	resp, err := cli.Lease.TimeToLive(context.Background(), leaseID)
	if err != nil {
		t.Log(err)
	}
	if resp.TTL == -1 {
		t.Errorf("client lease should not be expired: %d", resp.TTL)

	}
	if resp.TTL == 60 {
		t.Errorf("default TTL value is used in the session, instead of set TTL: %d", setTTL)
	}
	if resp.TTL >= int64(setTTL) || resp.TTL < int64(setTTL)-20 {
		t.Errorf("Session TTL from lease should be less, but close to set TTL %d, have: %d", setTTL, resp.TTL)
	}

这里可以看到 租约的实际时间是比设置的要短的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
	lease, err := cli.Grant(context.Background(), 100)
	if err != nil {
		t.Fatal(err)
	}
	s, err := concurrency.NewSession(cli, concurrency.WithLease(lease.ID))
	if err != nil {
		t.Fatal(err)
	}
	defer s.Close()
	assert.Equal(t, s.Lease(), lease.ID)
    // 主要是通过 会话的上下文的Done 来控制会话内操作的退出
	childCtx, cancel := context.WithCancel(s.Ctx())
	defer cancel()

	go s.Orphan()
	select {
	case <-childCtx.Done():
	case <-time.After(time.Millisecond * 100):
		t.Fatal("child context of session context is not canceled")
	}

会话和 go 原生的上下文的使用; 总结一下:

  • 租约加 会话加互斥锁 可以实现分布式锁
  • 租约加会话加 上下文,可以取消会话内协程的执行

分布式锁

etcd 3有个并发api ,调用这个api 可以实现分布式锁,锁会持有到主动解锁或者租期到了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
   // 新建会话是一个标准流程表,因为下面申请锁需要通过一个会话来进行
   s1, err := concurrency.NewSession(cli)
   if err != nil {
   	t.Fatal(err)
   }
   defer s1.Close()
   m1 := concurrency.NewMutex(s1, "/my-lock/")
   if err = m1.Lock(context.TODO()); err != nil {
   t.Fatal(err)
   }
   // 这之间就是s1 获得锁的临界区
   if err := m1.Unlock(context.TODO()); err != nil {
   t.Fatal(err)
   }

如果先调用解锁,会得到ErrLockReleased 也就是锁已经被释放了,或者没有获得锁,总而言之就是当前没有持有锁

服务发现和注册

实际是etcd根据mainID去磁盘查数据,磁盘中数据以revision.main+revision.sub为key(bbolt 数据库中的key),所以就会依次遍历出所有的版本数据。同时判断遍历到的value中的key(etcd中的key)是不是用户watch的,是则推送给用户。

这里每次都会遍历数据库性能可能会很差,实际使用时一般用户只会关注最新的revision,不会去关注旧数据。

采用了MVCC,以一种优雅的方式解决了锁带来的问题。执行写操作或删除操作时不会再原数据上修改而是创建一个新版本。这样并发的读取操作仍然可以读取老版本的数据,写操作也可以同时进行。这个模式的好处在于读操作不再阻塞,事实上根本就不需要锁。 客户端读key的时候指定一个版本号,服务端保证返回比这个版本号更新的数据,但不保证返回最新的数据。 MVCC能最大化地实现高效地读写并发,尤其是高效地读,非常适合读多写少的场景。

客户端使用watch 来获取服务端地址

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
    var serviceTarget = "Hello"
    type remoteService struct {
      name string
      nodes map[string]string
      mutex sync.Mutex
    }
    service = &remoteService {
      name: serviceTarget
    } 
    kv := clientv3.NewKV(etcdClient)
    rangeResp, err := kv.Get(context.TODO(), service.name, clientv3.WithPrefix())
    if err != nil {
       panic(err)
    }

    service.mutex.Lock()
    for _, kv := range rangeResp.Kvs {
        service.nodes[string(kv.Key)] = string(kv.Value)
    }
    service.mutex.Unlock()

    go watchServiceUpdate(etcdClient, service)


// 监控服务目录下的事件
func watchServiceUpdate(etcdClient clientv3.Client, service *remoteService) {
    watcher := clientv3.NewWatcher(client)
    // Watch 服务目录下的更新
    watchChan := watcher.Watch(context.TODO(), service.name, clientv3.WithPrefix())
    for watchResp := range watchChan {
        // 这里对增删时间的响应,会使用互斥锁来解决并发的数据修改问题
          for _, event := range watchResp.Events {
                service.mutex.Lock()
                switch (event.Type) {
                    case mvccpb.PUT://PUT事件,目录下有了新key
                      service.nodes[string(event.Kv.Key)] = string(event.Kv.Value)
                    case mvccpb.DELETE://DELETE事件,目录中有key被删掉(Lease过期,key 也会被删掉)
                      delete(service.nodes, string(event.Kv.Key))
                }
                service.mutex.Unlock()
          }
    }
}

服务端主要是注意租约的维护

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 将服务注册到etcd上
func RegisterServiceToETCD(ServiceTarget string, value string) {
    dir = strings.TrimRight(ServiceTarget, "/") + "/"

    client, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
    panic(err)
    }

    kv := clientv3.NewKV(client)
    lease := clientv3.NewLease(client)
    var curLeaseId clientv3.LeaseID = 0

    for {
        if curLeaseId == 0 {
            leaseResp, err := lease.Grant(context.TODO(), 10)
            if err != nil {
              panic(err)
            }

            key := ServiceTarget + fmt.Sprintf("%d", leaseResp.ID)
            if _, err := kv.Put(context.TODO(), key, value, clientv3.WithLease(leaseResp.ID)); err != nil {
                  panic(err)
            }
            curLeaseId = leaseResp.ID
        } else {
      // 续约租约,如果租约已经过期将curLeaseId复位到0重新走创建租约的逻辑
            if _, err := lease.KeepAliveOnce(context.TODO(), curLeaseId); err == rpctypes.ErrLeaseNotFound {
                curLeaseId = 0
                continue
            }
        }
        time.Sleep(time.Duration(1) * time.Second)
    }
}

使用 watch 监视的时候 clientv3.WithRev(1) 可以指定从哪个版本开始获取,clientv3.WithFragment() 会允许服务端将事件分页发送过来

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
select {
	case ws := <-wch:
		// 没启用分页的时候,因为对应的 key 的值太大了,旧没接收到
		if !fragment && exceedRecvLimit {
			if len(ws.Events) != 0 {
				t.Fatalf("expected 0 events with watch fragmentation, got %d", len(ws.Events))
			}
			exp := "code = ResourceExhausted desc = grpc: received message larger than max ("
			if !strings.Contains(ws.Err().Error(), exp) {
				t.Fatalf("expected 'ResourceExhausted' error, got %v", ws.Err())
			}
			return
		}

		// 启用分页将每次发送的数据分成限制内大小后,拿到的分页数,这个事件本身是键值对的一个切片,里面的元素是类似CPP 的 pair 这种键值二元组
		if len(ws.Events) != 10 {
			t.Fatalf("expected 10 events with watch fragmentation, got %d", len(ws.Events))
		}
		if ws.Err() != nil {
			t.Fatalf("unexpected error %v", ws.Err())
		}

	case <-time.After(testutil.RequestTimeout):
		t.Fatalf("took too long to receive events")
	}

使用 cfg.ClientMaxCallRecvMsgSize = 1.5 * 1024 * 1024 修改集群配置时,会限制集群给客户端发送消息大小

观测

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import(
	grpcprom "github.com/grpc-ecosystem/go-grpc-prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)
    // 这样在客户端的 grpc 连接里面塞两个Prometheus的中间件进去
		cli, err := clientv3.New(clientv3.Config{
			Endpoints: exampleEndpoints(),
			DialOptions: []grpc.DialOption{
				grpc.WithUnaryInterceptor(grpcprom.UnaryClientInterceptor),
				grpc.WithStreamInterceptor(grpcprom.StreamClientInterceptor),
			},
		})
        if err!=nil{
            log.Fatal(err)
        }
        defer cli.close()
        // 开个 http 服务端
        ln, err := net.Listen("tcp", ":0")
        if err != nil {
			log.Fatal(err)
		}
        defer ln.close()
        http.Serve(ln, promhttp.Handler()) // 现在就可以被监听到了

调优

io优先级

1
sudo ionice -c2 -n0 -p `pgrep etcd`

快照触发数量

1
etcd --snapshot-count=5000

心跳和选举时间

1
 etcd --heartbeat-interval=100 --election-timeout=500

cpu

1
echo performance | tee /sys/devices/system/cpu/cpu*/cpufreq/scaling_governor

一些维护操作

1
2
3
4
5
6
7
8
9
etcdctl member list -w table  # 可以查看节点信息
etcdctl move-leader  XXID   --endpoints 127.0.0.1:2379 
etcdctl member remove xxxID
# 重新将一个节点添加到集群里面来
etcdctl member add etcd01 --peer-urls="https://xxxxxxx:2380"
# 对某个节点存储快照
etcdctl --endpoints=https://10.184.4.240:2380 snapshot save snapshot.db
# 从节点快照恢复数据
etcdctl snapshot restore snapshot.db --name etcd01 --initial-cluster etcd01=https://10.184.4.238:2379,etcd02=https://10.184.4.239:2379,etcd03=https://10.184.4.240:2379  --initial-cluster-token etcd-cluster --initial-advertise-peer-urls https://10.184.4.238:2380

使用客户端api 也是可以实现上面的操作的

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 添加一个 节点进来   2380 一般是这个端口,用来做集群间通信的,那个2379的是用监听客户端的
    mresp, err := cli.MemberAdd(context.Background(), []string{"http://localhost:32380"})
    if err != nil {
    	log.Fatal(err)
    }
    fmt.Println("added member.PeerURLs:", mresp.Member.PeerURLs)
    fmt.Println("members count:", len(mresp.Members))

    // Restore original cluster state
    _, err = cli.MemberRemove(context.Background(), mresp.Member.ID)
    if err != nil {
    	log.Fatal(err)
    }
          // 这个添加进来做从节点?
       mresp, err := cli.MemberAddAsLearner(context.Background(), []string{"http://localhost:32381"})
    if err != nil {
    	log.Fatal(err)
    }
          // 这里用来获取集群的节点列表
          resp, err := cli.MemberList(context.Background())
    if err != nil {
    	log.Fatal(err)
    }
          // 修改节点的内部通信地址
          peerURLs := []string{"http://localhost:12380"}
    _, err = cli.MemberUpdate(context.Background(), resp.Members[0].ID, peerURLs)
    if err != nil {
    	log.Fatal(err)
    }

快照

etcd 的快照和虚拟机的快照比较类似,是摸一个时间点etcd 节点的所有数据;快照是一个checkpoint,避免因为wal 数据被无限制写入,导致体量超大,通过checkpoint做一个记录,后续的wal可以做增量,checkpoint生成的快照充当的应该是快照前的数据,发生修改后的数据会在wal上,(也不能这么说,因为wal记录本来就是修改记录)。

Licensed under CC BY-NC-SA 4.0
往日已经不在,未来尚未开始
使用 Hugo 构建
主题 StackJimmy 设计