基于协程的 Android 事务框架设计
背景
数据库事务(transaction)是访问并可能操作各种数据项的一个数据库操作序列,这些操作要么全部执行,要么全部不执行,是一个不可分割的工作单位。事务由事务开始与事务结束之间执行的全部数据库操作组成。
“要么全部成功,要么全部失败,那么在失败后,对于已成功的操作,是如何回滚的呢?” 数据库系统通常使用日志 (log) 来实现事务的回滚。在事务执行过程中,数据库会记录所有对数据的修改操作到日志中。当事务需要回滚时, 数据库会读取日志, 并执行逆向操作来撤销之前的修改。这种逆向操作被称之为“补偿操作(compensating operation)”
Android 使用思考
Transaction 在数据库操作中可以省去很多不必要的问题,那在 Android 开发中,我们如何更方便的使用呢?想要方便就需要尽可能多的减少辅助代码。那下面几个问题是我们需要着重考虑的。
如何简化补偿操作的辅助代码?数据如何传递?Transaction 嵌套场景同步/异步的处理Interceptor 设计Lifecycle 设计
接下来我们思考一个方便好用的 API 应该如何设计
1.New transaction
事务封装了一组 DSL 操作。要使用默认参数创建和执行事务,只需将函数块传递给transaction
函数:
transaction {
// do something here...
}
事务在当前线程上同步执行,因此它们将阻塞应用程序的其他部分!如果您需要异步执行事务,可以参考章节「使用协程」。
2.Transaction 返回值
transaction
支持直接返回值:
val result = transaction {
"some data"
}
println(result) // some data
3.使用嵌套事务
默认情况下,嵌套transaction
块共享其父transaction
块的交易资源,因此对子块的任何影响都会影响父块:
在这种场景下,事务A/B/C均为同一个事务。如果想要在一个事务中开启新事务,可以移步「使用协程」部分。
val finalResult = transaction { // 事务A
var result = 0
result += transaction { // 事务B
1
}
result += transaction { // 事务C
2
}
result
}
println(result) // 3
4.使用协程
Transaction 支持指定 CoroutineContext
来使用协程,本质上,这个框架可以基于协程和 ThreadLocal
去封装的。
目前一共支持四种方式去使用协程去开启一个事务。
4.1 suspendedTransaction
阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。
runBlocking {
val result = suspendedTransaction(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用 suspendedTransaction
,则使用之前的事务。
runBlocking {
val result = suspendedTransaction(Dispatchers.IO) { // 事务A
suspendedTransaction { // 事务B
// do something
suspendedTransaction { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null) = 事务B = 事务C
4.2 newSuspendedTransaction
阻塞当前线程,协程开启一个新的事务,如果当前存在事务,则新建一个。
runBlocking {
val result = newSuspendedTransaction(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事务中使用newSuspendedTransaction
,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。
runBlocking {
val result = newSuspendedTransaction(Dispatchers.IO) { // 事务A
newSuspendedTransaction { // 事务B
// do something
newSuspendedTransaction { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null)
└─ 事务 B (outerTransaction = 事务 A)
└─ 事务 C (outerTransaction = 事务 B)
4.3 suspendedTransactionAsync
异步执行,返回值:Deferred
,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则使用之前的事务。
runBlocking {
val result = suspendedTransactionAsync(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}.await()
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用suspendedTransactionAsync
,则使用之前的事务。
runBlocking {
val result = suspendedTransactionAsync(Dispatchers.IO) { // 事务A
suspendedTransactionAsync { // 事务B
// do something
suspendedTransactionAsync { // 事务C
// do something
}
}
}
}
在这种情况下:
事务 A (outerTransaction = null) = 事务B = 事务C
4.4 newSuspendedTransactionAsync
异步执行,不阻塞当前线程,使用协程开启一个新的事务,如果当前存在事务,则新建一个事务。
runBlocking {
val result = newSuspendedTransactionAsync(Dispatchers.IO) {
println("Transaction # ${this.id}") // Transaction # 3
1
}.await()
println("Result: $result") // Result: 1
}
在嵌套场景执行时,如果在其他事物中使用newSuspendedTransactionAsync,则新建一个事务,使用链表进行管理事务嵌套场景的关联关系。
建议异步处理时,遇到协程嵌套场景,请保重内部的逻辑都执行完毕,否则会导致生命周期的顺序不能得到保障。
runBlocking {
val result = newSuspendedTransactionAsync(Dispatchers.IO) { // 事务A
newSuspendedTransactionAsync { // 事务B
// do something
newSuspendedTransactionAsync { // 事务C
// do something
}.await()
}.await()
}
}
在这种情况下:
事务 A (outerTransaction = null)
└─ 事务 B (outerTransaction = 事务 A)
└─ 事务 C (outerTransaction = 事务 B)
5.Transaction interceptor
可以在事务中定义拦截器,用于在不同阶段可以处理更多的事情。必须要在 configuration
中定义拦截器,具体拦截器的用法如下:
// 定义拦截器
class FooInterceptor: StatementInterceptor {
override fun beforeExecution(transaction: Transaction, context: StatementContext) {}
override fun afterExecution(transaction: Transaction, contexts: List<StatementContext>) {}
override fun beforeCommit(transaction: Transaction) {}
override fun afterCommit() {}
// 补偿操作
override fun beforeRollback(transaction: Transaction, e:Throwable?) {}
override fun afterRollback() {}
}
runBlocking {
suspendedTransaction(configuration = {
// 注册拦截器
registerInterceptor(FooInterceptor())
}){
// do something here
}
}
拦截器中提供了多个方法,具体的生命周期如下图所示。
6.数据通信
可以在一个事务中去传递和设置数据,用法如下:
runBlocking {
suspendedTransaction(configuration = {
putData("logger", Logger("test"))
}){
val logger:Logger = getData("logger")
suspendedTransaction {
val innerLogger:Logger = getData("logger")
assertEquals(logger, innerLogger)
}
}
}
7.事务一致性保证
通过 ThreadContextElement/ThreadLocal
保证事务唯一性,利用协程实现异步事务,并在 finally
块中确保提交或回滚操作。同时,通过拦截器机制提供事务生命周期钩子, 增强了事务框架的灵活性和可扩展性。
7.1 同步场景
在同步场景里,整个事务的执行是会阻塞当前线程运行的,transaction
方法内会保证每个线程在事务里只有唯一一个事务实例。
使用 ThreadLocal
变量来保存当前数据库连接以及事务上下文。这确保了每个线程在执行 transaction
函数时都拥有自己的事务上下文,避免了事务混乱。
// 发生事务嵌套时,或者在最外层 transaction 执行的代码块内,只会保证有唯一一个 transaction
transaction {
val aTransaction = this
transaction {
Assert.assertEquals(aTransaction, this)
}
}
核心代码如下:
7.2 协程场景
在协程场景下,我们需要保证在每个协程中,事务的一致性,基于 ThreadContextElement + ThreadLocal
去进行传播和恢复线程的上下文信息。
ThreadContextElement 工作原理:
传播上下文: 当一个协程从一个线程切换到另一个线程时, 如果协程的 CoroutineContext 中包含 ThreadContextElement,那么 ThreadContextElement 的 updateThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将一些线程相关的上下文信息存储到目标线程中。
恢复上下文: 当协程在目标线程上执行完成后,ThreadContextElement 的 restoreThreadContext 方法会被调用。在这个方法中,ThreadContextElement 可以将之前存储的上下文信息从目标线程中移除, 并将目标线程的上下文恢复到原来的状态。
知道了 ThreadContextElement
的作用后,我们只需要在每次 transaction
创建的时候,在 coroutineContext
中去使用 ThreadContextElement
即可。
新建 CoroutineScope :
ThreadContextElement 的使用:
8.异步嵌套场景
需要特别注意这种情况:内外均为异步事务,但是内部的事务执行没有 await
,代码如下:
runBlocking {
newSuspendedTransactionAsync { // TransactionA
newSuspendedTransactionAsync { // TransactionB
// do something...
}.await()
}
}
生命周期可能会如下执行:
- beforeExecution (A)
- statement(A)
- afterExecution (A)
- 事务 B:
beforeExecution (B)
statement(B)
afterExecution (B)
afterExecution (B)
beforeCommit (B) (如果事务 B 内部没有异常)
afterCommit (B) (如果事务 B 内部没有异常) - afterExecution (A)
- beforeCommit (A) (如果事务 A、B 内部没有异常)
- afterCommit (A) (如果事务 A、B 内部没有异常)
由于事务 B 没有使用 await
等待,因此事务 A 无法保证事务 B 的执行结果。如果事务 B 抛出异常,事务 A 可能会继续执行,导致数据不一致。建议在嵌套事务场景中,建议使用 await
等待内层事务的完成,以确保事务的原子性和数据一致性。
newSuspendedTransactionAsync { // 事务 A
// ... 事务 A 的语句 1 ...
val deferredB = newSuspendedTransactionAsync { // 事务 B
// ... 事务 B 的语句 1 ...
// ... 事务 B 的语句 2 ...
}
// ... 事务 A 的语句 2 ...
deferredB.await() // 等待事务 B 完成
}