Distributed systems use etcd as a consistent key-value store for configuration management, service discovery, and coordinating distributed work. Many organizations build production systems on it, such as container schedulers, service-discovery layers, and distributed data stores. Common distributed patterns built on etcd include leader election, distributed locks, and monitoring machine liveness.
	// Output:
	// acquired lock for s1
	// cannot acquire lock for s2, as already locked in another session
	// released lock for s1
	// acquired lock for s2
}
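The output above is the tail of a two-session example. For context, here is a condensed sketch of the kind of program that produces it, modeled on the ExampleMutex_TryLock example in etcd's clientv3/concurrency package (the endpoint address and error handling are illustrative):

package main

import (
	"context"
	"fmt"
	"log"

	v3 "go.etcd.io/etcd/client/v3"
	"go.etcd.io/etcd/client/v3/concurrency"
)

func main() {
	cli, err := v3.New(v3.Config{Endpoints: []string{"localhost:2379"}}) // illustrative endpoint
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Two independent sessions (leases) competing for the same lock prefix.
	s1, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s1.Close()
	s2, err := concurrency.NewSession(cli)
	if err != nil {
		log.Fatal(err)
	}
	defer s2.Close()

	m1 := concurrency.NewMutex(s1, "/my-lock/")
	m2 := concurrency.NewMutex(s2, "/my-lock/")

	if err := m1.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s1")

	// TryLock returns immediately instead of blocking.
	if err := m2.TryLock(context.TODO()); err == concurrency.ErrLocked {
		fmt.Println("cannot acquire lock for s2, as already locked in another session")
	} else if err != nil {
		log.Fatal(err)
	}

	if err := m1.Unlock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("released lock for s1")

	if err := m2.Lock(context.TODO()); err != nil {
		log.Fatal(err)
	}
	fmt.Println("acquired lock for s2")
}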
// NewSession gets the leased session for a client.
// (creates a session backed by a lease)
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error)
// NewMutex creates the Mutex object that implements the lock.
// Mutex implements the sync Locker interface with etcd
func NewMutex(s *Session, pfx string) *Mutex
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
//
// If the lock is held by another session, Lock blocks until it can acquire the lock.
func (m *Mutex) Lock(ctx context.Context) error
// TryLock locks the mutex if not already locked by another session.
// If lock is held by another session, return immediately after attempting necessary cleanup
// The ctx argument is used for the sending/receiving Txn RPC.
//
// If the lock is held by another session, TryLock does not block; it returns immediately.
func (m *Mutex) TryLock(ctx context.Context) error
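The caller distinguishes the two failure modes of TryLock through the returned error: a lock held elsewhere surfaces as concurrency.ErrLocked. A minimal sketch of the intended usage, assuming ctx and m are already in scope:

	// Non-blocking acquisition: ErrLocked means another session holds the lock.
	switch err := m.TryLock(ctx); err {
	case nil:
		// lock acquired; remember to call m.Unlock(ctx) when done
	case concurrency.ErrLocked:
		// held by another session; back off or do other work
	default:
		// RPC, context, or session error
	}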
// session options
type sessionOptions struct {
	ttl     int        // time to live
	leaseID v3.LeaseID // lease ID
	ctx     context.Context
}
// Session represents a lease kept alive for the lifetime of a client.
// Fault-tolerant applications may use sessions to reason about liveness.
type Session struct {
	client *v3.Client      // etcd client object
	opts   *sessionOptions // session options
	id     v3.LeaseID      // lease ID

	cancel context.CancelFunc // stops the keep-alive loop
	donec  <-chan struct{}    // closed when the keep-alive loop exits
}
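The sessionOptions fields map to the package's exported option constructors. A brief sketch of creating a session with an explicit TTL and observing its expiry (cli is assumed to be a connected *v3.Client; the 10-second TTL is illustrative):

	// WithTTL sets the lease TTL in seconds (the package default is 60).
	s, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		log.Fatal(err)
	}
	defer s.Close() // revokes the lease, releasing any locks bound to it

	select {
	case <-s.Done():
		// lease keep-alive lost: any lock held via this session is gone
	default:
		// session still alive
	}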
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
	// try to acquire the lock (the core step)
	resp, err := m.tryAcquire(ctx)
	if err != nil {
		return err
	}
	// if no key on prefix / the minimum rev is key, already hold the lock
	// ownerKey holds the kvs of the session that owns the lock under pfx,
	// i.e. who currently holds this lock
	ownerKey := resp.Responses[1].GetResponseRange().Kvs
	// ① if nobody holds the lock yet, len(ownerKey) == 0
	// ② if the holder is ourselves, ownerKey[0].CreateRevision == m.myRev
	//
	// If either condition holds, we have acquired the lock.
	if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
		m.hdr = resp.Header
		// lock acquired, return immediately
		return nil
	}
	client := m.s.Client()
	// wait for deletion revisions prior to myKey
	// TODO: early termination if the session key is deleted before other session
	// keys with smaller revisions.
	_, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
	// release lock key if wait failed
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
	// make sure the session is not expired, and the owner key still exists.
	// double-check that the session has not expired
	gresp, werr := client.Get(ctx, m.myKey)
	if werr != nil {
		m.Unlock(client.Ctx())
		return werr
	}
	if len(gresp.Kvs) == 0 { // is the session key lost?
		return ErrSessionExpired
	}
	m.hdr = gresp.Header

	return nil
}
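Putting the pieces together: each contender writes a key under the shared prefix, and ownership is decided purely by CreateRevision order. A hypothetical key layout while two sessions contend (the prefix, lease IDs, and revision numbers below are made up for illustration):

	// keys under pfx "/my-lock/" while two sessions contend:
	//
	//   /my-lock/694d77aa1234abcd   CreateRevision=101  <- holder (smallest revision)
	//   /my-lock/694d77aa5678ef01   CreateRevision=105  <- waiter; blocks until the
	//                                  revision-101 key is deleted (unlock or lease expiry)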
	// put self in lock waiters via myKey; oldest waiter holds lock
	// (important) here the lease ID is bound to m.myKey
	put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
	// reuse key in case this session already holds the lock
	get := v3.OpGet(m.myKey)
	// fetch current holder to complete uncontended path with only one RPC
	// note that this queries m.pfx, the key prefix, not the full key
	// v3.WithFirstCreate() fetches the earliest-created key, i.e. the one with the smallest Revision
	getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
	// cmp is the condition to test
	// put creates the key
	// get reads the key back
	// getOwner reads the key with the smallest Revision under the prefix m.pfx
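These pieces come together in a single transaction. The following is a sketch of how tryAcquire combines them, based on the etcd clientv3/concurrency source (exact code may vary across versions):

	// cmp: "our key has never been created", i.e. its CreateRevision is 0
	cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)

	// If the key does not exist yet, create it (put); otherwise read it back (get).
	// Either way, also fetch the current owner (getOwner) in the same round trip.
	resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return nil, err
	}
	m.myRev = resp.Header.Revision
	if !resp.Succeeded {
		// the key already existed: recover our revision from the Else branch's get
		m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}
	return resp, nil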