Etcd数据库的基础使用

介绍

Etcd 是 CoreOS 基于 Raft 开发的高可用分布式 Key/Value 存储系统,可用于服务发现、共享配置以及一致性保障(如数据库选主、分布式锁等)。

  • 简单:支持 curl 方式的用户 API (HTTP+JSON)
  • 安全:可选 SSL 客户端证书认证
  • 快速:单实例可达每秒 10000 次写操作
  • 可靠:使用 Raft 实现分布式

Etcd 主要功能:

  • 基本的 key-value 存储
  • 监听机制
  • key 的过期及续约机制,用于监控和服务发现
  • 原子 CAS 和 CAD,用于分布式锁和 leader 选举

官网:https://etcd.io , 开源地址:https://github.com/etcd-io/etcd

集群部署(Docker方式)

执行脚本安装:

./install.sh

install.sh 脚本内容:

#!/bin/bash
DATA_PATH=/opt/mydocker/etcd3.4
COMPOSE_FILE_PATH=./docker-compose.yaml

mkdir -p ${DATA_PATH}
chmod 777 ${DATA_PATH}
mkdir -p ${DATA_PATH}/etcd1
chmod 777 ${DATA_PATH}/etcd1
mkdir -p ${DATA_PATH}/etcd2
chmod 777 ${DATA_PATH}/etcd2
mkdir -p ${DATA_PATH}/etcd3
chmod 777 ${DATA_PATH}/etcd3

docker-compose -f ${COMPOSE_FILE_PATH} stop
docker-compose -f ${COMPOSE_FILE_PATH} rm -f
docker-compose -f ${COMPOSE_FILE_PATH} up -d

docker-compose.yaml 配置:

version: '3'
services:
  etcd1:
    container_name: "etcd1"
    restart: always
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd1'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd1:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd1:2379'
      - '--initial-cluster-token=mys1cr2tt1k7n'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - "12379:2379"
    volumes:
      - /opt/mydocker/etcd3.4/etcd1:/etcd_data
  etcd2:
    container_name: "etcd2"
    restart: always
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd2'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd2:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd2:2379'
      - '--initial-cluster-token=mys1cr2tt1k7n'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - "22379:2379"
    volumes:
      - /opt/mydocker/etcd3.4/etcd2:/etcd_data
  etcd3:
    container_name: "etcd3"
    restart: always
    image: "quay.io/coreos/etcd:v3.4.7"
    entrypoint: /usr/local/bin/etcd
    command:
      - '--name=etcd3'
      - '--data-dir=/etcd_data'
      - '--initial-advertise-peer-urls=http://etcd3:2380'
      - '--listen-peer-urls=http://0.0.0.0:2380'
      - '--listen-client-urls=http://0.0.0.0:2379'
      - '--advertise-client-urls=http://etcd3:2379'
      - '--initial-cluster-token=mys1cr2tt1k7n'
      - '--heartbeat-interval=250'
      - '--election-timeout=1250'
      - '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
      - '--initial-cluster-state=new'
    ports:
      - "32379:2379"
    volumes:
      - /opt/mydocker/etcd3.4/etcd3:/etcd_data

终端操作:

docker exec -it etcd1 etcdctl endpoint health
docker exec -it etcd1 etcdctl put secret 'code123'
docker exec -it etcd1 etcdctl get secret
docker exec -it etcd1 etcdctl --write-out="json" get secret
docker exec -it etcd1 etcdctl --write-out="table" member list
docker exec -it etcd1 etcdctl endpoint status --cluster -w table
docker exec -it etcd1 etcdctl get / --prefix 

docker-compose ps                                                                                                                                                                 master 
Name               Command               State                          Ports                        
-----------------------------------------------------------------------------------------------------
etcd1   /usr/local/bin/etcd --name ...   Up      0.0.0.0:12379->2379/tcp,:::12379->2379/tcp, 2380/tcp
etcd2   /usr/local/bin/etcd --name ...   Up      0.0.0.0:22379->2379/tcp,:::22379->2379/tcp, 2380/tcp
etcd3   /usr/local/bin/etcd --name ...   Up      0.0.0.0:32379->2379/tcp,:::32379->2379/tcp, 2380/tcp

对外暴露的端口是 12379,22379,32379,所以请求时:

endpoints := []string{"127.0.0.1:12379","127.0.0.1:22379","127.0.0.1:32379"}

快速入门

https://etcd.io/docs/v3.4/dev-guide/interacting_v3

场景使用:Leader选举

如果现在部署了一个微服务的多个副本,但只能由其中一个副本来执行某些操作,那么需要从这些副本中选出一个leader。场景有:定时器服务执行(多副本保持高可用)

可参考:

https://github.com/etcd-io/etcd/tree/main/client/v3/concurrency

https://github.com/etcd-io/etcd/blob/main/tests/integration/clientv3/concurrency/example_election_test.go

示例:只有leader节点能执行crontab;lease租约有效期设为15s,leader节点异常,最长等待15s后由其他副本接力。

package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
	"log"
	"time"
)

const prefix = "/election-demo" // 可以认为是资源,或者锁的前缀,前缀下资源版本号最小的那个是leader
const prop = "local"            // 值,任何值都可以,固定死

var leaderFlag bool

func main() {
	endpoints := []string{"127.0.0.1:12379", "127.0.0.1:22379", "127.0.0.1:32379"}
	donec := make(chan struct{})

	cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// 这里开始选举leader节点
	go campaign(cli, prefix, prop)

	go func() {
		ticker := time.NewTicker(time.Duration(5) * time.Second)
		for {
			select {
			case <-ticker.C:
				// 只有leader才能执行定时器
				if leaderFlag == true {
					doCrontab()
				}
			}
		}
	}()

	<-donec
}

