首页 Android 正文
  • 本文约5717字,阅读需29分钟
  • 77
  • 0

基于协程的 Android 事务框架设计

摘要

背景 数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。 “要么全部成功,要么全部失败,那么在失败后,对于已成功的操作,是如何回滚的呢?” 数据库系统通常使用日志 (log) 来实现事务的回滚。在事务执行过程...

背景

数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。

“要么全部成功,要么全部失败,那么在失败后,对于已成功的操作,是如何回滚的呢?” 数据库系统通常使用日志 (log) 来实现事务的回滚。在事务执行过程中,数据库会记录所有对数据的修改操作到日志中。当事务需要回滚时, 数据库会读取日志, 并执行逆向操作来撤销之前的修改。这种逆向操作被称之为“补偿操作(compensating operation)”

Android 使用思考

Transaction 在数据库操作中可以省去很多不必要的问题,那在 Android 开发中,我们如何更方便的使用呢?想要方便就需要尽可能多的减少辅助代码。那下面几个问题是我们需要着重考虑的。

如何简化补偿操作的辅助代码?数据如何传递?Transaction 嵌套场景同步/异步的处理Interceptor 设计Lifecycle 设计
接下来我们思考一个方便好用的 API 应该如何设计

1.New transaction

事务封装了一组 DSL 操作。要使用默认参数创建和执行事务,只需将函数块传递给transaction函数:

transaction {
    // do something here...
}

事务在当前线程上同步执行,因此它们将阻塞应用程序的其他部分!如果您需要异步执行事务,可以参考章节「使用协程」。

2.Transaction 返回值

transaction 支持直接返回值:

val result = transaction {
    "some data"
}
println(result) // some data

3.使用嵌套事务

默认情况下,嵌套transaction块共享其父transaction块的交易资源,因此对子块的任何影响都会影响父块:

在这种场景下,事务A/B/C均为同一个事务。如果想要在一个事务中开启新事务,可以移步「使用协程」部分。

val finalResult = transaction { // 事务A
    var result = 0
    result += transaction { // 事务B
        1
    }
    result += transaction { // 事务C
        2
    }
    result
}
println(result) // 3

4.使用协程

Transaction 支持指定 CoroutineContext 来使用协程,本质上,这个框架可以基于协程和 ThreadLocal 去封装的。

目前一共支持四种方式去使用协程去开启一个事务。

4.1 suspendedTransaction

阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。

runBlocking {
    val result = suspendedTransaction(Dispatchers.IO) {
        println("Transaction # ${this.id}") // Transaction # 3
        1
    }
    println("Result: $result") // Result: 1
}

在嵌套场景执行时,如果在其他事物中使用 suspendedTransaction,则使用之前的事务。

runBlocking {
    val result = suspendedTransaction(Dispatchers.IO) { // 事务A
        suspendedTransaction { // 事务B
            // do something
            suspendedTransaction { // 事务C
                // do something
            }
        }
    }
}
在这种情况下:

事务 A (outerTransaction = null) = 事务B = 事务C

4.2 newSuspendedTransaction

阻塞当前线程,协程开启一个新的事务,如果当前存在事务,则新建一个。

runBlocking {
    val result = newSuspendedTransaction(Dispatchers.IO) {
        println("Transaction # ${this.id}") // Transaction # 3
        1
    }
    println("Result: $result") // Result: 1
}

在嵌套场景执行时,如果在其他事务中使用newSuspendedTransaction,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。

runBlocking {
    val result = newSuspendedTransaction(Dispatchers.IO) { // 事务A
        newSuspendedTransaction { // 事务B
            // do something
            newSuspendedTransaction { // 事务C
                // do something
            }
        }
    }
}
在这种情况下:

事务 A (outerTransaction = null)
    └─ 事务 B (outerTransaction = 事务 A)
        └─ 事务 C (outerTransaction = 事务 B)

4.3 suspendedTransactionAsync

异步执行,返回值:Deferred,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。

runBlocking {
    val result = suspendedTransactionAsync(Dispatchers.IO) {
        println("Transaction # ${this.id}") // Transaction # 3
        1
    }.await()
    println("Result: $result") // Result: 1
}

在嵌套场景执行时,如果在其他事物中使用suspendedTransactionAsync,则使用之前的事务。

runBlocking {
    val result = suspendedTransactionAsync(Dispatchers.IO) { // 事务A
        suspendedTransactionAsync { // 事务B
            // do something
            suspendedTransactionAsync { // 事务C
                // do something
            }
        }
    }
}
在这种情况下:

事务 A (outerTransaction = null) = 事务B = 事务C

4.4 newSuspendedTransactionAsync

异步执行,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则新建一个事务。

runBlocking {
    val result = newSuspendedTransactionAsync(Dispatchers.IO) {
        println("Transaction # ${this.id}") // Transaction # 3
        1
    }.await()
    println("Result: $result") // Result: 1
}

在嵌套场景执行时,如果在其他事物中使用newSuspendedTransactionAsync,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。

建议异步处理时,遇到协程嵌套场景,请保重内部的逻辑都执行完毕,否则会导致生命周期的顺序不能得到保障。

