请教一个关于并发控制的问题

101 天前
 tuoov

现在有这样一个函数 processBatch ,负责读取数据,执行一些操作后再更新它们,相关的数据库操作都在事务内执行。伪代码如下:

function processBatch():
    tx = db.beginTransaction()
    // 1. 批量读取:取出最多 N 条“待处理”数据
    items = tx.query("SELECT * FROM tasks WHERE status = 'PENDING' LIMIT N")
    
    for item in items:
        // 2. 业务处理
        doBusinessLogic(item)
        // 3. 更新状态
        tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
    tx.commit()
// 线程 A
spawn threadA:
    processBatch()

// 线程 B (几乎同时执行)
spawn threadB:
    processBatch()

但由于 processBatch 在多个地方都会被调用,因此存在并发问题。线程 A 和线程 B 执行时可能查询到同一批数据,导致这批数据被处理两次。解决这个问题有两个方案:

我的问题是:

  1. 哪个方案更符合最佳实践?原因是什么
  2. 在保持 processBatch 会被多个地方调用不变的前提下,有没有更好的方案?
  3. 如果想学习这类并发相关的问题和解决方案,应该搜索什么关键词

感谢各位赐教

3697 次点击
所在节点    数据库
34 条回复
wxyz
101 天前
感觉是外部事务的颗粒度太大了,
一次查询多条数据,但没有立即更新该批次数据的状态,肯定会导致查询到重复的数据的;
建议增加一层内存或缓存级别的互斥锁,锁任务的 id 以及一个任务一个事务,这样可以保证一个任务每次只有一个线程处理。
wyntalgeer
101 天前
噗……不会并发执行的多线程?
luckyrayyy
101 天前
互联网公司的习惯一般并发控制都放在业务逻辑上,不太依赖数据库,就是用方案 A ,当然你数据库还是需要设置合理的隔离级别。在你的业务场景里,遇到并发,没抢到锁的线程是等待,还是直接返回报错不处理了,如果是前者的话,可以把所有的处理逻辑放到一个有序队列里,依次执行。
kanepan19
101 天前
简单的很

for item in items:
//乐观锁
boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)
if(!flag){
// 抛异常
}
// 2. 业务处理
doBusinessLogic(item)
// 3. 更新状态
tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)
tx.commit()
kanepan19
101 天前
接上面的 加一个状态,处理中
lvlongxiang199
101 天前
` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ?", item.id)` -> ` tx.execute("UPDATE tasks SET status = 'DONE' WHERE id = ? and status = 'PENDING' ", item.id)`

`processBatch` 也可以想办法做成串行的
kanepan19
101 天前
boolean flag = tx.execute("UPDATE tasks SET status = 'processing' WHERE id = ? and status = 'PENDING' ", item.id)

多线程 只有一个成功。
是否需要抛异常,看业务决定
Georgedoe
101 天前
3. 哥们不会还没用过 deepseek 和 chatgpt 吧
siweipancc
101 天前
不要在数据库玩这个……
hwdq0012
101 天前
1.读写分离,写放队列执行
2.读到的数据可能会重复,处理掉,用 etcd ,zookeeper, rides 之类的
3.没写过后端,轻喷
rekulas
101 天前
加锁不够优雅,不如消息队列,不过数据少用不上队列,但你可以用生产者消费者的思维来改写,保证只有一个生产者就行了
vikaptain
101 天前
悲观锁、乐观锁、队列
unused
101 天前
除了上面提到的,还可以考虑用某种查询条件对数据分区,让不同线程查询到不同的数据
yinmin
101 天前
方案 A 和方案 B 都有问题。

如果 doBusinessLogic 是 IO 密集型,推荐使用 ThreadPoolExecutor 。操作步骤如下:

1. 设置 ThreadPoolExecutor 并发 workers ,例如 workers=5 也就是有 5 个并发同时处理;
2. 函数 processBatch:将 pending 的任务 ID submit 到 ThreadPoolExecutor 队列里;
3. ThreadPoolExecutor workers 处理:先 update tasks set status='RUNNING' where id=? and status='PENDING',如果返回更新记录数=0 ,就直接 return 不处理; (这是一个防冲突的技巧),再 doBusinessLogic(id),然后 update tasks set status='DONE' where id=?

好处是:突发高并发时,任务是加入到队列的,不会挤爆服务器;可以设置并发 workers 同时处理。
Ayanokouji
100 天前
这不是并发的问题,是设计的问题。
简单点,给 processBatch 加一个 start_time 和 end_time 参数,保证查到不一样的数据。
mooyo
100 天前
两阶段乐观锁吧,

第一阶段先拿一部分,从 Waiting 改成 Running ,拿到了再去执行。
sagaxu
100 天前
SELECT 取出 id 列表,遍历的时候按照随机顺序(如果业务逻辑允许),用 SELECT FOR UPDATE 锁住每一行,检测状态,把 PENDING 更新为 PROCESSING ,处理完成后再更新为 DONE 。这里要有一个机制,把停留在 PROCESSING 超时的任务重新放回 PENDING 或者标记为 DONE 。

如果类似任务比较多的话,可以引入一个任务调度系统,别自己搞了,要填的坑和细节非常多。
encounter2017
100 天前
这完全没必要用锁,是设计的问题,流程调整下就好了。
我理解你这块是离线的业务对吧。

首先 select id from tasks where status = 'PENDING' 拿出全量需要处理的数据,做成一个离线文件或者放内存里(看你自己的数据规模决定)
接着实现一个缓冲,简单点可以在内存里面构造一个比如长度为 16 的队列,存放下一批需要处理的数据

然后是设置并发度,比如说 4 ,这一块你用线程/纤程/进程 实现都可以,依次从队列里面取任务,队列空了在获取下一批数据到队列里面。

这一块自己实现细节还挺多的,比如任务失败了如何重跑,需不需要做背压之类的。

我之前做过类似的,用框架实现,对应的代码就很简洁,伪代码类似这样

```
ids.toStream.buffer(16).mapPar(4)(row => processData(row))
```
EMMMMMMMMM
100 天前
你就不能给任务分一下片吗?
线程 1:WHERE status = 'PENDING' and id % 2 = 0
线程 2:WHERE status = 'PENDING' and id % 2 = 1

说实话, 在互联网干了七八年了,多线程的代码屈指可数,都是多进程
Romic
100 天前
1. 使用分布式锁 性能一般
2. 使用数据库的乐观锁 加 version 开发成本太高,需要多维护一个 version 字段
3. 推荐方案,将数据分批打散,比如 1000 条数据 2 个现成并行执行,那么 1-500 是线程 A 执行。501-1000 线程 B 执行。经典方案。
话说这种问题直接丢给 ai ,很多方案。以前的 deepseek R1 模型的方案完整 准确率高。现在好像不行啦。
gg 思密达。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://ex.noerr.eu.org/t/1127560

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX