发布于 

基于 Redis 实现分布式锁 🔒 - Golang

基于 Redis 的分布式锁,确保计划任务只由一个实例执行

分布式锁

分布式锁是一种在分布式系统中使用的锁机制,用来确保多个节点在执行关键代码或访问共享资源时互斥,避免并发冲突。分布式锁的目的是保证在一个多节点的分布式环境中,某个共享资源(如数据库记录、缓存对象、任务队列等)在同一时刻只能由一个节点操作。

普通锁的 作用对象是线程或进程,而分布式锁的作用对象是 跨节点的实例

分布式锁使用案例

  • 领导者选举:在分布式系统中,经常需要选举一个领导节点来协调行动或管理资源。分布式锁可用于确保在任何给定时间内只有一个节点成为领导者。
  • 任务调度:在分布式任务调度器中,分布式锁可确保计划任务只由一个工作节点执行,防止重复执行。
  • 资源分配:在管理共享资源(如文件系统、网络套接字或硬件设备)时,分布式锁可确保每次只有一个进程能访问资源,从而防止冲突并确保妥善的资源管理。
  • 微服务协调:当多个微服务需要执行协调操作(如更新不同数据库中的相关数据)时,分布式锁可确保这些操作以受控和有序的方式执行。
  • 库存管理:在电子商务平台中,分布式锁可以管理库存更新,确保在多个用户同时尝试购买同一商品时,库存水平得到准确维护。
  • 会话管理:在分布式环境中处理用户会话时,分布式锁可确保用户会话一次只被一台服务器修改,防止出现不一致的情况。

分布式锁的实现

分布式锁可以使用各种工具和框架来实现,如:

  • ZooKeeper:主要用于提供分布式协调服务,包括配置管理、分布式锁、领导者选举等
  • Redis:广泛用于缓存、会话管理等场景,通过 SETNX 或 Redlock 算法支持分布式锁
  • Consul:用于服务发现、配置管理和服务网格的工具,除了服务网格功能,Consul 也提供分布式锁机制
  • Etcd:分布式键值存储系统,主要用于存储配置数据、提供分布式协调等功能

基于 Redis 实现

Lua 脚本

使用 Lua 脚本是为了确保在 Redis 中的操作 原子性,特别是在分布式锁场景中,Lua 脚本可以确保多个 Redis 命令在同一个执行周期内被原子地执行,避免并发问题(被其他客户端的操作打断)。

  • 原子性保证:Lua 脚本内部的多个操作会在一个事务中完成,保证不会有其他客户端插入的命令破坏执行顺序。
  • 操作灵活性:Lua 脚本中可以包含复杂的逻辑,如条件判断、循环、计算等,因此可以实现比单独使用 Redis 命令更复杂的操作。
  • 高效性:由于脚本中的命令是直接在 Redis 服务器上执行的,减少了客户端与 Redis 之间的通信开销。

Redis 的 pipeline 是一种在客户端一次性发送多条命令给 Redis,然后批量执行和返回结果的模式。它的主要目的是减少客户端与 Redis 之间的通信次数,提高执行效率。Pipeline 模式无原子性保证,每条命令依旧是独立执行的,其他客户端可以在 pipeline 执行过程中插入命令,因此如果涉及多个操作的状态依赖,就可能导致不一致性。

特性 Pipeline Lua 脚本
原子性 无法保证原子性,操作可能被其他命令打断 保证原子性,所有操作在同一事务中执行
执行方式 批量发送命令,但 Redis 逐个顺序执行 将多个命令封装在一个脚本中,原子执行
并发操作 支持高效批量并发操作 适合需要多个命令组合原子操作的场景
复杂逻辑 只能执行单个命令,无条件判断、循环等复杂逻辑 支持复杂的逻辑,如条件判断、循环等
使用场景 批量执行独立的、无依赖的操作 原子操作、多步骤逻辑的复杂操作
性能 减少了客户端与 Redis 的网络通信开销 减少了网络开销,同时避免了操作间的并发问题
常见应用 批量写入、批量获取、批量修改 分布式锁、事务性操作、多步条件逻辑