runBlocking {
    val result = newSuspendedTransactionAsync(Dispatchers.IO) { // 事务A
        newSuspendedTransactionAsync { // 事务B
            // do something
            newSuspendedTransactionAsync { // 事务C
                // do something
            }.await()
        }.await()
    }
}
在这种情况下:

事务 A (outerTransaction = null)
    └─ 事务 B (outerTransaction = 事务 A)
        └─ 事务 C (outerTransaction = 事务 B)

5.Transaction interceptor

可以在事务中定义拦截器,用于在不同阶段可以处理更多的事情。必须要在 configuration 中定义拦截器,具体拦截器的用法如下:

// 定义拦截器
class FooInterceptor: StatementInterceptor {
    override fun beforeExecution(transaction: Transaction, context: StatementContext) {}
    override fun afterExecution(transaction: Transaction, contexts: List<StatementContext>) {}
    override fun beforeCommit(transaction: Transaction) {}
    override fun afterCommit() {}
    // 补偿操作
    override fun beforeRollback(transaction: Transaction, e:Throwable?) {}
    override fun afterRollback() {}
}

runBlocking {
    suspendedTransaction(configuration = {
        // 注册拦截器
        registerInterceptor(FooInterceptor())
    }){
        // do something here
    }
}

拦截器中提供了多个方法,具体的生命周期如下图所示。

基于协程的 Android 事务框架设计

6.数据通信

可以在一个事务中去传递和设置数据,用法如下:

runBlocking {
    suspendedTransaction(configuration = {
        putData("logger", Logger("test"))
    }){
        val logger:Logger = getData("logger")
        suspendedTransaction {
            val innerLogger:Logger = getData("logger")
            assertEquals(logger, innerLogger)
        }
    }
}

7.事务一致性保证

通过 ThreadContextElement/ThreadLocal 保证事务唯一性,利用协程实现异步事务,并在 finally 块中确保提交或回滚操作。同时,通过拦截器机制提供事务生命周期钩子, 增强了事务框架的灵活性和可扩展性。

7.1 同步场景

在同步场景里,整个事务的执行是会阻塞当前线程运行的,transaction 方法内会保证每个线程在事务里只有唯一一个事务实例。

使用 ThreadLocal 变量来保存当前数据库连接以及事务上下文。这确保了每个线程在执行 transaction 函数时都拥有自己的事务上下文,避免了事务混乱。

// 发生事务嵌套时,或者在最外层 transaction 执行的代码块内,只会保证有唯一一个 transaction
transaction {
    val aTransaction = this
    transaction {
        Assert.assertEquals(aTransaction, this)
    }
}

核心代码如下:

基于协程的 Android 事务框架设计

7.2 协程场景

在协程场景下,我们需要保证在每个协程中,事务的一致性,基于 ThreadContextElement + ThreadLocal 去进行传播和恢复线程的上下文信息。

ThreadContextElement 工作原理:
传播上下文: 当一个协程从一个线程切换到另一个线程时, 如果协程的 CoroutineContext 中包含 ThreadContextElement,那么 ThreadContextElement 的 updateThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将一些线程相关的上下文信息存储到目标线程中。
恢复上下文: 当协程在目标线程上执行完成后,ThreadContextElement 的 restoreThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将之前存储的上下文信息从目标线程中移除, 并将目标线程的上下文恢复到原来的状态。

知道了 ThreadContextElement 的作用后,我们只需要在每次 transaction 创建的时候,在 coroutineContext 中去使用 ThreadContextElement 即可。

新建 CoroutineScope :

基于协程的 Android 事务框架设计

ThreadContextElement 的使用:

基于协程的 Android 事务框架设计

8.异步嵌套场景

需要特别注意这种情况:内外均为异步事务,但是内部的事务执行没有 await,代码如下:

runBlocking {
    newSuspendedTransactionAsync { // TransactionA
        newSuspendedTransactionAsync { // TransactionB
           // do something...
        }.await()
    }
}

生命周期可能会如下执行:

  • beforeExecution (A)
  • statement(A)
  • afterExecution (A)
  • 事务 B:
    beforeExecution (B)
    statement(B)
    afterExecution (B)
    afterExecution (B)
    beforeCommit (B) (如果事务 B 内部没有异常)
    afterCommit (B) (如果事务 B 内部没有异常)
  • afterExecution (A)
  • beforeCommit (A) (如果事务 A、B 内部没有异常)
  • afterCommit (A) (如果事务 A、B 内部没有异常)

由于事务 B 没有使用 await 等待,因此事务 A 无法保证事务 B 的执行结果。如果事务 B 抛出异常,事务 A 可能会继续执行,导致数据不一致。建议在嵌套事务场景中,建议使用 await 等待内层事务的完成,以确保事务的原子性和数据一致性。

newSuspendedTransactionAsync { // 事务 A
    // ... 事务 A 的语句 1 ...
    val deferredB = newSuspendedTransactionAsync { // 事务 B
        // ... 事务 B 的语句 1 ...
        // ... 事务 B 的语句 2 ...
    }
    // ... 事务 A 的语句 2 ...
    deferredB.await() // 等待事务 B 完成
}

参考


扫描二维码,在手机上阅读


    评论