๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

๋นˆ ๊ตฌ๋ฉ ์ฑ„์šฐ๊ธฐ

[Kotlin Coroutine] Flow - 1

kotlinlang.org/docs/flow.html#sequences

 

Asynchronous Flow - Help | Kotlin

 

kotlinlang.org

Asynchronous Flow

 

suspending ํ•จ์ˆ˜๋Š” ๋น„๋™๊ธฐ์ ์œผ๋กœ ํ•˜๋‚˜์˜ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ๋น„๋™๊ธฐ์ ์œผ๋กœ ๊ณ„์‚ฐ๋œ ์—ฌ๋Ÿฌ ๊ฐ’๋“ค์„  ์–ด๋–ป๊ฒŒ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ์„๊นŒ์š”? Kotlin Flows๋Š” ์—ฌ๊ธฐ์„œ ๋“ฑ์žฅํ•ฉ๋‹ˆ๋‹ค.

 

์—ฌ๋Ÿฌ ๊ฐ’๋“ค์„ ํ‘œ์‹œํ•˜๊ธฐ

์—ฌ๋Ÿฌ ๊ฐ’๋“ค์€ Kotlin์—์„œ collection์„ ์ด์šฉํ•ด์„œ ํ‘œํ˜„๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด์„œ, simple() ํ•จ์ˆ˜๋Š” ์„ธ ๊ฐœ์˜ ์ˆซ์ž๋“ค์˜ List๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  forEach๋ฅผ ์ด์šฉํ•ด์„œ ๋ชจ๋“  ์ˆซ์ž๋“ค์„ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.

fun simple(): List<Int> = listOf(1, 2, 3)
 
fun main() {
    simple().forEach { value -> println(value) } 
}

๊ฒฐ๊ณผ

1
2
3

Sequence

๋งŒ์•ฝ CPU๋ฅผ ์‚ฌ์šฉํ•ด blocking ์ฝ”๋“œ(๊ฐ ๊ณ„์‚ฐ๋งˆ๋‹ค 100ms๊ฐ€ ๊ฑธ๋ฆฌ์ฃ )๋ฅผ ๊ฐ€์ง€๊ณ  ์ˆซ์ž๋“ค์„ ๊ณ„์‚ฐํ•œ๋‹ค๋ฉด, Sequence๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ˆซ์ž๋“ค์„ ๋‚˜ํƒ€๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

๊ฒฐ๊ณผ

1
2
3

์ฝ”๋“œ ๊ฒฐ๊ณผ๊ฐ’์€ ๊ฐ™์€ ์ˆซ์ž๋“ค์ด์ง€๋งŒ, ํ•˜๋‚˜์”ฉ ์ถœ๋ ฅ๋  ๋•Œ๋งˆ๋‹ค 100ms๊ฐ€ ๊ฑธ๋ฆฝ๋‹ˆ๋‹ค.

 

Suspending ํ•จ์ˆ˜

๊ทธ๋Ÿฌ๋‚˜, ์ด ๊ณ„์‚ฐ์€ ์ฝ”๋“œ๋ฅผ ์ˆ˜ํ–‰ํ•˜๋Š” ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ๋ฅผ ๋ง‰์Šต๋‹ˆ๋‹ค. ๋น„๋™๊ธฐ ์ฝ”๋“œ๋กœ ์ด ๊ฐ’๋“ค์ด ๊ณ„์‚ฐ๋  ๋•Œ, ์šฐ๋ฆฌ๋Š” simple ํ•จ์ˆ˜์— suspend ์ˆ˜์ •์ž๋ฅผ ํ‘œ์‹œํ•˜๊ณ , blocking ์—†์ด ์ฝ”๋“œ๋Š” ๋Œ์•„๊ฐ€๊ณ  ๋ฆฌ์ŠคํŠธ๋กœ ๊ฒฐ๊ณผ๋ฅผ ๋ฐ˜ํ™˜๋ฐ›์„ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*                 
                           
suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

๊ฒฐ๊ณผ

1
2
3

์ด ์ฝ”๋“œ๋Š” 1์ดˆ๋ฅผ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ์ˆซ์ž๋“ค์„ ์ถœ๋ ฅํ•˜์ฃ .

 

Flows

List<Int> ๊ฒฐ๊ณผ ํƒ€์ž…์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์€, ์˜ค์ง ํ•œ ๋ฒˆ์— ๋ชจ๋“  ๊ฐ’๋“ค์„ ๋ฐ˜ํ™˜ํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์„ ์˜๋ฏธํ•ฉ๋‹ˆ๋‹ค. ๋น„๋™๊ธฐ์ ์œผ๋กœ ๊ณ„์‚ฐ๋˜๋Š” ๊ฐ’๋“ค์˜ ํ๋ฆ„์„ ๋‚˜ํƒ€๋‚ด๊ธฐ ์œ„ํ•ด์„œ, Flow<Int>ํƒ€์ž…์„ ๋น„๋™๊ธฐ์ ์œผ๋กœ ๊ณ„์‚ฐ๋˜๋Š” ๊ฐ’๋“ค์„ ์œ„ํ•œ Sequence<Int> ํƒ€์ž…์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ฒ˜๋Ÿผ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

              
fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}

๊ฒฐ๊ณผ

I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3

์ด ์ฝ”๋“œ๋Š” ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ๋ฅผ blocking ํ•˜์ง€ ์•Š๊ณ  ๊ฐ ์ˆซ์ž๋“ค์„ ์ถœ๋ ฅํ•˜๊ธฐ ์ „์— 100ms๋ฅผ ๊ธฐ๋‹ค๋ฆฝ๋‹ˆ๋‹ค. ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ์—์„œ ๋Œ๊ณ  ์žˆ๋Š” ๋ถ„๋ฆฌ๋œ ์ฝ”๋ฃจํ‹ด์—์„œ ๋งค 100ms๋งˆ๋‹ค "I'm not blocked"๊ฐ€ ์ถœ๋ ฅ๋˜๋Š” ๊ฒƒ์„ ํ™•์ธํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

Flow์™€ ์ด์ „ ์˜ˆ์ œ๋“ค ์ฝ”๋“œ์—์„œ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ์ฐจ์ด์ ๋“ค์„ ์ฃผ๋ชฉํ•˜์„ธ์š”.

 

  • Flow ํƒ€์ž…์„ ์œ„ํ•œ ๋นŒ๋” ํ•จ์ˆ˜๊ฐ€ flow๋ฅผ ํ˜ธ์ถœํ•ฉ๋‹ˆ๋‹ค.
  • flow{...} ๋นŒ๋” ๋ธ”๋ก ์•ˆ์˜ ์ฝ”๋“œ๋Š” ์ผ์‹œ์ค‘์‹œํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.
  • simple ํ•จ์ˆ˜์— suspend ์ˆ˜์ •์ž๊ฐ€ ๋ถ™์ง€ ์•Š์Šต๋‹ˆ๋‹ค.
  • emit ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•ด์„œ flow์—์„œ ๊ฐ’๋“ค์ด ๋ฐฉ์ถœ๋ฉ๋‹ˆ๋‹ค(emitted).
  • collect ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•ด์„œ flow์—์„œ ๊ฐ’๋“ค์ด ์ˆ˜์ง‘๋ฉ๋‹ˆ๋‹ค(collected).

 

Flow๋Š” ์ฐจ๊ฐ‘์Šต๋‹ˆ๋‹ค.

Flow๋Š” sequence์™€ ๋น„์Šทํ•˜๊ฒŒ cold stream์ž…๋‹ˆ๋‹ค. flow๊ฐ€ ์ˆ˜์ง‘๋˜๊ธฐ ์ „๊นŒ์ง€ flow ๋นŒ๋” ์•ˆ์˜ ์ฝ”๋“œ๋Š” ๋Œ์•„๊ฐ€์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ์•„๋ž˜์˜ ์˜ˆ์ œ๋ฅผ ํ†ตํ•ด ๋ช…ํ™•ํ•˜๊ฒŒ ์•Œ ์ˆ˜ ์žˆ์ฃ .

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

     
fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple()
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}

๊ฒฐ๊ณผ

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

์ด๊ฒƒ์ด (flow๋ฅผ ๋ฐ˜ํ™˜ํ•˜๋Š”)simple ํ•จ์ˆ˜๊ฐ€ suspend ์ˆ˜์ •์ž๋กœ ํ‘œ์‹œ๋˜์ง€ ์•Š๋Š” ํ•ต์‹ฌ ์ด์œ ์ž…๋‹ˆ๋‹ค. ๊ทธ ์ž์ฒด๋กœ simple() ํ˜ธ์ถœ์€ ๋น ๋ฅด๊ฒŒ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์•„๋ฌด๊ฒƒ๋„ ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š์Šต๋‹ˆ๋‹ค. flow๋Š” ๋งค ๋ฒˆ ์ˆ˜์ง‘๋  ๋•Œ๋งˆ๋‹ค ์‹œ์ž‘ํ•˜๊ณ , ์ด๊ฒƒ์€ collect๋ฅผ ๋‹ค์‹œ ํ˜ธ์ถœํ•  ๋•Œ "Flow started"๋ฅผ ๋ณด๊ฒŒ๋˜๋Š” ์ด์œ ์ž…๋‹ˆ๋‹ค.

 

Flow ์ทจ์†Œ ๊ธฐ์ดˆ

Flow๋Š” ์ฝ”๋ฃจํ‹ด์˜ ์ผ๋ฐ˜์ ์ธ ํ˜‘๋ ฅ ์ทจ์†Œ๋ฅผ ๋”ฐ๋ฅด๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ๋ณดํ†ต, flow collection์€ flow๊ฐ€ (delay ์ฒ˜๋Ÿผ)์ทจ์†Œ ๊ฐ€๋Šฅํ•œ suspending ํ•จ์ˆ˜ ์•ˆ์—์„œ ์ผ์‹œ์ค‘๋‹จ๋  ๋•Œ์— ์ทจ์†Œ๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์•„๋ž˜์˜ ์˜ˆ์ œ๋Š” ์–ด๋–ป๊ฒŒ flow๊ฐ€ withTimeoutOrNull ์•ˆ์—์„œ ์‹คํ–‰ ์ค‘์ผ ๋•Œ ํƒ€์ž„์•„์›ƒ์œผ๋กœ ์ทจ์†Œ๋˜๊ณ  ์ฝ”๋“œ ์ˆ˜ํ–‰์„ ์ค‘์ง€ํ•˜๋Š”์ง€๋ฅผ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}

๊ฒฐ๊ณผ

Emitting 1
1
Emitting 2
2
Done

์–ด๋–ป๊ฒŒ simple ํ•จ์ˆ˜์—์„œ ์ˆซ์ž ๋‘ ๊ฐœ๋งŒ์ด flow์—์„œ ๋‚ด๋ณด๋‚ด๊ฒŒ ๋˜๊ณ  ๊ฒฐ๊ณผ๋ฅผ ๋งŒ๋“œ๋Š”์ง€ ์ฃผ๋ชฉํ•˜์„ธ์š”.

 

Flow ๋นŒ๋”๋“ค

์ด์ „ ์˜ˆ์ œ๋“ค์—์„œ ๋‚˜์˜จ flow{...} ๋นŒ๋”๋Š” ๊ฐ€์žฅ ๊ธฐ์ดˆ์ ์ธ ๊ฒƒ์ž…๋‹ˆ๋‹ค. flow์˜ ๋” ์‰ฌ์šด ์„ ์–ธ์„ ์œ„ํ•˜ ๋‹ค๋ฅธ ๋นŒ๋”๋“ค์ด ์žˆ์Šต๋‹ˆ๋‹ค.

 

flowOf ๋นŒ๋”๋Š” ๊ฐ’์˜ ๊ณ ์ •๋œ ์ง‘ํ•ฉ์„ ๋‚ด๋ณด๋‚ด๋Š” flow๋ฅผ ์ •์˜ํ•ฉ๋‹ˆ๋‹ค.

๋‹ค์–‘ํ•œ collection๋“ค๊ณผ sequence๋“ค์€ .asFlow() ํ™•์žฅ ํ•จ์ˆ˜๋ฅผ ์ด์šฉํ•ด์„œ flow๋กœ ๋ณ€ํ™˜๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

1๋ถ€ํ„ฐ 3๊นŒ์ง€์˜ ์ˆซ์ž๋ฅผ ์ถœ๋ ฅํ•˜๋Š” ์˜ˆ์ œ๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™์ด ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {
    // Convert an integer range to a flow
    (1..3).asFlow().collect { value -> println(value) } 
}

๊ฒฐ๊ณผ

1
2
3

 

์ค‘๊ฐ„ flow ์—ฐ์‚ฐ์ž๋“ค

collection๊ณผ sequence์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ Flow๋Š” ์—ฐ์‚ฐ์ž๋ฅผ ๊ฐ€์ง€๊ณ  ๋ณ€ํ˜•๋  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋“ค์€ upstream flow์— ์ ์šฉ๋˜์–ด downstream flow๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค. ์ด ์—ฐ์‚ฐ์ž๋“ค์€ flow์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ ์ฐจ๊ฐ‘์Šต๋‹ˆ๋‹ค. ์ด๋Ÿฌํ•œ ์—ฐ์‚ฐ์ž๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ๊ฒƒ์€ ์ž์ฒด๋กœ suspending ํ•จ์ˆ˜๋Š” ์•„๋‹™๋‹ˆ๋‹ค. ๋น ๋ฅด๊ฒŒ ๋™์ž‘ํ•˜๊ณ , ์ƒˆ๋กญ๊ฒŒ ๋ณ€ํ˜•๋œ flow์˜ ์ •์˜๋ฅผ ๋ฐ˜ํ™˜ํ•ฉ๋‹ˆ๋‹ค.

 

๊ธฐ์ดˆ ์—ฐ์‚ฐ์ž๋“ค์€ map, filter ๊ฐ™์ด ์นœ์ˆ™ํ•œ ์ด๋ฆ„๋“ค์„ ๊ฐ€์ง€๊ณ  ์žˆ์Šต๋‹ˆ๋‹ค. ์‹œํ€€์Šค์™€์˜ ์ค‘์š”ํ•œ ์ฐจ์ด์ ์€ ์ด ์—ฐ์‚ฌ์ž๋“ค ์•ˆ์˜ ์ฝ”๋“œ ๋ธ”๋ก๋“ค์€ suspending ํ•จ์ˆ˜๋“ค์„ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค.

 

์˜ˆ๋ฅผ๋“ค์–ด์„œ, ์ž…๋ ฅ ์š”์ฒญ์˜ flow๋Š” map ์—ฐ์‚ฐ์ž๋กœ ๊ฒฐ๊ณผ๋ฅผ ๋งคํ•‘ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ์š”์ฒญ์„ ์ˆ˜ํ–‰ํ•˜๋Š” ๊ฒƒ์ด suspending ํ•จ์ˆ˜๋กœ ๊ตฌํ˜„๋˜๋Š”, ์‹œ๊ฐ„์ด ๊ธธ๊ฒŒ ๊ฑธ๋ฆฌ๋Š” ์—ฐ์‚ฐ์ผ ๊ฒฝ์šฐ์—๋„ ๋ง์ž…๋‹ˆ๋‹ค. 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

          
suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

๋‹ค์Œ 3์ค„์„ ๋งŒ๋“ค๊ณ , ๊ฐ ์ค„์€ ๊ฐ 1์ดˆ ํ›„์— ๋“ฑ์žฅํ•ฉ๋‹ˆ๋‹ค.

response 1
response 2
response 3

๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž

flow ๋ณ€ํ™˜ ์—ฐ์‚ฐ์ž๋“ค ์ค‘์—์„œ, ๊ฐ€์žฅ ์ผ๋ฐ˜์ ์ธ ๊ฒƒ์€ transform์ด๋ผ ๋ถˆ๋ฆฝ๋‹ˆ๋‹ค. map, filter ๊ฐ™์ด ๊ฐ„๋‹จํ•œ ๋ณ€ํ™˜์„ ๋ชจ๋ฐฉํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋  ์ˆ˜ ์žˆ๊ณ , ๋” ๋ณต์žกํ•œ ๋ณ€ํ™˜๋„ ๊ตฌํ˜„ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. transform ์—ฐ์‚ฐ์ž๋ฅผ ์ด์šฉํ•ด์„œ, ์ž„์˜์˜ ํšŸ์ˆ˜๋งŒํผ ์ž„์˜ ๊ฐ’์„ ๋‚ด๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

 

์˜ˆ๋ฅผ ๋“ค์–ด์„œ, transform์„ ์ด์šฉํ•ด ์‹œ๊ฐ„์ด ๊ธธ๊ฒŒ ๊ฑธ๋ฆฌ๋Š” ๋น„๋™๊ธฐ ์š”์ฒญ์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์ „์— string์„ ๋‚ด๋ณด๋‚ผ ์ˆ˜ ์žˆ๊ณ  ์‘๋‹ต์„ ๋’ค์ด์–ด ๋ณด๋‚ผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .transform { request ->
            emit("Making request $request") 
            emit(performRequest(request)) 
        }
        .collect { response -> println(response) }
}

๊ฒฐ๊ณผ

Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

ํฌ๊ธฐ ์ œํ•œ ์—ฐ์‚ฐ์ž

take์™€ ๊ฐ™์ด ํฌ๊ธฐ ์ œํ•œ ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋Š” ํ•ด๋‹นํ•˜๋Š” ์ œํ•œ์— ๋„๋‹ฌํ•˜๋ฉด flow์˜ ์ˆ˜ํ–‰์„ ์ทจ์†Œํ•ฉ๋‹ˆ๋‹ค. ์ฝ”๋ฃจํ‹ด์—์„œ ์ทจ์†Œ๋Š” ํ•ญ์ƒ  exception์„ ๋ฐœ์ƒํ•˜๋ฉด์„œ ์ˆ˜ํ–‰๋ฉ๋‹ˆ๋‹ค. ๊ทธ๋ž˜์„œ ๋ชจ๋“  ๋ฆฌ์†Œ์Šค ๊ด€๋ฆฌ ํ•จ์ˆ˜๋“ค(try {...} finally {...} ๋ธ”๋ก๋“ค)์€ ๋ณดํ†ต ์ทจ์†Œํ•˜๋Š” ๊ฒฝ์šฐ์— ์—ฐ์‚ฐํ•ฉ๋‹ˆ๋‹ค.    

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}

๊ฒฐ๊ณผ

1
2
Finally in numbers

์ด ์ฝ”๋“œ์˜ ์ถœ๋ ฅ ๊ฒฐ๊ณผ๋Š” numbers() ํ•จ์ˆ˜ ์•ˆ์˜ flow{...} body์˜ ์ˆ˜ํ–‰์ด ๋‘ ๋ฒˆ์งธ ์ˆซ์ž๋ฅผ ๋‚ด๋ณด๋‚ธ ์ดํ›„์— ์ค‘์ง€๋œ ๊ฒƒ์„ ๋ช…ํ™•ํ•˜๊ฒŒ ๋ณด์—ฌ์ค๋‹ˆ๋‹ค. 

 

Terminal flow ์—ฐ์‚ฐ์ž๋“ค

flow์—์„œ Terminal ์—ฐ์‚ฐ์ž๋“ค์€ flow์˜ collection์„ ์‹œ์ž‘ํ•˜๋Š” suspending ํ•จ์ˆ˜๋“ค์ด๋‹ค. collect ํ•จ์ˆ˜๊ฐ€ ๊ฐ€์žฅ ๋Œ€ํ‘œ์ ์ธ ๊ฒƒ์ด๊ณ , ๋” ์‰ฌ์šด ๋‹ค๋ฅธ terminal ์—ฐ์‚ฐ์ž๋“ค์ด ์žˆ๋‹ค.

 

  • toList, toSet์€ ๋‹ค์–‘ํ•œ collection๋“ค๋กœ ๋ณ€ํ™˜์‹œํ‚จ๋‹ค.
  • first ๋กœ ์ฒซ ๊ฐ’์„ ๊ตฌํ•˜๊ณ , single๋กœ ๊ฐ’ ํ•˜๋‚˜๋ฅผ ๋‚ด๋ณด๋‚ธ๋‹ค.
  • reduce์™€ fold๋กœ flow๋ฅผ ๊ฐ’์œผ๋กœ ์ค„์ธ๋‹ค.