在 Lua 脚本中,KEYSARGV 是由外部传入的参数:

  • KEYS 用于传递键名,通常用于指定需要操作的 Redis 键
  • ARGV 用于传递附加参数,如锁的值和过期时间等

在客户端(例如 Go 代码)调用 Redis 的 EVAL 命令时,可以将键名和参数以列表形式传入,KEYSARGV 会相应地映射为脚本中的输入

SETNX

不直接使用 SETNX 指令,而是通过 Lua 脚本完成锁的获取与过期设置,因为 Lua 脚本保证多个操作的原子性,可以提供更复杂的控制逻辑,包括锁的可重入和延长过期时间,避免 SETNXEXPIRE 分开执行导致的死锁风险。使用 Lua 脚本的优势:

  • 原子性检查与更新:Lua 脚本可以在同一个执行流中完成检查锁的持有者和更新锁的操作,避免了分布式环境下的竞态问题
  • 可重入锁:代码中的 Lua 脚本实现了锁的可重入逻辑。如果当前客户端已经持有锁,那么它可以通过脚本延长锁的过期时间,这对于某些需要长时间运行并延长锁持有时间的场景是非常有用的

在 Redis 2.6.12 及以上版本,SET 命令可以通过带多个参数实现和 SETNX 类似的功能,同时设置过期时间

加锁

Lua 脚本解析

1
2
3
4
5
6
7
8
9
10
11
12
13
local val = redis.call('get', KEYS[1])
-- 获取当前锁的持有者
if val == false then
-- key 不存在,表示锁没有被持有,进行加锁
return redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[2])
elseif val == ARGV[1] then
-- 锁的持有者是当前客户端,刷新过期时间,支持可重入
redis.call('expire', KEYS[1], ARGV[2])
return "OK"
else
-- 锁被其他客户端持有,返回空字符串
return ""
end
  • **KEYS[1]**:传入的锁的键名,表示锁的唯一标识
  • **ARGV[1]**:表示锁的值,用于标识持有锁的客户端,通常是一个唯一的字符串
  • **ARGV[2]**:锁的过期时间,单位是秒,用于防止死锁的发生

Go 代码解析

ClientLock 结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
type Client struct {
client redis.Cmdable // Redis 客户端接口
g singleflight.Group // 用于防止缓存击穿的工具,可以让并发请求变成单个请求
valuer func() string // 用于生成唯一的值,确保锁的持有者唯一(通常是 UUID)
}
type Lock struct {
client redis.Cmdable // 持有的 Redis 客户端,用于操作锁
key string // 锁的键,用于标识锁
value string // 锁的值,用于标识持有者,避免误释放其他客户端的锁
expiration time.Duration // 锁的过期时间
unlock chan struct{} // 用于信号化锁的释放操作,退出自动续约逻辑
signalUnlockOnce sync.Once // 确保解锁操作只执行一次,防止重复释放
}

Lock 方法实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
type RetryStrategy interface {
// Next 返回下一次重试的间隔,如果不需要继续重试,那么第二参数发挥 false
Next() (time.Duration, bool)
}

type FixIntervalRetry struct {
// 重试间隔
Interval time.Duration
// 最大次数
Max int
cnt int
}

func (f *FixIntervalRetry) Next() (time.Duration, bool) {
f.cnt++
return f.Interval, f.cnt <= f.Max
}

func (c *Client) Lock(ctx context.Context, key string, expiration time.Duration, retry RetryStrategy, timeout time.Duration) (*Lock, error) {
val := c.valuer() // 生成唯一的锁值,标识当前客户端
var timer *time.Timer
defer func() {
if timer != nil {
timer.Stop() // 在退出之前停止定时器,避免资源泄露
}
}()
for {
lctx, cancel := context.WithTimeout(ctx, timeout) // 为每次获取锁尝试设置超时
res, err := c.client.Eval(lctx, luaLock, []string{key}, val, expiration.Seconds()).Result()
cancel()
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
// 非超时错误,比如 Redis server 崩溃、网络异常等,不再继续尝试
return nil, err
}
if res == "OK" {
// 锁获取成功,返回锁对象
return newLock(c.client, key, val, expiration), nil
}
interval, ok := retry.Next()
if !ok {
// 如果重试次数耗尽,返回错误
if err != nil {
err = fmt.Errorf("最后一次重试错误: %w", err)
} else {
err = fmt.Errorf("锁被人持有: %w", ErrFailedToPreemptLock)
}
return nil, fmt.Errorf("rlock: 重试机会耗尽,%w", err)
}
// 设置重试的等待时间
if timer == nil {
timer = time.NewTimer(interval)
} else {
timer.Reset(interval)
}
// 等待重试或者上下文结束
select {
case <-timer.C: // 定时器到达,表示重试间隔已过,可以再次尝试获取锁
case <-ctx.Done(): // 上下文 ctx 被取消
return nil, ctx.Err()
}
}
}

解锁

Lua 脚本解析

1
2
3
4
5
6
7
if redis.call("get", KEYS[1]) == ARGV[1]
then
-- 如果锁的值与传入的值一致,表示当前客户端持有该锁
return redis.call("del", KEYS[1]) -- 删除键,释放锁
else
return 0 -- 锁未被当前客户端持有,返回 0
end

Go 代码解析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (l *Lock) Unlock(ctx context.Context) error {
// 使用 Lua 脚本进行解锁操作,保证原子性
res, err := l.client.Eval(ctx, luaUnlock, []string{l.key}, l.value).Int64()

defer func() {
// 确保只进行一次解锁信号操作,防止重复解锁引起 panic
l.signalUnlockOnce.Do(func() {
l.unlock <- struct{}{} // 向 unlock 通道发送信号
close(l.unlock) // 关闭 unlock 通道
})
}()

if err == redis.Nil {
// 锁不存在或者锁的值与预期不符,表示当前锁不属于调用者
return ErrLockNotHold
}
if err != nil {
// Redis 操作出错,返回错误
return err
}
if res != 1 {
// Lua 脚本执行结果不是 1,表示锁未被成功释放
return ErrLockNotHold
}
return nil // 成功释放锁
}

续约

约操作对于长时间运行的任务非常重要,以确保在任务未完成时锁不会意外过期导致其他客户端获得锁,从而引起竞态条件。

1
2
3
4
5
6
7
if redis.call("get", KEYS[1]) == ARGV[1]
then
-- 如果锁的值与传入的值一致,表示当前客户端持有该锁
return redis.call("expire", KEYS[1], ARGV[2]) -- 更新锁的过期时间
else
return 0 -- 锁未被当前客户端持有,返回 0 表示续约失败
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func (l *Lock) Refresh(ctx context.Context) error {
// 使用 Lua 脚本执行锁续约操作,保证续约的原子性
res, err := l.client.Eval(ctx, luaRefresh,
[]string{l.key}, l.value, l.expiration.Seconds()).Int64()

if err != nil {
// Redis 操作出错,返回错误
return err
}
if res != 1 {
// Lua 脚本执行结果不是 1,表示锁续约失败,可能是锁不存在或者锁的值不匹配
return ErrLockNotHold
}
return nil // 成功续约锁
}

