kotlinlang.org/docs/flow.html#sequences
Asynchronous Flow
suspending ํจ์๋ ๋น๋๊ธฐ์ ์ผ๋ก ํ๋์ ๊ฐ์ ๋ฐํํฉ๋๋ค. ๋น๋๊ธฐ์ ์ผ๋ก ๊ณ์ฐ๋ ์ฌ๋ฌ ๊ฐ๋ค์ ์ด๋ป๊ฒ ๋ฐํํ ์ ์์๊น์? Kotlin Flows๋ ์ฌ๊ธฐ์ ๋ฑ์ฅํฉ๋๋ค.
์ฌ๋ฌ ๊ฐ๋ค์ ํ์ํ๊ธฐ
์ฌ๋ฌ ๊ฐ๋ค์ Kotlin์์ collection์ ์ด์ฉํด์ ํํ๋ ์ ์์ต๋๋ค. ์๋ฅผ ๋ค์ด์, simple() ํจ์๋ ์ธ ๊ฐ์ ์ซ์๋ค์ List๋ฅผ ๋ฐํํ๊ณ forEach๋ฅผ ์ด์ฉํด์ ๋ชจ๋ ์ซ์๋ค์ ์ถ๋ ฅํฉ๋๋ค.
fun simple(): List<Int> = listOf(1, 2, 3)
fun main() {
simple().forEach { value -> println(value) }
}
๊ฒฐ๊ณผ
1
2
3
Sequence
๋ง์ฝ CPU๋ฅผ ์ฌ์ฉํด blocking ์ฝ๋(๊ฐ ๊ณ์ฐ๋ง๋ค 100ms๊ฐ ๊ฑธ๋ฆฌ์ฃ )๋ฅผ ๊ฐ์ง๊ณ ์ซ์๋ค์ ๊ณ์ฐํ๋ค๋ฉด, Sequence๋ฅผ ์ฌ์ฉํด์ ์ซ์๋ค์ ๋ํ๋ผ ์ ์์ต๋๋ค.
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
๊ฒฐ๊ณผ
1
2
3
์ฝ๋ ๊ฒฐ๊ณผ๊ฐ์ ๊ฐ์ ์ซ์๋ค์ด์ง๋ง, ํ๋์ฉ ์ถ๋ ฅ๋ ๋๋ง๋ค 100ms๊ฐ ๊ฑธ๋ฆฝ๋๋ค.
Suspending ํจ์
๊ทธ๋ฌ๋, ์ด ๊ณ์ฐ์ ์ฝ๋๋ฅผ ์ํํ๋ ๋ฉ์ธ ์ฐ๋ ๋๋ฅผ ๋ง์ต๋๋ค. ๋น๋๊ธฐ ์ฝ๋๋ก ์ด ๊ฐ๋ค์ด ๊ณ์ฐ๋ ๋, ์ฐ๋ฆฌ๋ simple ํจ์์ suspend ์์ ์๋ฅผ ํ์ํ๊ณ , blocking ์์ด ์ฝ๋๋ ๋์๊ฐ๊ณ ๋ฆฌ์คํธ๋ก ๊ฒฐ๊ณผ๋ฅผ ๋ฐํ๋ฐ์ ์ ์์ต๋๋ค.
import kotlinx.coroutines.*
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
๊ฒฐ๊ณผ
1
2
3
์ด ์ฝ๋๋ 1์ด๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ณ ์ซ์๋ค์ ์ถ๋ ฅํ์ฃ .
Flows
List<Int> ๊ฒฐ๊ณผ ํ์ ์ ์ฌ์ฉํ๋ ๊ฒ์, ์ค์ง ํ ๋ฒ์ ๋ชจ๋ ๊ฐ๋ค์ ๋ฐํํ ์ ์๋ค๋ ๊ฒ์ ์๋ฏธํฉ๋๋ค. ๋น๋๊ธฐ์ ์ผ๋ก ๊ณ์ฐ๋๋ ๊ฐ๋ค์ ํ๋ฆ์ ๋ํ๋ด๊ธฐ ์ํด์, Flow<Int>ํ์ ์ ๋น๋๊ธฐ์ ์ผ๋ก ๊ณ์ฐ๋๋ ๊ฐ๋ค์ ์ํ Sequence<Int> ํ์ ์ ์ฌ์ฉํ๋ ๊ฒ์ฒ๋ผ ์ฌ์ฉํ ์ ์์ต๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
๊ฒฐ๊ณผ
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
์ด ์ฝ๋๋ ๋ฉ์ธ ์ฐ๋ ๋๋ฅผ blocking ํ์ง ์๊ณ ๊ฐ ์ซ์๋ค์ ์ถ๋ ฅํ๊ธฐ ์ ์ 100ms๋ฅผ ๊ธฐ๋ค๋ฆฝ๋๋ค. ๋ฉ์ธ ์ฐ๋ ๋์์ ๋๊ณ ์๋ ๋ถ๋ฆฌ๋ ์ฝ๋ฃจํด์์ ๋งค 100ms๋ง๋ค "I'm not blocked"๊ฐ ์ถ๋ ฅ๋๋ ๊ฒ์ ํ์ธํ ์ ์์ต๋๋ค.
Flow์ ์ด์ ์์ ๋ค ์ฝ๋์์ ๋ค์๊ณผ ๊ฐ์ ์ฐจ์ด์ ๋ค์ ์ฃผ๋ชฉํ์ธ์.
- Flow ํ์ ์ ์ํ ๋น๋ ํจ์๊ฐ flow๋ฅผ ํธ์ถํฉ๋๋ค.
- flow{...} ๋น๋ ๋ธ๋ก ์์ ์ฝ๋๋ ์ผ์์ค์ํ ์ ์์ต๋๋ค.
- simple ํจ์์ suspend ์์ ์๊ฐ ๋ถ์ง ์์ต๋๋ค.
- emit ํจ์๋ฅผ ์ด์ฉํด์ flow์์ ๊ฐ๋ค์ด ๋ฐฉ์ถ๋ฉ๋๋ค(emitted).
- collect ํจ์๋ฅผ ์ด์ฉํด์ flow์์ ๊ฐ๋ค์ด ์์ง๋ฉ๋๋ค(collected).
Flow๋ ์ฐจ๊ฐ์ต๋๋ค.
Flow๋ sequence์ ๋น์ทํ๊ฒ cold stream์ ๋๋ค. flow๊ฐ ์์ง๋๊ธฐ ์ ๊น์ง flow ๋น๋ ์์ ์ฝ๋๋ ๋์๊ฐ์ง ์์ต๋๋ค. ์๋์ ์์ ๋ฅผ ํตํด ๋ช ํํ๊ฒ ์ ์ ์์ฃ .
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
๊ฒฐ๊ณผ
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
์ด๊ฒ์ด (flow๋ฅผ ๋ฐํํ๋)simple ํจ์๊ฐ suspend ์์ ์๋ก ํ์๋์ง ์๋ ํต์ฌ ์ด์ ์ ๋๋ค. ๊ทธ ์์ฒด๋ก simple() ํธ์ถ์ ๋น ๋ฅด๊ฒ ๋ฐํํ๊ณ ์๋ฌด๊ฒ๋ ๊ธฐ๋ค๋ฆฌ์ง ์์ต๋๋ค. flow๋ ๋งค ๋ฒ ์์ง๋ ๋๋ง๋ค ์์ํ๊ณ , ์ด๊ฒ์ collect๋ฅผ ๋ค์ ํธ์ถํ ๋ "Flow started"๋ฅผ ๋ณด๊ฒ๋๋ ์ด์ ์ ๋๋ค.
Flow ์ทจ์ ๊ธฐ์ด
Flow๋ ์ฝ๋ฃจํด์ ์ผ๋ฐ์ ์ธ ํ๋ ฅ ์ทจ์๋ฅผ ๋ฐ๋ฅด๊ณ ์์ต๋๋ค. ๋ณดํต, flow collection์ flow๊ฐ (delay ์ฒ๋ผ)์ทจ์ ๊ฐ๋ฅํ suspending ํจ์ ์์์ ์ผ์์ค๋จ๋ ๋์ ์ทจ์๋ ์ ์์ต๋๋ค. ์๋์ ์์ ๋ ์ด๋ป๊ฒ flow๊ฐ withTimeoutOrNull ์์์ ์คํ ์ค์ผ ๋ ํ์์์์ผ๋ก ์ทจ์๋๊ณ ์ฝ๋ ์ํ์ ์ค์งํ๋์ง๋ฅผ ๋ณด์ฌ์ค๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
๊ฒฐ๊ณผ
Emitting 1
1
Emitting 2
2
Done
์ด๋ป๊ฒ simple ํจ์์์ ์ซ์ ๋ ๊ฐ๋ง์ด flow์์ ๋ด๋ณด๋ด๊ฒ ๋๊ณ ๊ฒฐ๊ณผ๋ฅผ ๋ง๋๋์ง ์ฃผ๋ชฉํ์ธ์.
Flow ๋น๋๋ค
์ด์ ์์ ๋ค์์ ๋์จ flow{...} ๋น๋๋ ๊ฐ์ฅ ๊ธฐ์ด์ ์ธ ๊ฒ์ ๋๋ค. flow์ ๋ ์ฌ์ด ์ ์ธ์ ์ํ ๋ค๋ฅธ ๋น๋๋ค์ด ์์ต๋๋ค.
flowOf ๋น๋๋ ๊ฐ์ ๊ณ ์ ๋ ์งํฉ์ ๋ด๋ณด๋ด๋ flow๋ฅผ ์ ์ํฉ๋๋ค.
๋ค์ํ collection๋ค๊ณผ sequence๋ค์ .asFlow() ํ์ฅ ํจ์๋ฅผ ์ด์ฉํด์ flow๋ก ๋ณํ๋ ์ ์์ต๋๋ค.
1๋ถํฐ 3๊น์ง์ ์ซ์๋ฅผ ์ถ๋ ฅํ๋ ์์ ๋ ๋ค์๊ณผ ๊ฐ์ด ์์ฑํ ์ ์์ต๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
// Convert an integer range to a flow
(1..3).asFlow().collect { value -> println(value) }
}
๊ฒฐ๊ณผ
1
2
3
์ค๊ฐ flow ์ฐ์ฐ์๋ค
collection๊ณผ sequence์ ๋ง์ฐฌ๊ฐ์ง๋ก Flow๋ ์ฐ์ฐ์๋ฅผ ๊ฐ์ง๊ณ ๋ณํ๋ ์ ์์ต๋๋ค. ์ค๊ฐ ์ฐ์ฐ์๋ค์ upstream flow์ ์ ์ฉ๋์ด downstream flow๋ฅผ ๋ฐํํฉ๋๋ค. ์ด ์ฐ์ฐ์๋ค์ flow์ ๋ง์ฐฌ๊ฐ์ง๋ก ์ฐจ๊ฐ์ต๋๋ค. ์ด๋ฌํ ์ฐ์ฐ์๋ฅผ ํธ์ถํ๋ ๊ฒ์ ์์ฒด๋ก suspending ํจ์๋ ์๋๋๋ค. ๋น ๋ฅด๊ฒ ๋์ํ๊ณ , ์๋กญ๊ฒ ๋ณํ๋ flow์ ์ ์๋ฅผ ๋ฐํํฉ๋๋ค.
๊ธฐ์ด ์ฐ์ฐ์๋ค์ map, filter ๊ฐ์ด ์น์ํ ์ด๋ฆ๋ค์ ๊ฐ์ง๊ณ ์์ต๋๋ค. ์ํ์ค์์ ์ค์ํ ์ฐจ์ด์ ์ ์ด ์ฐ์ฌ์๋ค ์์ ์ฝ๋ ๋ธ๋ก๋ค์ suspending ํจ์๋ค์ ํธ์ถํ ์ ์๋ค๋ ๊ฒ์ ๋๋ค.
์๋ฅผ๋ค์ด์, ์ ๋ ฅ ์์ฒญ์ flow๋ map ์ฐ์ฐ์๋ก ๊ฒฐ๊ณผ๋ฅผ ๋งคํํ ์ ์์ต๋๋ค. ์์ฒญ์ ์ํํ๋ ๊ฒ์ด suspending ํจ์๋ก ๊ตฌํ๋๋, ์๊ฐ์ด ๊ธธ๊ฒ ๊ฑธ๋ฆฌ๋ ์ฐ์ฐ์ผ ๊ฒฝ์ฐ์๋ ๋ง์ ๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
๋ค์ 3์ค์ ๋ง๋ค๊ณ , ๊ฐ ์ค์ ๊ฐ 1์ด ํ์ ๋ฑ์ฅํฉ๋๋ค.
response 1
response 2
response 3
๋ณํ ์ฐ์ฐ์
flow ๋ณํ ์ฐ์ฐ์๋ค ์ค์์, ๊ฐ์ฅ ์ผ๋ฐ์ ์ธ ๊ฒ์ transform์ด๋ผ ๋ถ๋ฆฝ๋๋ค. map, filter ๊ฐ์ด ๊ฐ๋จํ ๋ณํ์ ๋ชจ๋ฐฉํ๋๋ฐ ์ฌ์ฉ๋ ์ ์๊ณ , ๋ ๋ณต์กํ ๋ณํ๋ ๊ตฌํํ ์ ์์ต๋๋ค. transform ์ฐ์ฐ์๋ฅผ ์ด์ฉํด์, ์์์ ํ์๋งํผ ์์ ๊ฐ์ ๋ด๋ณด๋ผ ์ ์์ต๋๋ค.
์๋ฅผ ๋ค์ด์, transform์ ์ด์ฉํด ์๊ฐ์ด ๊ธธ๊ฒ ๊ฑธ๋ฆฌ๋ ๋น๋๊ธฐ ์์ฒญ์ ์ํํ๊ธฐ ์ ์ string์ ๋ด๋ณด๋ผ ์ ์๊ณ ์๋ต์ ๋ค์ด์ด ๋ณด๋ผ ์ ์์ต๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
}
๊ฒฐ๊ณผ
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
ํฌ๊ธฐ ์ ํ ์ฐ์ฐ์
take์ ๊ฐ์ด ํฌ๊ธฐ ์ ํ ์ค๊ฐ ์ฐ์ฐ์๋ ํด๋นํ๋ ์ ํ์ ๋๋ฌํ๋ฉด flow์ ์ํ์ ์ทจ์ํฉ๋๋ค. ์ฝ๋ฃจํด์์ ์ทจ์๋ ํญ์ exception์ ๋ฐ์ํ๋ฉด์ ์ํ๋ฉ๋๋ค. ๊ทธ๋์ ๋ชจ๋ ๋ฆฌ์์ค ๊ด๋ฆฌ ํจ์๋ค(try {...} finally {...} ๋ธ๋ก๋ค)์ ๋ณดํต ์ทจ์ํ๋ ๊ฒฝ์ฐ์ ์ฐ์ฐํฉ๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
๊ฒฐ๊ณผ
1
2
Finally in numbers
์ด ์ฝ๋์ ์ถ๋ ฅ ๊ฒฐ๊ณผ๋ numbers() ํจ์ ์์ flow{...} body์ ์ํ์ด ๋ ๋ฒ์งธ ์ซ์๋ฅผ ๋ด๋ณด๋ธ ์ดํ์ ์ค์ง๋ ๊ฒ์ ๋ช ํํ๊ฒ ๋ณด์ฌ์ค๋๋ค.
Terminal flow ์ฐ์ฐ์๋ค
flow์์ Terminal ์ฐ์ฐ์๋ค์ flow์ collection์ ์์ํ๋ suspending ํจ์๋ค์ด๋ค. collect ํจ์๊ฐ ๊ฐ์ฅ ๋ํ์ ์ธ ๊ฒ์ด๊ณ , ๋ ์ฌ์ด ๋ค๋ฅธ terminal ์ฐ์ฐ์๋ค์ด ์๋ค.
- toList, toSet์ ๋ค์ํ collection๋ค๋ก ๋ณํ์ํจ๋ค.
- first ๋ก ์ฒซ ๊ฐ์ ๊ตฌํ๊ณ , single๋ก ๊ฐ ํ๋๋ฅผ ๋ด๋ณด๋ธ๋ค.
- reduce์ fold๋ก flow๋ฅผ ๊ฐ์ผ๋ก ์ค์ธ๋ค.
์์๊ฐ ์ฌ๊ธฐ ์๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
}
๊ฒฐ๊ณผ
55
flow๋ ์์ฐจ์ ์ ๋๋ค.
flow์ ๊ฐ ๊ฐ๋ณ collection์ ์ฌ๋ฌ flow๋ค์์ ์ฐ์ฐํ๋ ํน์ ํ ์ฐ์ฌ์๋ค์ด ์ฌ์ฉ๋์ง ์๋ํ ์์ฐจ์ ์ผ๋ก ์ํ๋ฉ๋๋ค. collection์ terminal ์ฐ์ฌ์๋ฅผ ํธ์ถํ๋ ์ฝ๋ฃจํด์์ ๋ฐ๋ก ์์ ํฉ๋๋ค. ๋ณดํต ์๋ก์ด ์ฝ๋ฃจํด์ด ์์๋์ง ์์ต๋๋ค. ๋ฐฉ์ถ๋ ๊ฐ๊ฐ์ ๊ฐ์ upsteam์์ downstream๊น์ง ๋ชจ๋ ์ค๊ฐ ์ฐ์ฐ์๋ค์ ์ํด์ ์ฒ๋ฆฌ๋๊ณ ๋์ ํ์ terminal ์ฐ์ฐ์๋ก ์ ๋ฌ๋ฉ๋๋ค.
์ง์ ์ ์๋ค์ ํํฐ๋งํ๊ณ ๋์ ๋ฌธ์์ด๋ก ๋งคํํ๋ ๋ค์ ์๋ฅผ ๋ณด์ธ์.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking<Unit> {
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
}
๊ฒฐ๊ณผ
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
flow์ collection์ ํธ์ถํ๋ ์ฝ๋ฃจํด์ context์์ ํญ์ ๋ฐ์ํฉ๋๋ค. ์๋ฅผ ๋ค์ด, simple flow๊ฐ ์๊ณ , simple flow์ ์ธ๋ถ ๊ตฌํ์ฌํญ๊ณผ ๊ด๋ จ์์ด ๋ค์ ์ฝ๋๋ ์ด ์ฝ๋์ ์์ฑ์์ ์ํด ์ง์ ๋ context์์ ์คํํฉ๋๋ค.
withContext(context) {
simple().collect { value ->
println(value) // run in the specified context
}
}
flow์ ์ด๋ฌํ ์์ฑ์ context perservation(๋ณด์กด)์ด๋ผ๊ณ ๋ถ๋ฆ
๋๋ค.
๋ณดํต, flow{...} ๋น๋ ์์ ์ฝ๋๋ ํด๋นํ๋ flow์ collector์ ์ํด ์ ๊ณต๋ context ์์์ ์คํํฉ๋๋ค. ์๋ฅผ ๋ค์ด, ํธ์ถ๋ ์ฐ๋ ๋๋ฅผ printํ๊ณ ์ซ์ 3๊ฐ๋ฅผ ๋ด๋ณด๋ด๋ simple ํจ์์ ๊ตฌํ์ ๊ณ ๋ คํด๋ณด์ฃ .
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
๊ฒฐ๊ณผ
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect ๊ฐ main thread์์ ํธ์ถ๋์๊ธฐ ๋๋ฌธ์, simple์ flow์ body๋ main thread์์ ํธ์ถ๋์์ต๋๋ค. ์ด๊ฒ์ ์คํ context๋ฅผ ์ ๊ฒฝ์ฐ์ง ์๊ณ ํธ์ถ์๋ฅผ ์ฐจ๋จํ์ง ์๋, ๋น ๋ฅธ ์คํ, ๋๋ ๋น๋๊ธฐ์ ์ธ ์ฝ๋๋ฅผ ์ํ ์๋ฒฝํ ๊ธฐ๋ณธ๊ฐ์ ๋๋ค.
์๋ชป๋ ๋ฐฉ์ถ(emission) withContext
๊ทธ๋ฌ๋, ์ฅ๊ธฐ๊ฐ CPU๋ฅผ ์ฌ์ฉํ๋ ์ฝ๋๋ Dispatchers.Default์ context์์ ์คํ๋ ํ์๊ฐ ์๊ณ , UI๋ฅผ ์ ๋ฐ์ดํธํ๋ ์ฝ๋๋ Dispatchers.Main์ context์์ ์คํ๋์ด์ผ ํฉ๋๋ค. ๋๊ฐ, withContext๋ ์ฝํ๋ฆฐ ์ฝ๋ฃจํด์ ์ฌ์ฉํ๋ ์ฝ๋์์ context๋ฅผ ๋ณ๊ฒฝํ๋๋ฐ ์ฌ์ฉ๋๋๋ฐ์, ๊ทธ๋ฌ๋ flow{...} ๋น๋ ์์ ์ฝ๋๋ context ๋ณด์ ์์ฑ์ ์ง์ผ์ผ ํ๊ณ , ๋ค๋ฅธ context์์ emitํ๋ ๊ฑด ํ์ฉํ์ง ์์ต๋๋ค.
๋ค์ ์ฝ๋๋ฅผ ์๋ํด๋ณด์ฃ .
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
๊ฒฐ๊ณผ. ๋ค์๊ณผ ๊ฐ์ exception์ด ๋์ต๋๋ค.
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
> flow ์์์ withContext ์ฌ์ฉ ๋ชปํจ.
flowOn ์ฐ์ฐ์
์์ธ๋ก flow emission(๋ฐฉ์ถ)์ context๋ฅผ ๋ฐ๊พธ๋ ๋ฐ ์ฌ์ฉํ๋ flowOn ํจ์๊ฐ ์์ต๋๋ค. flow์ context๋ฅผ ๋ฐ๊พธ๋๋ฐ ์ฌ๋ฐ๋ฅธ ๋ฐฉ๋ฒ์ด ์๋์ ์์ ๋ณด์ฌ์ง๋๋ค. ์๋์ ์๋ ํด๋นํ๋ ์ฐ๋ ๋๊ฐ ์ด๋ป๊ฒ ์ผํ๋์ง ๋ณด์ฌ์ฃผ๊ณ ์ด๋ฆ์ ์ถ๋ ฅํฉ๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
๊ฒฐ๊ณผ
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
collection์ด ๋ฉ์ธ ์ฐ๋ ๋์์ ๋์ํ๋ ๋์, flow{...}๊ฐ ๋ฐฑ๊ทธ๋ผ์ด๋ ์ฐ๋ ๋์์ ์ด๋ป๊ฒ ์ผํ๋์ง๋ฅผ ์์๋์ธ์.
์ฌ๊ธฐ์ ๊ด์ฐฐํด์ผ ํ๋ ๋ค๋ฅธ ๊ฒ์ flowOn ์ฐ์ฐ์๊ฐ flow์ ๊ธฐ๋ณธ ์์ฐจ์ ์ธ ํน์ฑ์ ๋ฐ๊พธ์๋ค๋ ๊ฒ์ ๋๋ค. ํ์ฌ collection์ ํ ์ฝ๋ฃจํด("coroutine#1)์์ ๋์ํ๊ณ , collectํ๋ ์ฝ๋ฃจํด๊ณผ ํจ๊ป ๋์์ ๋ค๋ฅธ ์ฐ๋ ๋์์ ๋๊ณ ์๋ ๋ ๋ค๋ฅธ ์ฝ๋ฃจํด("coroutine#2")์์ emission์ด ์ผ์ด๋ฌ์ต๋๋ค. flowOn ์ฐ์ฌ์๋ context์์ CoroutineDispatcher๋ฅผ ๋ฐ๊ฟ์ผ ํ ๋, upstream flow๋ฅผ ์ํด ๋ ๋ค๋ฅธ ์ฝ๋ฃจํด์ ์์ฑํ์ต๋๋ค.
Buffering
๋ค๋ฅธ ์ฝ๋ฃจํด๋ค์์์ ํ flow์ ๋ค๋ฅธ ๋ถ๋ถ๋ค์ ์คํํ๋ ๊ฒ์ flow๋ฅผ ์์งํ๋ ์ ์ฒด์ ์ธ ์๊ฐ์ ๊ด์ ์์๋ ์ ์ฉํ ์ ์์ต๋๋ค. ํนํ ์ฅ์๊ฐ ์ํํ๋ ๋น๋๊ธฐ ์ฐ์ฐ์ ํฌํจํด์ ๋ง์ด์ฃ . ์๋ฅผ ๋ค์ด์, simple flow์ emission์ด ๋๋ฆด ๋, ํ ์์๊ฐ ์์ฑ๋๋๋ฐ 100ms๊ฐ ๊ฑธ๋ฆฌ๊ณ , ์์งํ๋ ๊ฒ๋ ๋๋ ค์ ํ ์์๋ฅผ ์ฒ๋ฆฌํ๋๋ฐ 300ms๊ฐ ๊ฑธ๋ฆฌ๋ ๊ฒฝ์ฐ๋ฅผ ๊ณ ๋ คํด๋ด ์๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
๊ฒฐ๊ณผ
1
2
3
Collected in 1222 ms
buffer ์ฐ์ฐ์๋ฅผ ์ฌ์ฉํด์ ์์ฐจ์ ์ผ๋ก ์คํํ๋ ๋์ ์, simple flow์ emitting ์ฝ๋์ collectiong ์ฝ๋๋ฅผ ๋์์ ์คํํ ์ ์์ต๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
๊ฒฐ๊ณผ
1
2
3
Collected in 1057 ms
๊ฐ์ ์ซ์๋ค์ ๋ ๋น ๋ฅด๊ฒ ์์ฑํ๊ณ , ์ฒ๋ฆฌ ํ์ดํ๋ผ์ธ์ ํจ๊ณผ์ ์ผ๋ก ์์ฑํด์, ์ฒซ ๋ฒ์งธ ์ซ์๋ 100ms๋ง ๊ธฐ๋ค๋ฆฌ๊ณ ๊ทธ ํ์ ๊ฐ ์ซ์๋ฅผ ์ฒ๋ฆฌํ๋๋ฐ์ 300ms๋ง ๊ฑธ๋ฆฝ๋๋ค.
Conflation / ํฉ์ฑ
flow๊ฐ ์ฐ์ฐ์ ๋ถ๋ถ์ ์ธ ๊ฒฐ๊ณผ๋ค์ด๋ ์ฐ์ฐ ์ํ ์ ๋ฐ์ดํธ์ ์ผ๋ถ ๊ฒฐ๊ณผ๋ค์ ๋ํ๋ผ ๋, ๋ชจ๋ ๊ฐ์ ์ฒ๋ฆฌํ ํ์๋ ์์ต๋๋ค. ๋์ ์ ๊ฐ์ฅ ์ต๊ทผ๊ฐ๋ง ์ฒ๋ฆฌํ๋ฉด ๋์ฃ . ์ด ๊ฒฝ์ฐ์, conflate ์ฐ์ฐ์๊ฐ collector๊ฐ ๊ฐ๋ค์ ์ฒ๋ฆฌํ๋๋ฐ ๋๋ฌด ์ค๋ ๊ฑธ๋ฆด ๋์ ์ค๊ฐ ๊ฐ๋ค์ ๊ฑด๋๋ฐ๊ฒ ํด์ค๋๋ค. ์ด์ ์์ ๋ฅผ ๊ธฐ๋ฐ์ ์์ฑํ์ต๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
๊ฒฐ๊ณผ
1
3
Collected in 756 ms
์ฒซ ๋ฒ์งธ ์ซ์๊ฐ ์์ง๋ ์ฒ๋ฆฌ ์ค์ผ ๋์์ ๋๋ฒ์งธ, ์ธ๋ฒ์งธ ๊ฐ์ด ์ด๋ฏธ ์์ฐ๋์๊ณ ์, ๊ทธ๋์ ๋ ๋ฒ์งธ ๊ฐ์ ์์ถ(conflated/๊ฒฐํฉ)๋์ด ๊ฐ์ฅ ์ต๊ทผ ๊ฐ(์ธ๋ฒ์งธ ๊ฐ)์ด collector๋ก ์ ๋ฌ๋์์ต๋๋ค.
์ต์ ๊ฐ ์ฒ๋ฆฌํ๊ธฐ
Conflation์ emitter์ collector ๋ ๋ค ๋๋ฆด ๋ ์ฒ๋ฆฌ ์๋๋ฅผ ๋น ๋ฅด๊ฒ ํ๋ ํ๋์ ๋ฐฉ๋ฒ์ ๋๋ค. ๋ด๋ณด๋ด์ง(emitted) ๊ฐ๋ค์ ๋๋ฝ์ํค๋ ๊ฑฐ์ฃ . ๋ค๋ฅธ ๋ฐฉ๋ฒ์ ๋๋ฆฐ collector๋ฅผ ์ทจ์ํ๊ณ ์ ๊ฐ์ ๋ด๋ณด๋ด๋ ๋งค ์๊ฐ๋ง๋ค collector๋ฅผ ์ฌ์์ํ๋ ๊ฒ๋๋ค. ์ด๊ฒ์ xxx ์ฐ์ฐ์์ ๊ผญ ํ์์ ์ธ ๋ก์ง์ ์ํํ๋ ์ ๊ฐ์ ๋ฐ์ block ์์ ์ฝ๋๋ ์ทจ์ํ๋ xxxLatest ์ฐ์ฐ์๋ค์ ์ผ์ข ์ ๋๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
}
๊ฒฐ๊ณผ
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 684 ms
collectLatest์ body๋ 300ms๊ฐ ๊ฑธ๋ฆฌ๋, ์ ๊ฐ๋ค์ ๋งค 100ms๋ง๋ค ๋ฐฉ์ถ๋๋, block์ ๋งค ์ ๊ฐ๋ง๋ค ์คํํ๋ ๋ง์ง๋ง ๊ฐ์ผ๋ก๋ง ์๋ฃ๋๋ ๊ฑธ ๋ณผ ์ ์์ต๋๋ค.