์˜ˆ์‹œ๊ฐ€ ์—ฌ๊ธฐ ์žˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5                           
        .reduce { a, b -> a + b } // sum them (terminal operator)
    println(sum)     
}

๊ฒฐ๊ณผ

55

flow๋Š” ์ˆœ์ฐจ์ ์ž…๋‹ˆ๋‹ค.

flow์˜ ๊ฐ ๊ฐœ๋ณ„ collection์€ ์—ฌ๋Ÿฌ flow๋“ค์—์„œ ์—ฐ์‚ฐํ•˜๋Š” ํŠน์ •ํ•œ ์—ฐ์‚ฌ์ž๋“ค์ด ์‚ฌ์šฉ๋˜์ง€ ์•Š๋Š”ํ•œ ์ˆœ์ฐจ์ ์œผ๋กœ ์ˆ˜ํ–‰๋ฉ๋‹ˆ๋‹ค. collection์€ terminal ์—ฐ์‚ฌ์ž๋ฅผ ํ˜ธ์ถœํ•˜๋Š” ์ฝ”๋ฃจํ‹ด์—์„œ ๋ฐ”๋กœ ์ž‘์—…ํ•ฉ๋‹ˆ๋‹ค. ๋ณดํ†ต ์ƒˆ๋กœ์šด ์ฝ”๋ฃจํ‹ด์ด ์‹œ์ž‘๋˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค. ๋ฐฉ์ถœ๋œ ๊ฐ๊ฐ์˜ ๊ฐ’์€ upsteam์—์„œ downstream๊นŒ์ง€ ๋ชจ๋“  ์ค‘๊ฐ„ ์—ฐ์‚ฐ์ž๋“ค์— ์˜ํ•ด์„œ ์ฒ˜๋ฆฌ๋˜๊ณ  ๋‚˜์„œ ํ›„์— terminal ์—ฐ์‚ฐ์ž๋กœ ์ „๋‹ฌ๋ฉ๋‹ˆ๋‹ค.

 

์ง์ˆ˜ ์ •์ˆ˜๋“ค์„ ํ•„ํ„ฐ๋งํ•˜๊ณ  ๋‚˜์„œ ๋ฌธ์ž์—ด๋กœ ๋งคํ•‘ํ•˜๋Š” ๋‹ค์Œ ์˜ˆ๋ฅผ ๋ณด์„ธ์š”.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking<Unit> {

    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0              
        }              
        .map { 
            println("Map $it")
            "string $it"
        }.collect { 
            println("Collect $it")
        }                      
}

๊ฒฐ๊ณผ

Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

flow์˜ collection์€ ํ˜ธ์ถœํ•˜๋Š” ์ฝ”๋ฃจํ‹ด์˜ context์—์„œ ํ•ญ์ƒ ๋ฐœ์ƒํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, simple flow๊ฐ€ ์žˆ๊ณ , simple flow์˜ ์„ธ๋ถ€ ๊ตฌํ˜„์‚ฌํ•ญ๊ณผ ๊ด€๋ จ์—†์ด ๋‹ค์Œ ์ฝ”๋“œ๋Š” ์ด ์ฝ”๋“œ์˜ ์ž‘์„ฑ์ž์— ์˜ํ•ด ์ง€์ •๋œ context์—์„œ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. 

withContext(context) {
    simple().collect { value ->
        println(value) // run in the specified context
    }
}

flow์˜ ์ด๋Ÿฌํ•œ ์†์„ฑ์„ context perservation(๋ณด์กด)์ด๋ผ๊ณ  ๋ถ€๋ฆ…๋‹ˆ๋‹ค.

๋ณดํ†ต, flow{...} ๋นŒ๋” ์•ˆ์˜ ์ฝ”๋“œ๋Š” ํ•ด๋‹นํ•˜๋Š” flow์˜ collector์— ์˜ํ•ด ์ œ๊ณต๋œ context ์•ˆ์—์„œ ์‹คํ–‰ํ•ฉ๋‹ˆ๋‹ค. ์˜ˆ๋ฅผ ๋“ค์–ด, ํ˜ธ์ถœ๋œ ์“ฐ๋ ˆ๋“œ๋ฅผ printํ•˜๊ณ  ์ˆซ์ œ 3๊ฐœ๋ฅผ ๋‚ด๋ณด๋‚ด๋Š” simple ํ•จ์ˆ˜์˜ ๊ตฌํ˜„์„ ๊ณ ๋ คํ•ด๋ณด์ฃ .

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}  

๊ฒฐ๊ณผ

[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect ๊ฐ€ main thread์—์„œ ํ˜ธ์ถœ๋˜์—ˆ๊ธฐ ๋•Œ๋ฌธ์—, simple์˜ flow์˜ body๋Š” main thread์—์„œ ํ˜ธ์ถœ๋˜์—ˆ์Šต๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ ์‹คํ–‰ context๋ฅผ ์‹ ๊ฒฝ์“ฐ์ง€ ์•Š๊ณ  ํ˜ธ์ถœ์ž๋ฅผ ์ฐจ๋‹จํ•˜์ง€ ์•Š๋Š”, ๋น ๋ฅธ ์‹คํ–‰, ๋˜๋Š” ๋น„๋™๊ธฐ์ ์ธ ์ฝ”๋“œ๋ฅผ ์œ„ํ•œ ์™„๋ฒฝํ•œ ๊ธฐ๋ณธ๊ฐ’์ž…๋‹ˆ๋‹ค.

 

์ž˜๋ชป๋œ ๋ฐฉ์ถœ(emission) withContext

๊ทธ๋Ÿฌ๋‚˜, ์žฅ๊ธฐ๊ฐ„ CPU๋ฅผ ์‚ฌ์šฉํ•˜๋Š” ์ฝ”๋“œ๋Š” Dispatchers.Default์˜ context์—์„œ ์‹คํ–‰๋  ํ•„์š”๊ฐ€ ์žˆ๊ณ , UI๋ฅผ ์—…๋ฐ์ดํŠธํ•˜๋Š” ์ฝ”๋“œ๋Š” Dispatchers.Main์˜ context์—์„œ ์‹คํ–‰๋˜์–ด์•ผ ํ•ฉ๋‹ˆ๋‹ค. ๋Œ€๊ฐœ, withContext๋Š” ์ฝ”ํ‹€๋ฆฐ ์ฝ”๋ฃจํ‹ด์„ ์‚ฌ์šฉํ•˜๋Š” ์ฝ”๋“œ์—์„œ context๋ฅผ ๋ณ€๊ฒฝํ•˜๋Š”๋ฐ ์‚ฌ์šฉ๋˜๋Š”๋ฐ์š”, ๊ทธ๋Ÿฌ๋‚˜ flow{...} ๋นŒ๋” ์•ˆ์˜ ์ฝ”๋“œ๋Š” context ๋ณด์ „ ์†์„ฑ์„ ์ง€์ผœ์•ผ ํ•˜๊ณ , ๋‹ค๋ฅธ context์—์„œ emitํ•˜๋Š” ๊ฑด ํ—ˆ์šฉํ•˜์ง€ ์•Š์Šต๋‹ˆ๋‹ค.

 

๋‹ค์Œ ์ฝ”๋“œ๋ฅผ ์‹œ๋„ํ•ด๋ณด์ฃ .

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
                      
fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
} 

๊ฒฐ๊ณผ. ๋‹ค์Œ๊ณผ ๊ฐ™์€ exception์ด ๋‚˜์˜ต๋‹ˆ๋‹ค.

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
	at ...

> flow ์•ˆ์—์„œ withContext ์‚ฌ์šฉ ๋ชปํ•จ.

 

flowOn ์—ฐ์‚ฐ์ž

์˜ˆ์™ธ๋กœ flow emission(๋ฐฉ์ถœ)์˜ context๋ฅผ ๋ฐ”๊พธ๋Š” ๋ฐ ์‚ฌ์šฉํ•˜๋Š” flowOn ํ•จ์ˆ˜๊ฐ€ ์žˆ์Šต๋‹ˆ๋‹ค. flow์˜ context๋ฅผ ๋ฐ”๊พธ๋Š”๋ฐ ์˜ฌ๋ฐ”๋ฅธ ๋ฐฉ๋ฒ•์ด ์•„๋ž˜์˜ ์˜ˆ์— ๋ณด์—ฌ์ง‘๋‹ˆ๋‹ค. ์•„๋ž˜์˜ ์˜ˆ๋Š” ํ•ด๋‹นํ•˜๋Š” ์“ฐ๋ ˆ๋“œ๊ฐ€ ์–ด๋–ป๊ฒŒ ์ผํ•˜๋Š”์ง€ ๋ณด์—ฌ์ฃผ๊ณ  ์ด๋ฆ„์„ ์ถœ๋ ฅํ•ฉ๋‹ˆ๋‹ค.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")
           
fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}   

๊ฒฐ๊ณผ

[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

collection์ด ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ์—์„œ ๋™์ž‘ํ•˜๋Š” ๋™์•ˆ, flow{...}๊ฐ€ ๋ฐฑ๊ทธ๋ผ์šด๋“œ ์“ฐ๋ ˆ๋“œ์—์„œ ์–ด๋–ป๊ฒŒ ์ผํ•˜๋Š”์ง€๋ฅผ ์•Œ์•„๋‘์„ธ์š”.

 

์—ฌ๊ธฐ์„œ ๊ด€์ฐฐํ•ด์•ผ ํ•˜๋Š” ๋‹ค๋ฅธ ๊ฒƒ์€ flowOn ์—ฐ์‚ฐ์ž๊ฐ€ flow์˜ ๊ธฐ๋ณธ ์ˆœ์ฐจ์ ์ธ ํŠน์„ฑ์„ ๋ฐ”๊พธ์—ˆ๋‹ค๋Š” ๊ฒƒ์ž…๋‹ˆ๋‹ค. ํ˜„์žฌ collection์€ ํ•œ ์ฝ”๋ฃจํ‹ด("coroutine#1)์—์„œ ๋™์ž‘ํ•˜๊ณ , collectํ•˜๋Š” ์ฝ”๋ฃจํ‹ด๊ณผ ํ•จ๊ป˜ ๋™์‹œ์— ๋‹ค๋ฅธ ์“ฐ๋ ˆ๋“œ์—์„œ ๋Œ๊ณ  ์žˆ๋Š” ๋˜ ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด("coroutine#2")์—์„œ emission์ด ์ผ์–ด๋‚ฌ์Šต๋‹ˆ๋‹ค. flowOn ์—ฐ์‚ฌ์ž๋Š” context์—์„œ CoroutineDispatcher๋ฅผ ๋ฐ”๊ฟ”์•ผ ํ•  ๋•Œ, upstream flow๋ฅผ ์œ„ํ•ด ๋˜ ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด์„ ์ƒ์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค.

 

Buffering

๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด๋“ค์•ˆ์—์„œ ํ•œ flow์˜ ๋‹ค๋ฅธ ๋ถ€๋ถ„๋“ค์„ ์‹คํ–‰ํ•˜๋Š” ๊ฒƒ์€ flow๋ฅผ ์ˆ˜์ง‘ํ•˜๋Š” ์ „์ฒด์ ์ธ ์‹œ๊ฐ„์˜ ๊ด€์ ์—์„œ๋Š” ์œ ์šฉํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค. ํŠนํžˆ ์žฅ์‹œ๊ฐ„ ์‹œํ–‰ํ•˜๋Š” ๋น„๋™๊ธฐ ์—ฐ์‚ฐ์„ ํฌํ•จํ•ด์„œ ๋ง์ด์ฃ . ์˜ˆ๋ฅผ ๋“ค์–ด์„œ, simple flow์˜ emission์ด ๋Š๋ฆด ๋•Œ, ํ•œ ์š”์†Œ๊ฐ€ ์ƒ์„ฑ๋˜๋Š”๋ฐ 100ms๊ฐ€ ๊ฑธ๋ฆฌ๊ณ , ์ˆ˜์ง‘ํ•˜๋Š” ๊ฒƒ๋„ ๋Š๋ ค์„œ ํ•œ ์š”์†Œ๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š”๋ฐ 300ms๊ฐ€ ๊ฑธ๋ฆฌ๋Š” ๊ฒฝ์šฐ๋ฅผ ๊ณ ๋ คํ•ด๋ด…์‹œ๋‹ค. 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}

๊ฒฐ๊ณผ

1
2
3
Collected in 1222 ms

buffer ์—ฐ์‚ฐ์ž๋ฅผ ์‚ฌ์šฉํ•ด์„œ ์ˆœ์ฐจ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๋Š” ๋Œ€์‹ ์—, simple flow์˜ emitting ์ฝ”๋“œ์™€ collectiong ์ฝ”๋“œ๋ฅผ ๋™์‹œ์— ์‹คํ–‰ํ•  ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .buffer() // buffer emissions, don't wait
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

๊ฒฐ๊ณผ

1
2
3
Collected in 1057 ms

๊ฐ™์€ ์ˆซ์ž๋“ค์„ ๋” ๋น ๋ฅด๊ฒŒ ์ƒ์„ฑํ•˜๊ณ , ์ฒ˜๋ฆฌ ํŒŒ์ดํ”„๋ผ์ธ์„ ํšจ๊ณผ์ ์œผ๋กœ ์ƒ์„ฑํ•ด์„œ, ์ฒซ ๋ฒˆ์งธ ์ˆซ์ž๋Š” 100ms๋งŒ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ๊ทธ ํ›„์— ๊ฐ ์ˆซ์ž๋ฅผ ์ฒ˜๋ฆฌํ•˜๋Š”๋ฐ์— 300ms๋งŒ ๊ฑธ๋ฆฝ๋‹ˆ๋‹ค.

 

Conflation / ํ•ฉ์„ฑ

flow๊ฐ€ ์—ฐ์‚ฐ์˜ ๋ถ€๋ถ„์ ์ธ ๊ฒฐ๊ณผ๋“ค์ด๋‚˜ ์—ฐ์‚ฐ ์ƒํƒœ ์—…๋ฐ์ดํŠธ์˜ ์ผ๋ถ€ ๊ฒฐ๊ณผ๋“ค์„ ๋‚˜ํƒ€๋‚ผ ๋•Œ, ๋ชจ๋“  ๊ฐ’์„ ์ฒ˜๋ฆฌํ•  ํ•„์š”๋Š” ์—†์Šต๋‹ˆ๋‹ค. ๋Œ€์‹ ์— ๊ฐ€์žฅ ์ตœ๊ทผ๊ฐ’๋งŒ ์ฒ˜๋ฆฌํ•˜๋ฉด ๋˜์ฃ . ์ด ๊ฒฝ์šฐ์—, conflate ์—ฐ์‚ฐ์ž๊ฐ€ collector๊ฐ€ ๊ฐ’๋“ค์„ ์ฒ˜๋ฆฌํ•˜๋Š”๋ฐ ๋„ˆ๋ฌด ์˜ค๋ž˜ ๊ฑธ๋ฆด ๋•Œ์— ์ค‘๊ฐ„ ๊ฐ’๋“ค์„ ๊ฑด๋„ˆ๋›ฐ๊ฒŒ ํ•ด์ค๋‹ˆ๋‹ค. ์ด์ „ ์˜ˆ์ œ๋ฅผ ๊ธฐ๋ฐ˜์„ ์ž‘์„ฑํ–ˆ์Šต๋‹ˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value -> 
                delay(300) // pretend we are processing it for 300 ms
                println(value) 
            } 
    }   
    println("Collected in $time ms")
}

๊ฒฐ๊ณผ

1
3
Collected in 756 ms

์ฒซ ๋ฒˆ์งธ ์ˆซ์ž๊ฐ€ ์•„์ง๋„ ์ฒ˜๋ฆฌ ์ค‘์ผ ๋™์•ˆ์— ๋‘๋ฒˆ์งธ, ์„ธ๋ฒˆ์งธ ๊ฐ’์ด ์ด๋ฏธ ์ƒ์‚ฐ๋˜์—ˆ๊ณ ์š”, ๊ทธ๋ž˜์„œ ๋‘ ๋ฒˆ์งธ ๊ฐ’์€ ์••์ถ•(conflated/๊ฒฐํ•ฉ)๋˜์–ด ๊ฐ€์žฅ ์ตœ๊ทผ ๊ฐ’(์„ธ๋ฒˆ์งธ ๊ฐ’)์ด collector๋กœ ์ „๋‹ฌ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.

 

 ์ตœ์‹  ๊ฐ’ ์ฒ˜๋ฆฌํ•˜๊ธฐ

Conflation์€ emitter์™€ collector ๋‘˜ ๋‹ค ๋Š๋ฆด ๋•Œ ์ฒ˜๋ฆฌ ์†๋„๋ฅผ ๋น ๋ฅด๊ฒŒ ํ•˜๋Š” ํ•˜๋‚˜์˜ ๋ฐฉ๋ฒ•์ž…๋‹ˆ๋‹ค. ๋‚ด๋ณด๋‚ด์ง„(emitted) ๊ฐ’๋“ค์„ ๋ˆ„๋ฝ์‹œํ‚ค๋Š” ๊ฑฐ์ฃ . ๋‹ค๋ฅธ ๋ฐฉ๋ฒ•์€ ๋Š๋ฆฐ collector๋ฅผ ์ทจ์†Œํ•˜๊ณ  ์ƒˆ ๊ฐ’์„ ๋‚ด๋ณด๋‚ด๋Š” ๋งค ์‹œ๊ฐ„๋งˆ๋‹ค collector๋ฅผ ์žฌ์‹œ์ž‘ํ•˜๋Š” ๊ฒ๋‹ˆ๋‹ค. ์ด๊ฒƒ์€ xxx ์—ฐ์‚ฐ์ž์˜ ๊ผญ ํ•„์ˆ˜์ ์ธ ๋กœ์ง์„ ์ˆ˜ํ–‰ํ•˜๋‚˜ ์ƒˆ ๊ฐ’์„ ๋ฐ›์•„ block ์•ˆ์˜ ์ฝ”๋“œ๋Š” ์ทจ์†Œํ•˜๋Š”  xxxLatest ์—ฐ์‚ฐ์ž๋“ค์˜ ์ผ์ข…์ž…๋‹ˆ๋‹ค. 

 

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.*

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple()
            .collectLatest { value -> // cancel & restart on the latest value
                println("Collecting $value") 
                delay(300) // pretend we are processing it for 300 ms
                println("Done $value") 
            } 
    }   
    println("Collected in $time ms")
}

 

๊ฒฐ๊ณผ

Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 684 ms

 

collectLatest์˜ body๋Š” 300ms๊ฐ€ ๊ฑธ๋ฆฌ๋‚˜, ์ƒˆ ๊ฐ’๋“ค์€ ๋งค 100ms๋งˆ๋‹ค ๋ฐฉ์ถœ๋˜๋‹ˆ, block์€ ๋งค ์ƒˆ ๊ฐ’๋งˆ๋‹ค ์‹คํ–‰ํ•˜๋‚˜ ๋งˆ์ง€๋ง‰ ๊ฐ’์œผ๋กœ๋งŒ ์™„๋ฃŒ๋˜๋Š” ๊ฑธ ๋ณผ ์ˆ˜ ์žˆ์Šต๋‹ˆ๋‹ค.