最近在看etcd的相关源码,从简单的raftexample进行代码入手是一个不错的方案。相应代码入口在etcd-io/etcd/contrib/raftexample,建议可以先从github上将对应clone下来对照着来会好些。

raftexample大致流程图

我尝试用过先整理整个大概框架,然后再通过细分讲解里面具体模块的方式来说明这个服务的处理流程。
etcd-raftexampleda-zhi-liu-cheng-tu
挑里面的重点,简单讲解下流程

  • 这个示例中,我们Server支持PUTGETPOSTDELETE共四种请求方式,分别对应的也是数据的更新和获取,配置的新增和删除;
  • 步骤1、2,我们以其中的PUT方法展开说明,当收到client发送过来的数据更新或新建操作后,首先会调用我们的状态机(KVStore)向我们的通信管道(proposeC)发送数据;
  • 步骤3,raftNode模块从管道proposeC中监听到有数据事件到来后会将该数据继续往下层模块node传递,node模块也算是我们raft协议里面实现的核心部分,在这个地方我们先记住它是预先对我们数据进行了一些简单处理。随后node模块会通过另外一个管道告知我们数据预处理好了;
  • 步骤4,当raftNode收到从nkvode模块传递上来的准备就绪信息后就开始进行余下的操作,如wal日志写入-->更新raft示例的内存状态-->通过广播的形式将自己收到的消息发送给集群其它peer-->更新自身状态机-->触发快照-->告知底层node我已处理完成,可以发送下一个消息.

提前感知的结构体

这里提前感知两个结构体kvstore,raftNode,分别在这里扮演的是kv存储系统和对底层Raft核心库的封装逻辑处理的角色。
kvstore

// a key-value store backed by raft
type kvstore struct {
	proposeC    chan<- string // channel for proposing updates
	mu          sync.RWMutex
	kvStore     map[string]string // current committed key-value pairs
	snapshotter *snap.Snapshotter
} // kvstore.go

proposeC: 应用与底层Raft核心库之间的通信channel,当用户向应用通过 http 发送更新请求时,应用会将此请求通过channel传递给底层的Raft库。
kvStore: kv 结构的内存存储,即对应应用的状态机。
snapshotter: 由应用管理的快照snapshot接口。

raftNode

// A key-value stream backed by raft
type raftNode struct {
	proposeC    <-chan string            // proposed messages (k,v)
	confChangeC <-chan raftpb.ConfChange // proposed cluster config changes
	commitC     chan<- *string           // entries committed to log (k,v)
	errorC      chan<- error             // errors from raft session

	id          int      // client ID for raft session
	peers       []string // raft peer URLs
	join        bool     // node is joining an existing cluster
	waldir      string   // path to WAL directory
	snapdir     string   // path to snapshot directory
	getSnapshot func() ([]byte, error)
	lastIndex   uint64 // index of log at start

	confState     raftpb.ConfState
	snapshotIndex uint64
	appliedIndex  uint64

	// raft backing for the commit/error channel
	node        raft.Node
	raftStorage *raft.MemoryStorage
	wal         *wal.WAL

	snapshotter      *snap.Snapshotter
	snapshotterReady chan *snap.Snapshotter // signals when snapshotter is ready

	snapCount uint64
	transport *rafthttp.Transport
	stopc     chan struct{} // signals proposal channel closed
	httpstopc chan struct{} // signals http server to shutdown
	httpdonec chan struct{} // signals http server shutdown complete
}

proposeC: 同kvStore.proposeC通道类似,事实上,kvStore会将用户的更新请求传递给raftNode以使得其最终能传递给底层的Raft协议库。
confChangeC: Raft协议通过此channel来传递集群配置变更的请求给应用。
commitC: 底层Raft协议通过此channel可以向应用传递准备提交或应用的channel,最终kvStore会反复从此通道中读取可以提交的日志entry,然后正式应用到状态机。
node: 即底层Raft协议组件,raftNode可以通过node提供的接口来与Raft组件进行交互。
raftStorage: Raft协议的状态存储组件,应用在更新kvStore状态机时,也会更新此组件,并且通过raft.Config传给Raft协议。
wal: 管理WAL日志,前文提过etcd将日志的相关逻辑交由应用来管理。
snapshotter: 管理 snapshot文件,快照文件也是由应用来管理。
transport: 应用通过此接口与集群中其它的节点(peer)通信,比如传输日志同步消息、快照同步消息等。网络传输也是由应用来处理。

以上便是我们处理的大致流程,下面我们从代码的角度,剥洋葱的方式先剥一层来看看。

Server Listen模块

对client的监听逻辑如下:

func main() {
	// ... 一些初始化操作
    
        // 对raftNode的初始化操作
	commitC, errorC, snapshotterReady := newRaftNode(*id, strings.Split(*cluster, ","), *join, getSnapshot, proposeC, confChangeC)

        // 对KVStore的初始化操作
	kvs = newKVStore(<-snapshotterReady, proposeC, commitC, errorC)

	// the key-value http handler will propose updates to raft
        // 启动http监听
	serveHttpKVAPI(kvs, *kvport, confChangeC, errorC)
}

serveHttpKVAPI:

// serveHttpKVAPI starts a key-value server with a GET/PUT API and listens.
func serveHttpKVAPI(kv *kvstore, port int, confChangeC chan<- raftpb.ConfChange, errorC <-chan error) {
	srv := http.Server{
		Addr: ":" + strconv.Itoa(port),
		Handler: &httpKVAPI{
			store:       kv,
			confChangeC: confChangeC,
		},
	}
	go func() {
		if err := srv.ListenAndServe(); err != nil {
			log.Fatal(err)
		}
	}()

	// exit when raft goes down
	if err, ok := <-errorC; ok {
		log.Fatal(err)
	}
} // httpapi.go

这里在入参的地方将状态机kv和配置变更confChangeC传递进了回调函数,以便能利用其中的通信管道等

ServeHTTP:

func (h *httpKVAPI) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	key := r.RequestURI
	defer r.Body.Close()
	switch {
	case r.Method == "PUT":
		v, err := ioutil.ReadAll(r.Body)
		if err != nil {
			log.Printf("Failed to read on PUT (%v)\n", err)
			http.Error(w, "Failed on PUT", http.StatusBadRequest)
			return
		}

		h.store.Propose(key, string(v))

		// Optimistic-- no waiting for ack from raft. Value is not yet
		// committed so a subsequent GET on the key may return old value
		w.WriteHeader(http.StatusNoContent)
	
    ...
    
	default:
		w.Header().Set("Allow", "PUT")
		w.Header().Add("Allow", "GET")
		w.Header().Add("Allow", "POST")
		w.Header().Add("Allow", "DELETE")
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
	}
}

有新的请求到达时会触发该接口,使用请求URI作为key;
读出其中body后,通过调用==h.store.Propose(key, string(v))==来讲value传递给管道时间proposeC,也就是最上面的步骤1

raftNode模块

这是整个项目的核心部分之一,负责了从node、httpserver、kvs等模块的各个监听事件,并把对应模块进行了串联操作,这里用到了raftNode,我们先来看对这个结构体的定义

func newRaftNode(id int, peers []string, join bool, getSnapshot func() ([]byte, error), proposeC <-chan string,
	confChangeC <-chan raftpb.ConfChange) (<-chan *string, <-chan error, <-chan *snap.Snapshotter) {

	commitC := make(chan *string)
	errorC := make(chan error)

	rc := &raftNode{
		proposeC:    proposeC,
		confChangeC: confChangeC,
		commitC:     commitC,
		errorC:      errorC,
		id:          id,
		peers:       peers,
		join:        join,
		waldir:      fmt.Sprintf("raftexample-%d", id),
		snapdir:     fmt.Sprintf("raftexample-%d-snap", id),
		getSnapshot: getSnapshot,
		snapCount:   defaultSnapshotCount,
		stopc:       make(chan struct{}),
		httpstopc:   make(chan struct{}),
		httpdonec:   make(chan struct{}),

		snapshotterReady: make(chan *snap.Snapshotter, 1),
		// rest of structure populated after WAL replay
	}
	go rc.startRaft()
	return commitC, errorC, rc.snapshotterReady
} // raft.go

首先进行的是初始化raftNode实例,其中包含了串联整个上下层的proposeC、confChangeC。然后将commitC、errorC、snapshotterReady返回出去,以便被其他模块流程使用。
下面是startRaft,在一些关键的步骤上加上了注释:

func (rc *raftNode) startRaft() {
	if !fileutil.Exist(rc.snapdir) {
		if err := os.Mkdir(rc.snapdir, 0750); err != nil {
			log.Fatalf("raftexample: cannot create dir for snapshot (%v)", err)
		}
	}
	rc.snapshotter = snap.New(zap.NewExample(), rc.snapdir)
	rc.snapshotterReady <- rc.snapshotter

	oldwal := wal.Exist(rc.waldir) // 判断是否已存在WAL日志(在节点当机重启时会执行) 
	rc.wal = rc.replayWAL() // 重放WAL日志以应用到raft实例中

	rpeers := make([]raft.Peer, len(rc.peers))
	for i := range rpeers {
		rpeers[i] = raft.Peer{ID: uint64(i + 1)}
	}
	c := &raft.Config{ // 初始化自身底层raft协议实例的配置结构
		ID:                        uint64(rc.id),
		ElectionTick:              10,
		HeartbeatTick:             1,
		Storage:                   rc.raftStorage,
		MaxSizePerMsg:             1024 * 1024,
		MaxInflightMsgs:           256,
		MaxUncommittedEntriesSize: 1 << 30,
	}
        // 启动底层raft 
	if oldwal {
		rc.node = raft.RestartNode(c)
	} else {
		startPeers := rpeers
		if rc.join {
			startPeers = nil
		}
		rc.node = raft.StartNode(c, startPeers)
	}

        // 初始化网络组件
	rc.transport = &rafthttp.Transport{
		Logger:      zap.NewExample(),
		ID:          types.ID(rc.id),
		ClusterID:   0x1000,
		Raft:        rc,
		ServerStats: stats.NewServerStats("", ""),
		LeaderStats: stats.NewLeaderStats(strconv.Itoa(rc.id)),
		ErrorC:      make(chan error),
	}

	rc.transport.Start()
	for i := range rc.peers {
		if i+1 != rc.id {
			rc.transport.AddPeer(types.ID(i+1), []string{rc.peers[i]})
		}
	}

	go rc.serveRaft() // 本节点与其他节点相互通信的http服务监听
	go rc.serveChannels() // 本节点与底层raft之间的通信模块
} // raft.go

StartNode,RestartNode功能类似,都是启动或重启底层的raft,该部分是底层raft的核心部分
serveRaft是用来和其他节点进行交互的部分,比如新加入一个节点
serveChannels本节点与底层raft之间的通信模块

© 2019·蜀ICP备18036663号-1 · 本页总阅读量 · 本站总访问量 · 本站总访客数