造成并发问题的原因:

假设我们的 model 定义如下:

    class Account(models.Model):
        id = models.AutoField(
            primary_key=True,
        )
        user = models.ForeignKey(
            User,
        )
        # 资产
        balance = models.IntegerField(
            default=0,
        )

并且有如下的逻辑操作:

    # 入金
    def deposit(self, amount):
        self.balance += amount
        self.save()
    # 出金
    def withdraw(self, amount):
        if amount > self.balance:
            raise errors.InsufficientFunds()
        self.balance -= amount
        self.save()

如果是串行,上面的逻辑不会出什么问题,但是如果是并行的话,就有问题了:
假设现在有 user A 和 user B 同时对 balance(初始值为 100) 进行操作

  1. user A 获取到 Account obj,此时 balance 为 100
  2. user B 获取到 Account obj,此时 balance 为 100
  3. user A 进行 deposit(50),此时 balance 为 100 + 50 = 150
  4. user B 进行 withdraw(30),此时 balance 为 100 - 30 = 70

user A 存入了 50,user B 拿出了30,我们期望的结果是 120,最后结果是 70,所以问题就来了。由于并发的原因,B 进行操作的时候,获取到的 Account obj 与 A 是相同的(在 A 获取到 Account obj 但尚未对其进行操作保存时),所以 A 进行的操作对 B 并没有什么影响。

解决方法

悲观锁

即在请求对资源进行操作的时候,锁住要操作的资源,直到整个操作结束则释放锁。

在这里推荐用数据库级别的锁,因为数据库能很好地管理锁处理并发,而且低层面的锁不会因为其他进程或者线程的操作受到影响,另外 Django App 可以跑多进程,维护 App 级别的锁工作量非常大且复杂。

对于 Django 我们可以用 select_for_update() 的方式锁住我们要操作的 obj(对于数据库则是锁住要操作的行) 如:account_obj = Account.objects.select_for_update().get(id=1),这样获取到的 obj 就是有带锁的,其他请求想要对这个资源进行访问需要等待这个 obj 完成才能访问。
参见文档:https://docs.djangoproject.com/en/dev/ref/models/querysets/#select-for-update

这里有几点需要注意:

  1. 需要在原子操作(atomic)中使用 select_for_update() (原子操作结束才释放锁)
  2. 若不需要等待可以带上 nowait=True 的参数,遇到冲突的资源直接 raise DatabaseError
  3. 若同时使用了 select_related,则相关资源也会加上锁

上锁代码:

    @classmethod
    def deposit(cls, id, amount):
       with transaction.atomic():
           account = (
               cls.objects
               .select_for_update()
               .get(id=id)
           )
          
           account.balance += amount
           account.save()
        return account
    
    @classmethod
    def withdraw(cls, id, amount):
       with transaction.atomic():
           account = (
               cls.objects
               .select_for_update()
               .get(id=id)
           )
          
           if account.balance < amount:
               raise errors.InsufficentFunds()
           account.balance -= amount
           account.save()
      
       return account

乐观锁

即在获取资源的时候并不会对相关资源上锁,而是加上一个字段 version,在入库的时候通过判断 version 保证资源更新不冲突。

举个例子:
数据表中有 version 字段,要对某一资源请求的时候,数据需要带上 version,然后用 version 查询数据库,此时若有两个并发请求,获取到的 obj 应该是一样的,然后这两个请求同时修改数据,并将 version 自增 1,在保存的时候,也先查询数据库中存不存在 POST 过来的 version,存在则 save,不存在说明该 version 已经被别人先修改了,version 应该已经增加 1,所以返回修改失败。
用 Django 表达就是 Model.Object.filter(version=version).update(data=data, version=version + 1)
返回 1 代表 update 了一行,0 代表失败

这里也有几点需要注意:

  1. 需要添加多一个字段,并增加逻辑判断
  2. 如果有很多并发操作,会造成很多请求失败,因为他会返回操作失败而不是像悲观锁那样会等待请求
  3. 需要注意在程序外对数据进行操作可能会造成冲突,比如在 Django shell 中,若没加上逻辑判断对 model 进行更新
  4. 由于不像悲观锁有 no_wait = False 可以实现冲突等待,所以使用乐观锁可能需要自己实现失败重试机制

上锁代码:

    def deposit(self, id, amount):
       updated = Account.objects.filter(
           id=self.id,
           version=self.version,
       ).update(
           balance=balance + amount,
           version=self.version + 1,
       )
       return updated > 0
    def withdraw(self, id, amount):       
       if self.balance < amount:
           raise errors.InsufficentFunds()
      
       updated = Account.objects.filter(
           id=self.id,
           version=self.version,
       ).update(
           balance=balance - amount,
           version=self.version + 1,
       )
      
       return updated > 0

什么时候用乐观锁,什么时候用悲观锁

  • 如果操作对象有许多并发更新(写操作多),用悲观锁可能更好;操作对象读操作多写操作少,用乐观锁更好
  • 如果更新发生在程序之外(例如,直接在数据库中或 manage shell 中),悲观锁更安全
  • 如果接口被远程应用或操作系统调用并且有副作用,需要请确保它们的操作安全。比如调用是否会花很长时间导致超时?每次调用是否能保证幂等?

原文出处: https://medium.com/@hakibenita/how-to-manage-concurrency-in-django-models-b240fed4ee2