最近项目中需要对锁的粒度进行优化,使用分布式锁的场景是:一个员工每天会进行作业,每次作业都会上报一条作业数据,其中有个 biz_time 字段表示作业实际发生的时间(系统会合并统计它多个作业任务的开始时间和结束时间,即最后数据表中会记录这个人当天的第一个任务的开始时间为 start_time ,最后一个任务的结束时间为 end_time ),但是上报作业数据是会发生乱序的,每次更新时间范围的左右区间不能发生并发覆盖更新。 当然系统需求会更加复杂,例如他的不同作业类型切换,需要终止当天的作业时长统计,并开启一段新作业的统计等等,先不用考虑。
现在是用乐观锁通过 version 控制的,由于并发量比较大,希望改造成分布式锁来实现单行数据的并发数据竞争。 预期能实现成[user_id + [start_time, end_time]]粒度,也就是某个人的作业时间在某个时间范围被锁住(类似数据库的间隙锁,锁定一个范围)。 但是没有思路怎么实现,在此请教一下各位大佬!
![]() |
1
namenone 3 天前
key 精细化到秒、分钟、小时,按照你的业务需求来定。
比如分钟 用户 A 10:01 ~ 用户 A 10:03 那就是 3 个锁的 key ,如果是秒那 key 的数量会爆炸; 优化就是布隆过滤器,时间 hash 过后存入布隆过滤器,但是可能存在,不一定存在。 |
2
ShawyerPeng OP 感谢思路,想问下这种分布式锁间隙锁的场景 有成熟的方案吗?
考虑到管理 key range 范围的秒级分片粒度锁的原子性获取和释放,感觉实现的复杂度还是挺大的。 另外,用布隆过滤器优化,感觉这几个地方还需要考虑: 1. 布隆过滤器的判断和加锁也是非原子性的 2. 布隆过滤器误判的场景:返回存在,但实际不存在锁。会导致误以为该间隙被锁。如果再去查 Redis ,这个操作在高并发场景也不能保证原子性吧? |
3
ShawyerPeng OP @namenone 感谢思路,想问下这种分布式锁间隙锁的场景 有成熟的方案吗?
考虑到管理 key range 范围的秒级分片粒度锁的原子性获取和释放,感觉实现的复杂度还是挺大的。 另外,用布隆过滤器优化,感觉这几个地方还需要考虑: 1. 布隆过滤器的判断和加锁也是非原子性的 2. 布隆过滤器误判的场景:返回存在,但实际不存在锁。会导致误以为该间隙被锁。如果再去查 Redis ,这个操作在高并发场景也不能保证原子性吧? |
4
MidGap 3 天前
"每次更新时间范围的左右区间不能发生并发覆盖更新" 这个为啥要加锁呢 不是取 min(start)和 max(end)吗 0.o
|
5
ShawyerPeng OP @MidGap 两个线程同时读到当前数据库的那一行 startTime = 100, end_time = 200
线程 A 上报的作业时间为 201 ,这时候想更新右区间(在线程 B 更新后才更新):update end_time = 201 此时线程 B 上报的作业时间为 202 ,B 先成功更新数据库右区间:update end_time = 202 。 线程 A 此时才去更新数据库 update end_time = 201 ,这时候把值 202 覆盖了,出现问题。 |
6
MidGap 3 天前
@ShawyerPeng 丢队列得了 哈哈
|
![]() |
7
zpfhbyx 3 天前
这个汇总不应该是详细日志 来反向出来的么, 单条记录入库,然后 max min
|
![]() |
9
crysislinux 3 天前 via Android
没确切理解你的需求,是需要锁住一个滑动窗口么,是的话可以参考 rate limit 的做法
|
10
ShawyerPeng OP @zpfhbyx 我们需要保证是近实时的分析,由于逻辑复杂(比如需要实现超时机制,一个人中间一直没有作业的休息间隔超过 20 分钟则需要终止这段作业时长统计;还需要有不同作业类型切换的顶替机制;还有时长跨天拆分机制,把原来的一条数据库记录一拆为二,保证单据绑定正确日期的工时)很难放到离线 HSQL 中实现正确的统计逻辑。
|
11
ShawyerPeng OP @crysislinux 感觉好像要解决的问题不太一样?我是想在高并发场景解决:来了一个值 c ,如果它在[a, b]窗口范围内,没拿到锁,则不允许更新滑动窗口的左区间或者右区间(如果 c < a 则更新为[c, b],如果 c > b 则更新为[a, c])。
因为我要防止并发更新,所以要锁住这个窗口范围。 |
12
emmmbu 3 天前
锁数据库那一行 id 不就得了,只允许一个线程拿到锁,然后查询并且更新 end_time
|
![]() |
13
YangQingLin 3 天前
我在想,是否可以利用数据库自身的锁来解决。把时间比大小推到数据库更新的时候计算,比如下面这样:
```sql UPDATE work_sessions SET start_time = LEAST(start_time, :new_biz_time), -- LEAST 是 SQL 函数,取最小值 end_time = GREATEST(end_time, :new_biz_time) -- GREATEST 是 SQL 函数,取最大值 WHERE user_id = :user_id AND date = :date; ``` 利用数据库本身的原子性来更新时间范围,是不是也能达到想要的结果? 对于休息间隔重新计算,我觉得可以增加一个缓存层,每次来新的 biz_time 先和缓存中的内容进行判断,超过 20 分钟或者跨天的话就使用 insert 新开一条记录。 如果上面这种方式也不合适的话,我觉得也可以从时间排序方面入手,你希望保证的是“近实时”的分析,那我这里假设分析结果可以延迟 1 分钟更新,那么使用一个以 biz_time 作为 key 的小顶堆的数据结构接收那些并发的请求,然后只有当堆顶的 biz_time 与当前相差超过 1 分钟之后才会取数据更新入库,这样可以把一分钟之内的数据乱序问题抹除。然后同样将时间戳大小对比丢给数据库更新的时候计算,不知道这样是否能满足你的需求。 |
14
ShawyerPeng OP @emmmbu select for update 在高并发场景下性能很差吧
|
15
emmmbu 3 天前
@ShawyerPeng #14 如果高并发锁竞争很激烈,那我觉得最好用队列,跟你说的这个范围锁还是怎么锁都无法解决
|
16
z1829909 3 天前 via Android
感觉方案上有些问题,加锁更复杂而且 io 变多。
不如直接所有改动都扔队列,然后消费的地方攒一批数据做合并,然后写入更新,这样 io 也会降低。 |
![]() |
17
fkdtz 3 天前
虽然不太能理解需求场景,但就上述提出的两个问题来说,可以限制在已有 [a, b] 区间的情况下,更新左区间时价格限制条件 where c < a ,右区间同理加个 where b < c ,也就是左右区间只能单调向左和向右移动。
但总觉得哪里有点怪,一个人每天开始结束作业会非常频繁吗?一秒钟开始结束几百次作业? 即使是极限情况下,一天也就 8 万多秒,考虑类似卖车票的逻辑,每一秒就是一个资源,锁一个区间其实是锁住这个区间内所有秒数,使用 bitmap 类型对 86400 个位做管理,在 lua 里完成这个批量操作,一次 IO 就能解决。 但这个场景是否真的需要用锁呢?锁从本质上来说就是将并行改为了串行,所以一切能用锁解决的,用队列一样可以解决同时更简洁复杂度更低,只不过区别在于给调用方的响应是同步的还是异步的。 |
18
dddd1919 3 天前 ![]() 所以你需要的是根据需要加锁的场景,为一组互斥条件设计一个一致性算法,使得这组条件的计算结果落在同一 key 上,然后为这个 key 加锁
|
![]() |
19
zdking08135 3 天前
大致能细化需求,用户会上报[user_id, start_time, end_time]一个三元组; start_time 和 end_time 可能互相会重叠。怎么保证对指定 user_id 的任务时间统计做近似实时查询。
问题 1:为什么用户的上报会有并发,同一个 user_id 会同时从多个来源上报? 问题 2:一个用户可以并发执行任务吗?即,会不会出现 B 任务的 start_time 在 A 任务的[s, e]区间内? 问题 3:任务的平均执行时长大约有多少?单用户一天上报平均多少次? |
20
ShawyerPeng OP @fkdtz 有一种比较恶心的场景就是,业务的单据模型是有主单和明细维度,作业数据的上报时机是主单状态为已完成(即所有的明细都为已完成时,当然,每个明细可能有各自的操作人和完成时间),所以要等到最后再一起上报。
业务方由于某些实现原因,无法一次请求批量上报,而是拆成每个明细进行上报。所以对本系统的消费者来说,有高并发更新的场景(一个主单可能有几百个明细)。 我也想到了 Batch Consume 攒一批消息批量消费,确实一定程度上解决了这种情况下的并发问题。 不过我在想有没有多种方式配合起来实现会更加完善。例如直接分布式锁/消息队列串行化。 您说的串行,在 RocketMQ 里的实现,可以使用有序消息实现。 对于 Producer 来说,自定义负载均衡策略,根据操作人 operator 字段做 partition key ,路由到固定的一个 Message Queue ; 对于 Consumer 实例来说,通过 MessageListenerOrderly 顺序消费的实现(包括:拉取消息时消费实例对 MessageQueue 加锁、消费消息时线程池中的线程对 MessageQueue 也加锁、对 ProccessQueue 也加锁保证 rebalance 了也要等到提交 offset 才能让新的消费者消费)。 但是有序消费消费失败会原地重试阻塞其他消息消费的特性,和我们的场景是有冲突了。 我们的目标只是为了尽量避免并发冲突,而不需要如此严格的有序性。串行化消费有更好的办法吗? |
21
ShawyerPeng OP @zdking08135 问题 1 ,我在楼上回复了~是业务方因为某些原因,举个例子把 1 点到 2 点之间的所有操作攒到一起最后都为完成状态时,才一起上报过来(还不是批量上报,而是拆分成明细行进行上报,消费方无法在一个请求中在内存中处理 N 个时间窗口的更新逻辑,当然了,MQ 可以攒一批消息进行批量消费)
问题 2:业务的异常 case ,代码逻辑会做异常处理,本问题可以忽略不考虑 问题 3 ,不同的作业类型不一样,可能 1 秒一个,也可能十几分钟才做完一个任务。 |
22
ShawyerPeng OP @z1829909 有道理,不过消息队列 by partition 串行化,可以不用这么重的有序消费功能实现吗?
|
![]() |
23
geebos PRO 你这个场景每人每天只会有一条 start_time, end_time 记录吧
|
![]() |
24
YangQingLin 2 天前
@ShawyerPeng by partition 又不需要在消息队列里面做,消息队列可以暂存数据,攒到一定量之后一起取出来,排序、分类、合并,这不是挺快的嘛,复杂度顶天了 O(N*log(N))
|
25
spritecn 2 天前
延迟队列..延个 5 分钟够用了
|
![]() |
26
LiaoMatt 2 天前
我的理解, 一天的时间是固定的, 粒度使用分钟, 那么需要 24 * 60 = 1440 分钟, 转换成 byte 除以 8 就是 180B, 每次更新把时间区间覆盖的 bit 置为 1, 等到所有数据更新完成后,在从左读最小值, 从右读最大值, 从而得到工作时长; 这样的好处是每次操作都是幂等, 不需要关心版本竞争问题
|
![]() |
27
sujin190 2 天前 via Android
看了半天似乎你这个不就是某个人开始结束时间更新不应该因为时序和并发问题覆盖回跳吧,不用那么纠结按人加锁就是了吧,我看你补充说可能操作的时候不上报结束统一上报,但也就是几百个而已
加锁要分析的是冲突率、单操作耗时和等待时长,然后以此计算线程连接各种资源是否能满足需要的并发,不是加锁就不能高并发吧 就你这也就计算更新个时间耗时这么低,按人加锁也没多大影响吧 |
![]() |
28
sujin190 2 天前 via Android
顺便说如果你用的是大家用挺多的 redis 分布式锁,因为 redis 协议原因,冲突率较高情况下性能会有较大下降,单个加锁延时在冲突率高情况下提高不少,也是需要考虑的
一般来说各种介绍分布式锁使用的文章说到的场景实际冲突率都低于百分之一吧,可以统计或者预估下冲突率在啥价格 如果冲突率实在高又并发不均衡就是有较大峰值,那还是好好的用 kafka 来搞吧 |
![]() |
29
litchinn 2 天前
不知道我看懂需求没,你是有一个消息流,每次要更新一个人每天的 start_time 和 end_time ,并保证在消息无序的情况下不会发送时间回跳对吗
用 ringbuffer 转到单线程操作,如果服务是多实例,那再考虑分布式锁 |
30
unused 2 天前
@ShawyerPeng #11 这个 a, b 哪来的,和 c 什么关联
|
![]() |
31
shigella 2 天前
看了半天。
因为不希望用户间存在相互阻塞的情况,所以 Kafka 按分区串行是不行的了。 因为需要处理跨天的情况,所以单纯的锁 user 粒度是不够的,所以才提出需要 user+时间范围。 最后我感觉还是#26 那种按天将最小时间粒度 bit 将提交的区间存起来,然后取结果的时候取头尾的办法最合适。 |
32
cnhongwei 2 天前
把锁分两个锁,实际的区间数据,可以放到 redis 中单个 value 中,或放到一个 set 中,因为没有一个原子的操作来一次完成更新与比较,所以另外放一个锁,先获取这个锁,对新的时间范围和现有的时间范围(初始为空)做比较,如果有重复,获取锁失败,释放第一个锁;如果比较没有重复,将新的时候范围和现有的时间合并,获取锁成功,释放第一个锁,并做下一步处理。
|
33
z1829909 2 天前 via Android
@ShawyerPeng 这里的重是指引入了消息队列,中间件变多,还是消息队列配套的消费代码重
|
![]() |
34
cloudzhou 2 天前
把思路转变一下,引入一个中心调度,分发 [user_id + [start_time, end_time]] job
收到的都可以处理,把锁竞争放在分发这一步 处理完成发送中心调度标记完成/失败,继续分发 对于业务开发,只要收到消息就可以处理,然后发送成功失败回调,不考虑锁 |
35
WithoutSugarMiao 2 天前
直接用逻辑来判断可以吗?比如 设置 redis 的 key 为 userid + 10:20-10:30 ,含义是 锁住该 user 今天的十点二十到十点三十这段?
|
![]() |
36
liuhan907 2 天前
你的分布式锁本身打算用什么实现?如果用 redis 这类自带脚本的东西那简单做一个按开始秒数作为 key 的有序集合,每次上锁用 lua 检查集合内是否有交集区间,有的话就失败让客户端等待,没有就写进去然后上锁成功。超时失败重试之类的都可以参考标准 redis 锁。感觉一小时就能搞完
|