[翻译] 使用 Django model 锁处理并发
造成并发问题的原因:
假设我们的 model 定义如下:
class Account(models.Model):
id = models.AutoField(
primary_key=True,
)
user = models.ForeignKey(
User,
)
# 资产
balance = models.IntegerField(
default=0,
)
并且有如下的逻辑操作:
# 入金
def deposit(self, amount):
self.balance += amount
self.save()
# 出金
def withdraw(self, amount):
if amount > self.balance:
raise errors.InsufficientFunds()
self.balance -= amount
self.save()
如果是串行,上面的逻辑不会出什么问题,但是如果是并行的话,就有问题了:
假设现在有 user A 和 user B 同时对 balance(初始值为 100) 进行操作
- user A 获取到 Account obj,此时 balance 为 100
- user B 获取到 Account obj,此时 balance 为 100
- user A 进行 deposit(50),此时 balance 为 100 + 50 = 150
- user B 进行 withdraw(30),此时 balance 为 100 - 30 = 70
user A 存入了 50,user B 拿出了30,我们期望的结果是 120,最后结果是 70,所以问题就来了。由于并发的原因,B 进行操作的时候,获取到的 Account obj 与 A 是相同的(在 A 获取到 Account obj 但尚未对其进行操作保存时),所以 A 进行的操作对 B 并没有什么影响。
解决方法
悲观锁
即在请求对资源进行操作的时候,锁住要操作的资源,直到整个操作结束则释放锁。
在这里推荐用数据库级别的锁,因为数据库能很好地管理锁处理并发,而且低层面的锁不会因为其他进程或者线程的操作受到影响,另外 Django App 可以跑多进程,维护 App 级别的锁工作量非常大且复杂。
对于 Django 我们可以用 select_for_update() 的方式锁住我们要操作的 obj(对于数据库则是锁住要操作的行) 如:account_obj = Account.objects.select_for_update().get(id=1),这样获取到的 obj 就是有带锁的,其他请求想要对这个资源进行访问需要等待这个 obj 完成才能访问。
参见文档:https://docs.djangoproject.com/en/dev/ref/models/querysets/#select-for-update
这里有几点需要注意:
- 需要在原子操作(atomic)中使用
select_for_update()(原子操作结束才释放锁) - 若不需要等待可以带上
nowait=True的参数,遇到冲突的资源直接 raise DatabaseError - 若同时使用了
select_related,则相关资源也会加上锁
上锁代码:
@classmethod
def deposit(cls, id, amount):
with transaction.atomic():
account = (
cls.objects
.select_for_update()
.get(id=id)
)
account.balance += amount
account.save()
return account
@classmethod
def withdraw(cls, id, amount):
with transaction.atomic():
account = (
cls.objects
.select_for_update()
.get(id=id)
)
if account.balance < amount:
raise errors.InsufficentFunds()
account.balance -= amount
account.save()
return account
乐观锁
即在获取资源的时候并不会对相关资源上锁,而是加上一个字段 version,在入库的时候通过判断 version 保证资源更新不冲突。
举个例子:
数据表中有 version 字段,要对某一资源请求的时候,数据需要带上 version,然后用 version 查询数据库,此时若有两个并发请求,获取到的 obj 应该是一样的,然后这两个请求同时修改数据,并将 version 自增 1,在保存的时候,也先查询数据库中存不存在 POST 过来的 version,存在则 save,不存在说明该 version 已经被别人先修改了,version 应该已经增加 1,所以返回修改失败。
用 Django 表达就是 Model.Object.filter(version=version).update(data=data, version=version + 1)
返回 1 代表 update 了一行,0 代表失败
这里也有几点需要注意:
- 需要添加多一个字段,并增加逻辑判断
- 如果有很多并发操作,会造成很多请求失败,因为他会返回操作失败而不是像悲观锁那样会等待请求
- 需要注意在程序外对数据进行操作可能会造成冲突,比如在 Django shell 中,若没加上逻辑判断对 model 进行更新
- 由于不像悲观锁有
no_wait = False可以实现冲突等待,所以使用乐观锁可能需要自己实现失败重试机制
上锁代码:
def deposit(self, id, amount):
updated = Account.objects.filter(
id=self.id,
version=self.version,
).update(
balance=balance + amount,
version=self.version + 1,
)
return updated > 0
def withdraw(self, id, amount):
if self.balance < amount:
raise errors.InsufficentFunds()
updated = Account.objects.filter(
id=self.id,
version=self.version,
).update(
balance=balance - amount,
version=self.version + 1,
)
return updated > 0
什么时候用乐观锁,什么时候用悲观锁
- 如果操作对象有许多并发更新(写操作多),用悲观锁可能更好;操作对象读操作多写操作少,用乐观锁更好
- 如果更新发生在程序之外(例如,直接在数据库中或 manage shell 中),悲观锁更安全
- 如果接口被远程应用或操作系统调用并且有副作用,需要请确保它们的操作安全。比如调用是否会花很长时间导致超时?每次调用是否能保证幂等?
原文出处: https://medium.com/@hakibenita/how-to-manage-concurrency-in-django-models-b240fed4ee2