自动续约机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
func (l *Lock) AutoRefresh(interval time.Duration, timeout time.Duration) error {
// 创建一个定时器,每隔 interval 时间触发一次
ticker := time.NewTicker(interval)
// 刷新超时 channel,用于通知在发生超时时重试
ch := make(chan struct{}, 1)
defer func() {
// 确保在函数退出时停止定时器并关闭通道
ticker.Stop()
close(ch)
}()

for {
select {
case <-ticker.C:
// 定时器触发时创建一个带有超时时间的上下文,用于刷新锁
ctx, cancel := context.WithTimeout(context.Background(), timeout)
err := l.Refresh(ctx)
cancel()

// 如果刷新操作超时,则继续尝试
if err == context.DeadlineExceeded {
// 因为有两个地方可能要写入数据到 ch,而 ch 容量只有 1
// 如果写入不成功,则说明前一次调用超时且尚未被处理
select {
case ch <- struct{}{}:
default:
}
continue
}

// 如果刷新过程中出现了其他错误,则返回错误
if err != nil {
return err
}

case <-ch:
// 超时处理,重新尝试刷新锁
ctx, cancel := context.WithTimeout(context.Background(), timeout)
err := l.Refresh(ctx)
cancel()

// 如果仍然超时,则继续重试
if err == context.DeadlineExceeded {
select {
case ch <- struct{}{}:
default:
}
continue
}

// 处理其他错误
if err != nil {
return err
}

case <-l.unlock:
// 当检测到 unlock 信号时,退出续约逻辑,说明锁已经被释放
return nil
}
}
}

Redlock

Redlock 是一种 Redis 分布式锁实现方案,旨在提供比单实例 Redis 锁更高的 容错性和可靠性。Redlock 是由 Redis 的创造者提出的一种分布式锁算法,它通过在多个 Redis 节点(通常为 5 个)上进行锁的操作来实现分布式锁的可靠性。Redlock 的核心思想是将锁保存在多个独立的 Redis 实例中,以实现容错性:

  • 客户端需要在多数(即超过一半) Redis 实例上成功获取锁,才能认为锁获取成功。
  • 通过多实例的方案来避免单点故障的问题,如果部分 Redis 实例不可用,只要多数节点仍可用,系统依然可以正常工作。

在单实例实现中,通过 SET key value NX PX ttl 或通过 GET + DEL 来实现分布式锁的获取和释放,同时还使用了自动续约机制,保证锁在任务长时间运行期间不会过期。

  • 锁的获取是一个单实例上的简单原子操作
  • 锁的释放和续约使用了 Lua 脚本,确保在 Redis 服务器端以原子操作方式执行
  • 自动续约机制 用于延长锁的过期时间,确保在任务执行较长时间时不会因为锁的自然过期导致其他客户端意外获取到锁

Redlock 在设计时使用了一组步骤来确保锁的获取过程既高效又一致:

  1. 客户端按照顺序对每个 Redis 实例尝试加锁(通过 SET NX PX 操作),并设置一个较短的超时时间,确保获取锁操作不会阻塞
  2. 客户端必须在所有节点上尝试加锁的时间内成功获取到大多数节点的锁,才能认为获取锁成功
  3. 锁的超时时间应该比客户端操作的预计完成时间更长,以确保操作能够在锁过期之前完成
  4. 在所有操作完成后,客户端会尝试对所有节点释放锁

定时任务

本地

time

  • **time.Tick**:用于创建一个周期性的定时器,返回一个在固定时间间隔发送信号的通道。适用于需要周期性执行的任务,但需注意内存泄漏风险,建议使用 time.NewTicker 以确保资源的可管理性
  • **time.After**:用于创建一个延迟执行的定时器,在经过指定的时间后向通道发送信号,适用于一次性延迟或超时控制

cron

robfig/cron 基于时间调度器、时间解析器、Ticker 和 Goroutine 实现了类似 Unix cron 的功能。通过 Cron 表达式灵活地定义任务的执行时间,并使用 Go 的并发特性(Goroutine)来确保任务可以高效并发执行,广泛用于 Go 程序中需要定时任务调度的场景。

使用 在线工具 可以帮助验证 Cron 表达式是否符合预期

分布式

本地定时任务加上分布式锁可以实现分布式定时任务的效果。通过使用分布式锁机制,能够有效避免多个节点同时执行同一任务的问题,从而实现协调多个节点的执行,确保任务在集群环境下只执行一次。