func campaign(c *clientv3.Client, election string, prop string) {
	for {
		fmt.Println("campaign")

		// 创建session,session参与选主,etcd的client需要自己传入。
		// session中keepAlive机制会一直续租,如果keepAlive断掉,session.Done会收到退出信号
		s, err := concurrency.NewSession(c, concurrency.WithTTL(15))
		if err != nil {
			fmt.Println(err)
			time.Sleep(1 * time.Second) // 重新选举,不致于重试的频率太高
			continue
		}

		fmt.Println("lease_id:", s.Lease())

		// 创建一个新的选举election
		e := concurrency.NewElection(s, election)
		ctx := context.TODO()

		// 调用Campaign方法,成为leader的节点会运行出来,非leader节点会阻塞在里面
		if err = e.Campaign(ctx, prop); err != nil {
			fmt.Println(err)
			time.Sleep(1 * time.Second) // 重新选举,不致于重试的频率太高
			continue
		}

		// 运行到这的协程,成为leader
		fmt.Println("elect: success")
		leaderFlag = true

		select {
		// 如果因为网络因素导致与etcd断开了keepAlive,会收到信号,重新选举
		case <-s.Done():
			leaderFlag = false
			fmt.Println("elect: expired")
			time.Sleep(1 * time.Second) // 重新选举,不致于重试的频率太高
			continue
		}
	}
}

func doCrontab() {
	fmt.Println("doCrontab")
}

选举原理

etcd中concurrency包下已经帮我们实现好了选主,我们只需要调用其api实现就可以了,下面我们分析下etcd是如何实现选主机制的。直接进行源码分析:

// Campaign puts a value as eligible for the election on the prefix
// key.
// Multiple sessions can participate in the election for the
// same prefix, but only one can be the leader at a time.
//
// If the context is 'context.TODO()/context.Background()', the Campaign
// will continue to be blocked for other keys to be deleted, unless server
// returns a non-recoverable error (e.g. ErrCompacted).
// Otherwise, until the context is not cancelled or timed-out, Campaign will
// continue to be blocked until it becomes the leader.
 
// 多个etcd的session可以通过prefix来参与选举。但是只有一个session能成为leader。
// Campaign方法会阻塞,直到session成功成为leader才返回。
func (e *Election) Campaign(ctx context.Context, val string) error {
	s := e.session
	client := e.session.Client()
	// 根据前缀和租约创建当前key
	k := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())
	// 如果是第一次创建key,那么key的revision为0
	// 这里用到了etcd的事务,如果if判断为true,那么put这个key,否则get这个key;最终都能获取到这个key的内容。
	txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))
	txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))
	txn = txn.Else(v3.OpGet(k))
	resp, err := txn.Commit()
	if err != nil {
		return err
	}
	e.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, s
	// 这里是事务中if判断为false,即执行了else
	if !resp.Succeeded {
		kv := resp.Responses[0].GetResponseRange().Kvs[0]
		e.leaderRev = kv.CreateRevision
		if string(kv.Value) != val { // 判定val是否相同,不相同的话,在不更换leader的情况下,更新val
			if err = e.Proclaim(ctx, val); err != nil {
				e.Resign(ctx)
				return err
			}
		}
	}
	// 等待prefix前缀下所有比当前key的revision小的其他key都被删除后,才返回,竞选为leader
	_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)
	if err != nil {
		// clean up in case of context cancel
		select {
		case <-ctx.Done():
			e.Resign(client.Ctx())
		default:
			e.leaderSession = nil
		}
		return err
	}
	e.hdr = resp.Header
 
	return nil
}


func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()
 
	var wr v3.WatchResponse
	// 这里watch指定的key,对于这个key所有events事件,都会收到服务端的推送。
	wch := client.Watch(cctx, key, v3.WithRev(rev))
	for wr = range wch {
		for _, ev := range wr.Events {
			if ev.Type == mvccpb.DELETE { // 如果当前这个key被删除了,那么会退出这个方法,watch下一个key。
				return nil
			}
		}
	}
	if err := wr.Err(); err != nil {
		return err
	}
	if err := ctx.Err(); err != nil {
		return err
	}
	return fmt.Errorf("lost watcher waiting for delete")
}
 
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {
        // option,获取createRevision不大于maxCreateRev的key,只取最后一个revision最大的
	getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))
	for {
		// 获取前缀prefix下,所有比指定revision小的key
		resp, err := client.Get(ctx, pfx, getOpts...)
		if err != nil {
			return nil, err
		}
		if len(resp.Kvs) == 0 {
			return resp.Header, nil
		}
		lastKey := string(resp.Kvs[0].Key)
		// 去watch revision最大的key,这里也会阻塞的watch。外层有循环判断,要等所有比revision小的key的没了,才退出。
                // 下方有具体的说明
		if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {
			return nil, err
		}
	}
}

总体来说还是比较好理解的,主要是利用watch机制来实现了节点在不是leader的时候的阻塞机制。

可以看到,每个节点都创建了自己的key,但是这些key的前缀是一致的,选主是根据前缀去选主的。

如果有a,b,c三个节点同时去竞选,分别对应竞选的createRevision是0,1,2,那么每个节点会watch比自己createRevision小并且最大的节点,这是个循环的过程,等到所有比自己createRevision小的节点都被删除后,自己才成为leader。

对应的,a节点会成为leader,b节点在watch a节点,c节点在watch b节点。如果b节点key被删除了,c节点会去watch a节点。

如果a节点key被删除了,b节点会成为leader。