Kotlin Flow的6个必知高阶技巧
一、智能缓冲策略优化流处理
背压问题核心解析
当生产者和消费者速率不匹配时,传统流处理会导致数据积压。Kotlin的buffer()
操作符通过创建缓冲区解决该问题:
flow {
(1..5).forEach {
delay(100) // 模拟生产延迟
emit(it)
}
}.buffer(32) // 设置32个元素的缓冲区
.collect { value ->
delay(300) // 模拟消费延迟
println(value)
}
技术要点:
buffer()
默认使用64个元素的缓冲区- 缓冲区满时生产者挂起,避免内存溢出
- 结合
conflate()
可丢弃中间值保留最新数据
缓冲策略选择矩阵
二、精准流量控制技巧
节流操作实战
flow {
var value = 0
while (true) {
emit(value++)
delay(10) // 每10ms发射数据
}
}.conflate() // 仅保留最新值
.collect {
delay(100) // 每100ms处理数据
println(it)
}
执行效果:
0
12
24
36
...
说明:中间11个值被丢弃,仅处理最新数据
时间窗口采样
flow {
(1..20).forEach {
emit(it)
delay(50)
}
}.sample(200) // 200ms时间窗口采样
.collect {
println(it) // 输出:4,8,12,16,20
}
三、多流操作高级策略
复合流合并技术
val flowA = flowOf("A1", "A2", "A3").onEach { delay(100) }
val flowB = flowOf("B1", "B2").onEach { delay(150) }
merge(flowA, flowB).collect {
println(it) // 输出:A1, B1, A2, A3, B2
}
动态流切换方案
fun getFlowByType(type: String): Flow<String> =
when(type) {
"A" -> flowOf("A1", "A2")
"B" -> flowOf("B1", "B2")
else -> emptyFlow()
}
flatMapConcat { type ->
getFlowByType(type) // 顺序执行流
}
四、流共享与状态管理
SharedFlow热流配置
val sharedFlow = MutableSharedFlow<Int>(
replay = 2, // 新订阅者接收最近2个值
extraBufferCapacity = 10 // 额外缓冲区
)
// 生产者
launch {
(1..15).forEach {
sharedFlow.emit(it)
delay(50)
}
}
// 消费者1
launch {
sharedFlow.collect {
println("C1: $it")
}
}
// 延迟消费者
launch {
delay(300)
sharedFlow.collect {
println("C2: $it") // 收到13,14(replay=2)
}
}
StateFlow状态管理
val state = MutableStateFlow(0)
// 状态更新
fun increment() {
state.value += 1
}
// 状态监听
state.collect {
println("状态更新:$it")
}
五、复杂流异常处理
精细化异常捕获
flow {
emit(1)
throw RuntimeException("测试异常")
}.catch { cause ->
if (cause is IOException) {
emit(-1) // 特定异常恢复
} else {
throw cause // 重新抛出
}
}.onCompletion { cause ->
cause?.let {
println("流终止原因: ${it.message}")
}
}.collect { ... }
超时控制机制
withTimeoutOrNull(250) {
flow {
emit(1)
delay(300) // 超时操作
emit(2)
}.collect()
} ?: println("操作超时")
六、Flow与响应式UI集成
Android UI状态管理
class ViewModel : ViewModel() {
private val _uiState = MutableStateFlow<UiState>(Loading)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
fun loadData() {
viewModelScope.launch {
_uiState.value = Loading
try {
val data = repository.fetchData()
_uiState.value = Success(data)
} catch (e: Exception) {
_uiState.value = Error(e.message)
}
}
}
}
// Activity中监听
lifecycleScope.launchWhenStarted {
viewModel.uiState.collect { state ->
when(state) {
is Loading -> showProgress()
is Success -> showData(state.data)
is Error -> showError(state.message)
}
}
}
防抖搜索实现
searchInputFlow
.debounce(300) // 300ms防抖
.distinctUntilChanged() // 值去重
.flatMapLatest { query ->
searchApi(query) // 取消前次搜索
}
.flowOn(Dispatchers.IO) // 切换IO线程
.collect { results ->
updateUI(results)
}
总结
- 流量控制三剑客:
buffer
/conflate
/sample
应对不同背压场景 - 多流操作选择:
flatMapConcat
:顺序执行flatMapMerge
:并发执行flatMapLatest
:取最新流
- 状态共享方案:
- SharedFlow:多订阅者广播
- StateFlow:单一值状态管理
- 生产环境实践:
val scope = CoroutineScope(SupervisorJob() + CoroutineExceptionHandler { _, e -> logError(e) })
性能优化
扩展建议:
- 结合
Channel
实现复杂生产者-消费者模型 - 使用
flowOn
精确控制执行上下文 - 在Compose中使用
collectAsStateWithLifecycle