๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ

๋นˆ ๊ตฌ๋ฉ ์ฑ„์šฐ๊ธฐ

[Kotlin Coroutines] Channels

kotlinlang.org/docs/channels.html#channel-basics

 

Channels - Help | Kotlin

 

kotlinlang.org

kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/

 

Channel - kotlinx-coroutines-core

Channel Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java’s BlockingQueue, but it has suspending operations instead of blocking ones an

kotlin.github.io

 

kotlinx-coroutines-core / kotlinx.coroutines.channels / Channel

interface Channel<E> : SendChannel<E>, ReceiveChannel<E> (source)

 

Channel์€ ๊ฐ’์˜ stream์„ ์ „๋‹ฌํ•œ๋‹ค. sender๊ณผ reciver์‚ฌ์ด์˜ ํ†ต์‹ ์„ ์œ„ํ•œ non-blocking ์›์‹œํƒ€์ž….

 

Channel ๊ธฐ๋ณธ

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")

 

abstract suspend fun send(element: E): Unit

 

element๋ฅผ ์ฑ„๋„๋กœ ๋ณด๋‚ธ๋‹ค.

์ด channel์˜ ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐจ๊ฑฐ๋‚˜ ์กด์žฌํ•˜์ง€ ์•Š์œผ๋ฉด ํ˜ธ์ถœ๋ถ€๋ฅผ ์ผ์‹œ์ค‘์ง€์‹œํ‚จ๋‹ค(suspend).

channel์ด close ๋์„ ๊ฒฝ์šฐ์— ํ˜ธ์ถœ๋˜๋ฉด  exception์„ ๋ฐœ์ƒ์‹œํ‚จ๋‹ค.

 

abstract val isClosedForSend: Boolean

 

close ํ•จ์ˆ˜์˜ ์‹คํ–‰์— ์˜ํ•ด channel์ด close ๋˜๋ฉด true๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. true๋ฅผ ๋ฐ˜ํ™˜ํ•  ๋•Œ send ํ•จ์ˆ˜, offer ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด exception์ด ๋ฐœ์ƒํ•œ๋‹ค.

 

abstract suspend fun receive() : E

 

channel์ด ๋น„์–ด์žˆ์ง€ ์•Š๋‹ค๋ฉด channel์˜ element ํ•˜๋‚˜๋ฅผ ๋ฐ˜ํ™˜ํ•˜๊ณ  ์ œ๊ฑฐํ•œ๋‹ค. 

channel์ด ๋น„์–ด์žˆ์œผ๋ฉด caller๋ฅผ ์ค‘๋‹จํ•œ๋‹ค.

channel์ด close ๋œ ์ƒํƒœ์ด๋ฉด ClosedReceiveChannelException์„ ๋ฐœ์ƒ์‹œํ‚จ๋‹ค.

๋‹ค๋ฅธ exception์œผ๋กœ ์ธํ•ด channel์ด close ๋˜๋ฉด, ์ด channel์€ failed channel์ด๋ผ ๋ถ€๋ฅธ๋‹ค.

 

abstract val isClosedForReceive: Boolean

 

SendChannel ์ธก์—์„œ close ํ•จ์ˆ˜๊ฐ€ ์‹คํ–‰๋˜์–ด channel์ด close ๋˜๊ณ  ์ด์ „์— ๋ฐ›์€ ๋ชจ๋“  element๋“ค์ด ์ด๋ฏธ ๋‹ค ๋ณด๋‚ด์ง€๋ฉด true๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. true๋ฅผ ๋ฐ˜ํ™˜ํ•  ๋•Œ receive ํ•จ์ˆ˜๋ฅผ ํ˜ธ์ถœํ•˜๋ฉด ClosedReceiveChannelException์ด ๋ฐœ์ƒํ•œ๋‹ค. ๋‹ค๋ฅธ exception์— ์˜ํ•ด channel์ด close ๋  ๊ฒฝ์šฐ๋„ channel์ด ๋‹ซํžŒ ๊ฒƒ์œผ๋กœ ๊ฐ„์ฃผํ•˜๋‚˜, ๊ทธ channel์€ failed channel๋กœ ๋ถ€๋ฅธ๋‹ค. 

 

abstract fun offer(element: E) : Boolean

 

์šฉ๋Ÿ‰์„ ์ดˆ๊ณผํ•˜์ง€ ์•Š์œผ๋ฉด element๋ฅผ ์ด channel์— ์ฆ‰์‹œ ์ถ”๊ฐ€ํ•˜๊ณ  true๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

์šฉ๋Ÿ‰์„ ์ดˆ๊ณผํ•˜๋ฉด false๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค. 

send์˜ ๋™๊ธฐ์ ์ธ ๋ณ€ํ˜•์ด๋‹ค. 

channel์ด send์— ๋‹ซํ˜€์ ธ ์žˆ๋‹ค๋ฉด (isClosedForSend ๊ฐ’์ด true) exception์„ ๋ฐœ์ƒ์‹œํ‚จ๋‹ค.

offer๋กœ false๋ฅผ ๋ฐ˜ํ™˜๋ฐ›์œผ๋ฉด, ์ด element๊ฐ€ consumer์—๊ฒŒ ์ „๋‹ฌ๋˜์ง€ ์•Š์•˜๋‹ค๋Š” ๊ฒƒ์„ ๋ณด์žฅํ•˜๊ณ  onUndeliveredElement๋ฅผ ํ˜ธ์ถœํ•˜์ง€ ์•Š๋Š”๋‹ค.

channel์ด ๋‹ซํ˜€์žˆ๋‹ค๋ฉด, exception์„ ํ˜ธ์ถœํ•˜๊ธฐ ์ „์— onUndeliveredElement๋ฅผ ํ˜ธ์ถœํ•œ๋‹ค. 

Channel ๋‹ซ๊ธฐ(close)์™€ ๋ฐ˜๋ณต(iteration)

val channel = Channel<Int>()
launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")

 

abstract suspend fun close(cause: Throwable? = null): Boolean

 

channel์—  ๋” ์ด์ƒ ์ „์†ก์‹œํ‚ฌ(send) element๊ฐ€ ์—†๋‹ค๊ณ  ์•Œ๋ฆฌ๊ธฐ ์œ„ํ•ด channel์„ ๋‹ซ์„(close) ์ˆ˜ ์žˆ๋‹ค.

close๋Š” ๋ฉฑ๋“ฑ ์—ฐ์‚ฐ์ž๋ผ์„œ ์ด ํ•จ์ˆ˜๋ฅผ ์ตœ์ดˆ ์‹คํ–‰ํ•œ ์ดํ›„์˜ ์‹คํ–‰ ํ›„์—๋Š” ์•„๋ฌด ๋ณ€ํ™”๋„ ์—†๊ณ  false๋ฅผ ๋ฆฌํ„ด์‹œํ‚จ๋‹ค. 

๊ฐœ๋…์ ์œผ๋กœ ์ด ํ•จ์ˆ˜๋Š” ํŠน์ˆ˜ "close token"๋ฅผ channel์— ๋ณด๋‚ธ๋‹ค. ์ด close token์„ iteration์€ ๋ฐ›์ž๋งˆ์ž ์ค‘๋‹จํ•˜๊ธฐ ๋•Œ๋ฌธ์—, close ์ „์— ๋ฐ›์€ ๋ชจ๋“  element๋“ค์ด receive๋กœ ํ˜ธ์ถœ๋˜์–ด ๋‚˜๊ฐ”๋‹ค๋Š” ๊ฒƒ์ด ๋ณด์žฅ๋œ๋‹ค.

 

→๋ฉฑ๋“ฑ์— ๊ด€ํ•ด ์ •๋ฆฌํ•œ ๋‚ด์šฉ ํ™•์ธ

 

* Channel์˜ ํ•จ์ˆ˜

abstract operator fun iterator(): ChannelIterator<E>

 

for loop๋ฅผ ์ด์šฉํ•ด ์ด channel์—์„œ์˜ element๋“ค์„ ๋ฐ›๋Š” ์ƒˆ iterator๋ฅผ ๋ฐ˜ํ™˜ํ•œ๋‹ค.

๋ณดํ†ต exception cause ๋ฐœ์ƒ์ด ์—†์ด isClosedForReceived ํ•จ์ˆ˜์˜ ๋ฐ˜ํ™˜๊ฐ’์ด true์ผ ๋•Œ Iteration์€ ์™„๋ฃŒํ•˜๊ณ , channel์ด fail๋  ๊ฒฝ์šฐ close๋ฅผ ๋ฐœ์ƒ์‹œํ‚จ exception์„ throw ํ•œ๋‹ค. 

 

-> Channel close() ํ˜ธ์ถœ ํ›„ for ๋ฃจํ”„ ๋Œ๋ฆด ์ˆ˜ ์žˆ๋‹ค.

 

channel ์ƒ์„ฑ์ž(producer)๋“ค ๋งŒ๋“ค๊ธฐ

์ฝ”๋ฃจํ‹ด์ด ์—ฐ์†๋œ element๋“ค์„ ์ƒ์‚ฐํ•˜๋Š” ํŒจํ„ด(producer-consumer ํŒจํ„ด์˜ ์ผ๋ถ€)์€ ํ”ํžˆ ์“ฐ์ธ๋‹ค. ์ด producer๋Š” channel์„ ๋งค๊ฐœ๋ณ€์ˆ˜๋กœ ํ•˜๋Š” ํ•จ์ˆ˜๋กœ ๋งŒ๋“ค ์ˆ˜๋Š” ์ˆ˜ ์žˆ์ง€๋งŒ, ์ด๋Š” ํ•จ์ˆ˜๊ฐ€ ๋ฐ˜๋“œ์‹œ ๊ฐ’์„ ๋ฐ˜ํ™˜ํ•ด์•ผ ํ•œ๋‹ค๋Š” ์ƒ์‹๊ณผ ์–ด๊ธ‹๋‚œ๋‹ค.(... ๊ทธ๋ž˜์„œ ๊ทธ๋ ‡๊ฒŒ ํ•˜๋ฉด ์•ˆ๋œ๋‹ค๊ณ  ํ•˜๋Š” ๊ฑฐ์ฃ ? (๏ผ _๏ผ ;) ์•ˆ๋˜๊ธฐ ๋•Œ๋ฌธ์— produce ํ•จ์ˆ˜๋ฅผ ์†Œ๊ฐœํ•ด์ฃผ๋Š” ๊ฑฐ์ฃ ?)

 

-> producer-consumer ํŒจํ„ด ํ™•์ธ

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

 

@ExperimentalCoroutinesApi fun <E> CoroutineScope.produce(
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>

 

producer๋ฅผ ์‰ฝ๊ฒŒ ๋งŒ๋“œ๋Š” produce ๋ผ๋Š” ์ด๋ฆ„์˜ coroutine builder๊ฐ€ ์žˆ๋‹ค. ๋ฌธ์„œ๋ณด๊ธฐ

 

produce - kotlinx-coroutines-core

produce Launches a new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine. The scope of the cor

kotlin.github.io

๋ฐ์ดํ„ฐ ์ŠคํŠธ๋ฆผ์„ ์ƒ์„ฑํ•ด channel๋กœ ๋ณด๋‚ด๋Š” ์ƒˆ ์ฝ”๋ฃจํ‹ด์„ ์‹œ์ž‘ํ•œ๋‹ค.

์ฝ”๋ฃจํ‹ด์— ์˜ํ•ด ์ƒ์„ฑ๋œ element๋“ค์„ receiveํ•˜๋Š” ๋ฐ ์“ฐ์ด๋Š” ReceiveChannel์ด ๋ฐ˜ํ™˜๋œ๋‹ค.

 

context : CoroutineScope.coroutineContext

capacity : channel ๋ฒ„ํผ์˜ ์šฉ๋Ÿ‰ - ๊ฒฐ๊ณผ channel์ด ํŠน์ • capacity(์šฉ๋Ÿ‰) ๊ฐ’์— ๋”ฐ๋ผ ๋‹ฌ๋ผ์ง„๋‹ค. 

block : coroutine ์ฝ”๋“œ - ProducerScope๋ผ์„œ ์ฝ”๋ฃจํ‹ด์ด ๋ฐ”๋กœ send()๋ฅผ ํ˜ธ์ถœํ•  ์ˆ˜ ์žˆ๋‹ค. ์ฝ”๋ฃจํ‹ด์ด ์™„๋ฃŒ๋˜๋ฉด channel์€ ๋‹ซํžŒ๋‹ค. receive channel์ด ์ทจ์†Œ๋˜๋ฉด ์‹คํ–‰ ์ค‘์ด๋˜ coroutine์ด ์ทจ์†Œ๋œ๋‹ค.

 

exception ๋ฐœ์ƒ ์‹œ coroutine์€ channel์„ ๋‹ซ๊ณ  ๊ฒฐ๊ณผ channel์€ fail ๋œ๋‹ค.

 

@ExperimentalCoroutinesApi interface ProducerScope<in E> : 
    CoroutineScope,
    SendChannel<E>

 

๋ฌธ์„œ๋ณด๊ธฐ

produce coroutine builder๋ฅผ ์œ„ํ•œ scope. ์ด coroutine์ด elemet๋“ค์„ ๋ณด๋‚ด๋Š”(send) channel์˜ ์ฐธ์กฐ.

 

suspend fun <E> ReceiveChannel<E>.consueEach(action: (E) -> Unit) : Unit

 

cosumer ์ชฝ์ธ channel์—์„œ for ๋ฃจํ”„๋ฅผ ๋Œ€์ฒดํ•˜๋Š” extension function.

receive๋œ ๊ฐ element๋ฅผ ๋Œ€์ƒ์œผ๋กœ action์„ ์ˆ˜ํ–‰ํ•˜๊ณ  action ์‹คํ–‰ ํ›„์— channel์„ cancelํ•œ๋‹ค. 

 

Pipelines

ํŒŒ์ดํ”„๋ผ์ธ ํ•˜๋‚˜๋Š” ์ฝ”๋ฃจํ‹ด ํ•˜๋‚˜๊ฐ€ ๋ฌดํ•œ๋Œ€์˜ ์ŠคํŠธ๋ฆผ ๊ฐ’์„ ์ƒ์„ฑํ•˜๋Š” ํŒจํ„ด์ด๋‹ค.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 1๋ถ€ํ„ฐ ์‹œ์ž‘ํ•˜๋Š” ์ •์ˆ˜์˜ ๋ฌดํ•œ๋Œ€ ์ŠคํŠธ๋ฆผ
}

๋˜ ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด๋“ค์€ ๊ทธ ์ŠคํŠธ๋ฆผ์„ ์†Œ๋น„ํ•˜๊ณ , ๋ช‡ ๊ฐ€์ง€ ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๊ณ  ๋‹ค๋ฅธ ๊ฒฐ๊ณผ๊ฐ’๋“ค์„ ์ƒ์„ฑํ•œ๋‹ค.

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x) // ์ œ๊ณฑ
}

๋ฉ”์ธ ์ฝ”๋“œ๋Š” ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ์„ ์‹œ์ž‘ํ•˜๊ณ  ์—ฐ๊ฒฐํ•œ๋‹ค. 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val numbers = produceNumbers() // 1 ๋ถ€ํ„ฐ ์ฆ๊ฐ€ํ•ด ์ƒ์„ฑ๋˜๋Š” ์ •์ˆ˜๋“ค
    val squares = square(numbers) // ์ •์ˆ˜๋“ค ์ œ๊ณฑ
    repeat(5) {
        println(squares.receive()) // ์ฒ˜์Œ 5๊ฐœ๋งŒ ํ”„๋ฆฐํŠธ
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

๊ฒฐ๊ณผ

1
4
9
16
25
Done!

-> ํ•œ ์ฝ”๋ฃจํ‹ด์ด ์ฑ„๋„ ํ•˜๋‚˜์— stream producting ํ•˜๋Š” ์ค‘์— ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด์ด ๊ทธ ์ฑ„๋„์˜ stream์„ cosuming ํ•ด์„œ ๋‹ค๋ฅธ channel์„ ์ƒ์„ฑํ–ˆ๋‹ค.

 

์†Œ์ˆ˜์™€ ํŒŒ์ดํ”„๋ผ์ธ

์ฝ”๋ฃจํ‹ด๋“ค์ด ํ•˜๋‚˜์˜ ํŒŒ์ดํ”„๋ผ์ธ์„ ์ด์šฉํ•ด์„œ ์†Œ์ˆ˜๋ฅผ ์ƒ์„ฑํ•˜๋Š” ์˜ˆ์ œ๋ฅผ ํ†ตํ•ด ํŒŒ์ดํ”„๋ผ์ธ๋“ค์˜ ๊ทนํ•œ๊นŒ์ง€ ์‚ดํŽด ๋ณด๊ฒ ๋‹ค. 

 

๋ฌดํ•œ๋Œ€์˜ ์ˆซ์ž๋“ค๋กœ ์‹œ์ž‘ํ•ด๋ณด์ž.

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

์•„๋ž˜์˜ ํŒŒ์ดํ”„๋ผ์ธ ๋‹จ๊ณ„๋Š” ๋“ค์–ด์˜ค๋Š” ์ˆซ์ž ์ŠคํŠธ๋ฆผ์„ ํ•„ํ„ฐ๋งํ•˜๋Š”๋ฐ, ์ฃผ์–ด์ง„ ์†Œ์ˆ˜๋กœ ๋‚˜๋ˆ ์ง€๋Š” ๋ชจ๋“  ์ˆซ์ž๋“ค์„ ์‚ญ์ œํ•œ๋‹ค.

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

์ˆซ์ž 2๋ถ€ํ„ฐ ์‹œ์ž‘ํ•˜๋Š” ์ˆซ์ž ์ŠคํŠธ๋ฆผ์œผ๋กœ ์‹œ์ž‘ํ•˜๋Š” ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•˜๊ณ , ํ˜„์žฌ ์ฑ„๋„์—์„œ ์†Œ์ˆ˜๋ฅผ ๊ฐ€์ ธ๋‹ค๊ฐ€, ์ฐพ์€ ๊ฐ ์†Œ์ˆ˜๋งˆ๋‹ค ์ƒˆ๋กœ์šด ํŒŒ์ดํ”„ ๋‹จ๊ณ„๋ฅผ ์‹œ์ž‘ํ•œ๋‹ค.

numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...

์•„๋ž˜ ์˜ˆ์‹œ๋Š” ์ฒซ 10๊ฐœ์˜ ์†Œ์ˆ˜๋“ค์„ ํ”„๋ฆฐํŠธํ•˜๊ณ , ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ์˜ ์ฝ˜ํ…์ŠคํŠธ์—์„œ ์ „์ฒด ํŒŒ์ดํ”„๋ผ์ธ์„ ์‹คํ–‰ํ•œ๋‹ค. ๋ชจ๋“  ์ฝ”๋ฃจํ‹ด๋“ค์ด  ๋ฉ”์ธ runBlocking ์ฝ”๋ฃจํ‹ด์˜ ์Šค์ฝ”ํ”„ ์•ˆ์—์„œ ์‹œ์ž‘๋˜๊ธฐ ๋•Œ๋ฌธ์— ์šฐ๋ฆฌ๋Š” ์‹œ์ž‘๋œ ๋ชจ๋“  ์ฝ”๋ฃจํ‹ด๋“ค์˜ ๋ช…์‹œ์ ์ธ ๋ฆฌ์ŠคํŠธ๋ฅผ ์œ ์ง€ํ•  ํ•„์š”๊ฐ€ ์—†๋‹ค.

 

์šฐ๋ฆฌ๋Š” cancelChildren ํ™•์žฅํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•ด ์ฒ˜์Œ 10๊ฐœ์˜ ์†Œ์ˆ˜๋ฅผ ํ”„๋ฆฐํŠธ ํ•œ ํ›„์— ๋ชจ๋“  ์ž๋…€ ์ฝ”๋ฃจํ‹ด๋“ค์„ ์ทจ์†Œํ•œ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish    
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

๊ฒฐ๊ณผ

2
3
5
7
11
13
17
19
23
29

 

์šฐ๋ฆฌ๋Š” ๋˜‘๊ฐ™์€ ํŒŒ์ดํ”„๋ผ์ธ์„ standard library์˜ iterator ์ฝ”๋ฃจํ‹ด ๋นŒ๋”๋ฅผ ์ด์šฉํ•ด ๋งŒ๋“ค ์ˆ˜ ์žˆ๋‹ค.

 

produce -> iterator ๊ต์ฒด

send -> yield ๊ต์ฒด

receive -> next ๊ต์ฒด

ReceiveChannel -> Iterator ๊ต์ฒด

-> ์ฝ”๋ฃจํ‹ด ์Šค์ฝ”ํ”„ ์ œ๊ฑฐ

runBlocking์„ ์‚ฌ์šฉํ•  ํ•„์š”์—†๋‹ค.

 

๊ต์ฒดํ•œ ์ฝ”๋“œ. ๊ฒฐ๊ณผ ๊ฐ’์€ ์œ„์˜ ๊ฒฐ๊ณผ ๊ฐ’๊ณผ ๊ฐ™๋‹ค.

fun main() {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.next()
        println(prime)
        cur = filter(cur, prime)
    }
}

fun numbersFrom(start: Int) = iterator<Int> {
    var x = start
    while (true) yield(x++) // infinite stream of integers from start
}

fun filter(numbers: Iterator<Int>, prime: Int) = iterator<Int> {
    for (x in numbers) if (x % prime != 0) yield(x)
}

 

๊ทธ๋Ÿฌ๋‚˜ ์œ„์™€ ๊ฐ™์ด ์ฑ„๋„๋“ค์„ ์ด์šฉํ•œ ํŒŒ์ดํ”„๋ผ์ธ์˜ ์ด์ ์€ Dispatchers.Default context์—์„œ ์ฝ”๋“œ๋ฅผ ์‹คํ–‰ํ•  ๋•Œ multiple CPU ์ฝ”์–ด๋“ค์„ ์‹ค์ œ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค๋Š” ๊ฒƒ์ด๋‹ค. 

 

์–ด์จŒ๋“ , ์†Œ์ˆ˜๋“ค์„ ์ฐพ๋Š” ๊ทน๋‹จ์ ์ด๊ณ  ์‹ค์šฉ์ ์ด์ง€๋„ ์•Š๋Š” ์˜ˆ์ œ์˜€๋‹ค. ์‹ค์ œ๋กœ ํŒŒ์ดํ”„๋ผ์ธ๋“ค์€ ์ •๋ง๋กœ ๋ช‡๊ฐ€์ง€ ๋‹ค๋ฅธ suspending invocation(ํ˜ธ์ถœ, ์›๊ฒฉ ์„œ๋น„์Šค๋กœ ๋น„๋™๊ธฐ ํ˜ธ์ถœ ๊ฐ™์ด)์„ ํฌํ•จํ•˜๊ณ , ์ด ํŒŒ์ดํ”„๋ผ์ธ๋“ค์€ sequence/iterator๋ฅผ ์‚ฌ์šฉํ•ด์„œ ๋งŒ๋“ค์–ด์งˆ ์ˆ˜๋Š” ์—†๋‹ค. ์™œ๋ƒํ•˜๋ฉด seqeunce/iterator๋Š” ์™„์ „ํžˆ ๋น„๋™๊ธฐ์ธ produce์™€ ๋‹ค๋ฅด๊ฒŒ ์ž„์˜์ ์ธ ์ค‘๋‹จ์„ ํ—ˆ์šฉํ•˜์ง€ ์•Š๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค. 

 

fun <T> runBlocking(

    context: CoroutineContext  = EmptyCoroutineContext,

    block: suspend CoroutineScope.() -> T

): T

 

context: ์ฝ”๋ฃจํ‹ด์˜ ์ฝ˜ํ…์ŠคํŠธ. ๊ธฐ๋ณธ๊ฐ’์€ ํ˜„์žฌ ์“ฐ๋ ˆ๋“œ์˜ ์ด๋ฒคํŠธ ๋ฃจํ”„์ด๋‹ค.

block: ์ฝ”๋ฃจํ‹ด ์ฝ”๋“œ 

 

์ƒˆ๋กœ์šด ์ฝ”๋ฃจํ‹ด ํ•˜๋‚˜๋ฅผ ์‹คํ–‰ํ•˜๊ณ  ์ด ์ฝ”๋ฃจํ‹ด์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ํ˜„์žฌ ์“ฐ๋ ˆ๋“œ๋ฅผ ์ธํ„ฐ๋ŸฝํŠธ๋กœ ๋ง‰๋Š”๋‹ค. ์ด ํ•จ์ˆ˜๋ฅผ ์ฝ”๋ฃจํ‹ด์—์„œ ์‚ฌ์šฉํ•ด์„œ๋Š” ์•ˆ๋œ๋‹ค. ์ด๊ฒƒ์€ ์ผ๋ฐ˜ ์ฐจ๋‹จ ์ฝ”๋“œ์™€ ์ผ์‹œ์ค‘๋‹จ ์Šคํƒ€์ผ๋กœ ์ž‘์„ฑ๋œ ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ์™€ ์—ฐ๊ฒฐํ•ด์„œ ์ฃผ์š” ๊ธฐ๋Šฅ๋“ค๊ณผ ํ…Œ์ŠคํŠธ์—์„œ ์‚ฌ์šฉ๋˜๋„๋ก ์„ค๊ณ„๋˜์—ˆ๋‹ค.

 

์ด ๋นŒ๋”์˜ ๊ธฐ๋ณธ CoroutineDispatcher๋Š” ์ด ์ฝ”๋ฃจํ‹ด์ด ์™„์„ฑ๋  ๋•Œ๊นŒ์ง€ ์ค‘๋‹จ๋œ ์“ฐ๋ ˆ๋“œ์—์„œ ์—ฐ์†์ ์œผ๋กœ ์ง„ํ–‰ํ•˜๋Š” ์ด๋ฒคํŠธ ๋ฃจํ”„์˜ ๋‚ด๋ถ€ ๊ตฌํ˜„์ด๋‹ค. 

 

context ๋ณ€์ˆ˜์— CoroutineDispatcher๊ฐ€ ๋ช…์‹œ์ ์œผ๋กœ ์ง€์ •๋˜๋ฉด ํ˜„์žฌ ์“ฐ๋ ˆ๋“œ๊ฐ€ ๋ง‰ํ˜”์„ ๋™์•ˆ ์ง€์ •๋œ dispatcher์˜ context๊ฐ€ ์ƒˆ๋กœ์šด ์ฝ”๋ฃจํ‹ด์„ ์‹คํ–‰ํ•œ๋‹ค. ๋งŒ์•ฝ ์ง€์ •๋œ dispatcher๊ฐ€ ๋˜ ๋‹ค๋ฅธ runBlocking์˜ ์ด๋ฒคํŠธ ๋ฃจํ”„๋ผ๋ฉด, ์ด ํ˜ธ์ถœ์€ ๋ฐ”๊นฅ ๋ฃจํ”„๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

 

์ด ๋ง‰ํžŒ ์“ฐ๋ ˆ๋“œ๊ฐ€ ์ธํ„ฐ๋ŸฝํŠธ๋˜๋ฉด, ์ฝ”๋ฃจํ‹ด job์€ ์ทจ์†Œ๋˜๊ณ , runBlocking์€ InterruptedException์„ ํ˜ธ์ถœํ•œ๋‹ค.

 

fun CoroutineConetext.cancelChildren(

    cause: CancellationException? = null

): Unit

 

์„ ํƒ์ ์ธ ์ทจ์†Œ ์›์ธ์„ ๊ฐ€์ง€๊ณ  job ์Šค์Šค๋กœ์˜ ์ƒํƒœ๋ฅผ ๊ฑด๋“œ๋ฆฌ์ง€ ์•Š๊ณ , ์ด context ์•ˆ์˜ Job์˜ ๋ชจ๋“  ์ž๋…€๋“ค์„ ์ทจ์†Œํ•œ๋‹ค.

 

*Standard library์˜ ํ•จ์ˆ˜

fun <T> iterator(

    block: suspend SeqeunceScope<T>.() -> Unit

): Iterator<T>

 

๊ฐ’์„ ํ•˜๋‚˜์”ฉ ๋Š๋ฆฌ๊ฒŒ ์ƒ์„ฑํ•˜๋Š” Iterator๋ฅผ ๋นŒ๋“œํ•œ๋‹ค.

์˜ˆ์ œ

import kotlin.test.*

fun main(args: Array<String>) {
    val collection = listOf(1, 2, 3)
    val wrappedCollection = object : AbstractCollection<Any>() {
        override val size: Int = collection.size + 2

        override fun iterator(): Iterator<Any> = iterator {
            yield("first")
            yieldAll(collection)
            yield("last")
        }
    }

    println(wrappedCollection) // [first, 1, 2, 3, last]
}

์˜ˆ์ œ

fun main(args: Array<String>) {
    val iterable = Iterable {
        iterator {
            yield(42)
            yieldAll(1..5 step 2)
        }
    }
    val result = iterable.mapIndexed { index, value -> "$index: $value" }
    println(result) // [0: 42, 1: 1, 2: 3, 3: 5]

    // can be iterated many times
    repeat(2) {
        val sum = iterable.sum()
        println(sum) // 51
    }
}

 

Fan-out

ํ•˜๋‚˜์˜ producer ์ฝ”๋ฃจํ‹ด์˜ channel 1๊ฐœ, ์ฒ˜๋ฆฌ ์ฝ”๋ฃจํ‹ด ์—ฌ๋Ÿฌ๊ฐœ.

์—ฌ๋Ÿฌ ์ฝ”๋ฃจํ‹ด๋“ค์€ ๋™์ผํ•œ ์ฑ„๋„์„ ์ˆ˜์‹ (receive)ํ•ด์„œ ์„œ๋กœ๊ฐ„์— ์ž‘์—…์„ ๋ถ„๋ฐฐํ•  ์ˆ˜ ์žˆ๋‹ค.

๋งค ์ดˆ๋งˆ๋‹ค ๊ฐ’์ด ์ฆ๊ฐ€ํ•˜๋Š” ์ •์ˆ˜๋ฅผ ์ƒ์‚ฐํ•˜๋Š” producer ์ฝ”๋ฃจํ‹ด์œผ๋กœ ์‹œ์ž‘ํ•œ๋‹ค.

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

์—ฌ๋Ÿฌ processor ์ฝ”๋ฃจํ‹ด๋“ค์„ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋‹ค. ์˜ˆ์ œ ์ฝ”๋“œ์—์„œ๋Š” ์ฝ”๋ฃจํ‹ด์˜ id์™€ ์ˆ˜์‹ ํ•œ ์ˆซ์ž๋ฅผ printํ•œ๋‹ค.

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }
}

์ด์ œ 5๊ฐœ์˜ ํ”„๋กœ์„ธ์„œ๋“ค์„ ์‹œ์ž‘ํ•˜๊ณ  ์•ฝ 1์ดˆ ๋™์•ˆ ๋™์ž‘์‹œํ‚จ๋‹ค. ์–ด๋–ค ์ผ์ด ์ผ์–ด๋‚˜๋Š”์ง€ ๋ณด์ž.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}

์ถœ๋ ฅ์€ ๋‹ค์Œ๊ณผ ๋น„์Šทํ•  ๊ฒƒ์ด๋‹ค. ๊ฐ ์ •์ˆ˜๋ฅผ ์ˆ˜์‹ ํ•œ ํ”„๋กœ์„ธ์„œ์˜ id๋Š” ๋‹ค๋ฅผ ์ˆ˜ ์žˆ์ง€๋งŒ ๋ง์ด๋‹ค.

Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

producer ์ฝ”๋ฃจํ‹ด์„ ์ทจ์†Œํ•˜๋ฉด ํ•ด๋‹น ์ฑ„๋„๋„ ๋‹ซํžŒ๋‹ค. ๋”ฐ๋ผ์„œ ๊ฒฐ๊ณผ์ ์œผ๋กœ ํ”„๋กœ์„ธ์„œ ์ฝ”๋ฃจํ‹ด๋“ค์ด ์ž‘์—…ํ•˜๊ณ  ์žˆ๋Š” ์ฑ„๋„์˜ iteration์€ ์ข…๋ฃŒ๋œ๋‹ค.

launchProcessor ์ฝ”๋“œ์—์„œ fan-out์„ ์ˆ˜ํ–‰ํ•˜๊ธฐ ์œ„ํ•ด for ๋ฃจํ”„๋กœ channel์„ ๋ช…์‹œ์ ์œผ๋กœ ๋ฐ˜๋ณตํ•˜๋Š” ๋ฐฉ๋ฒ•์— ์ฃผ๋ชฉํ•ด๋ผ. consumEach์™€ ๋‹ค๋ฅด๊ฒŒ, ์ด for ๋ฃจํ”„ ํŒจํ„ด์€ ์—ฌ๋Ÿฌ ์ฝ”๋ฃจํ‹ด๋“ค์„ ์‚ฌ์šฉํ•˜๊ธฐ์— ์™„๋ฒฝํ•˜๊ฒŒ ์•ˆ์ „ํ•˜๋‹ค. ๋งŒ์•ฝ ํ”„๋กœ์„ธ์„œ ์ฝ”๋ฃจํ‹ด๋“ค ์ค‘ ํ•˜๋‚˜๊ฐ€ ์‹คํŒจํ•œ๋‹ค๋ฉด, ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด๋“ค์€ ์—ฌ์ „ํžˆ ๊ณ„์† ์ฑ„๋„์„ ์ฒ˜๋ฆฌํ•  ๊ฒƒ์ธ ๋ฐ˜๋ฉด, consumeEach๋ฅผ ํ†ตํ•ด ์ž‘์„ฑ๋œ ํ”„๋กœ์„ธ์„œ๋Š” ํ•ญ์ƒ ๊ธฐ๋ณธ์ด ๋˜๋Š” ์ฑ„๋„์„ ์ •์ƒ์ ์ด๊ฑฐ๋‚˜ ๋น„์ •์ƒ์ ์ธ ์™„๋ฃŒ์— ์†Œ๋น„(์ทจ์†Œ)๋ฅผ ํ•œ๋‹ค.

 

-> fan-out์—์„œ ์—ฌ๋Ÿฌ processor ์ฝ”๋ฃจํ‹ด๋“ค ์ค‘ ํ•˜๋‚˜๊ฐ€ ์‹คํŒจํ•ด๋„ ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด๋“ค์ด ์ž‘์—…์„ ์ •์ƒ์ ์œผ๋กœ ์ง„ํ–‰ํ•  ์ˆ˜ ์žˆ์œผ๋‚˜, consumeEach๋กœ ์ž‘์„ฑ๋œ processor ์ฝ”๋ฃจํ‹ด์€ ์ฝ”๋ฃจํ‹ด์ด ์‹คํŒจํ•˜๋ฉด ์ž‘์—… ์ค‘์ธ channel์ด ๋‹ซํžˆ๊ธฐ ๋•Œ๋ฌธ์— ๋‹ค๋ฅธ ์ฝ”๋ฃจํ‹ด๋“ค์˜ ์ž‘์—…์ด ์ค‘๋‹จ๋œ๋‹ค.

 

Fan-in

์—ฌ๋Ÿฌ ์ฝ”๋ฃจํ‹ด๋“ค์ด ํ•˜๋‚˜์˜ channel์— ์ ‘๊ทผ ๊ฐ€๋Šฅ.

์—ฌ๋Ÿฌ ์ฝ”๋ฃจํ‹ด๋“ค์ด ๋™์ผํ•œ ํ•˜๋‚˜์˜ channel์— ๊ฐ’์„ sendํ•  ์ˆ˜ ์žˆ๋‹ค. 
์˜ˆ๋ฅผ ๋“ค์–ด, string๊ฐ’๋“ค์˜ channel ํ•˜๋‚˜๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๊ณ , ์ง€์—ฐ ์‹œ๊ฐ„์„ ๊ฐ€์ง€๊ณ  ํŠน์ • string์„ ์ด channel์— ๋ฐ˜๋ณต์ ์œผ๋กœ sendํ•˜๋Š” suspending function์ด ์žˆ๋‹ค๊ณ  ํ•˜์ž.

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

๋งŒ์•ฝ string๊ฐ’๋“ค์„ ๋ณด๋‚ด๋Š” ์ฝ”๋ฃจํ‹ด ๋‘ ๊ฐœ๋ฅผ ์‹œ์ž‘ํ•œ๋‹ค๋ฉด ์–ด๋–ค ์ผ์ด ์ผ์–ด๋‚˜๋Š” ์ง€ ๋ณด์ž.(์ด ์˜ˆ์ œ์—์„œ ๋ฉ”์ธ ์ฝ”๋ฃจํ‹ด์˜ ์ž๋…€๋“ค๋กœ์„œ ๋ฉ”์ธ ์“ฐ๋ ˆ๋“œ์˜ context์—์„œ ์ด ์ฝ”๋ฃจํ‹ด๋“ค์„ ์‹คํ–‰ํ–ˆ๋‹ค.)

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
        println(channel.receive())
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish
}

suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
        delay(time)
        channel.send(s)
    }
}

์ถœ๋ ฅ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค.

foo
foo
BAR!
foo
foo
BAR!

Buffered channels

์ง€๊ธˆ๊นŒ์ง€ ์‚ดํŽด๋ณธ channel๋“ค์€ ๋ฒ„ํผ๊ฐ€ ์—†์—ˆ๋‹ค. ๋ฒ„ํผ๊ฐ€ ์—†๋Š” ์ฑ„๋„๋“ค์€ sender์™€ receiver๊ฐ€ ๋งŒ๋‚  ๋•Œ ์š”์†Œ๋“ค์„ ์ „์†กํ•œ๋‹ค.(์ผ๋ช… ๋ž‘๋ฐ๋ทฐ. rendezvous ์ฑ„๋„์ด๋ผ ๋ถ€๋ฅด๋”๋ผ). send๊ฐ€ ๋จผ์ € ์‹คํ–‰๋˜๋ฉด receive๋Š” ํ˜ธ์ถœ๋  ๋•Œ๊นŒ์ง€ ๊ธฐ๋‹ค๋ฆฌ๊ณ , receive๊ฐ€ ๋จผ์ € ํ˜ธ์ถœ๋˜๋ฉด send๊ฐ€ ํ˜ธ์ถœ๋  ๋•Œ๊นŒ์ง€ ์ค‘๋‹จ๋œ๋‹ค.

 

Channel ํŒฉํ† ๋ฆฌ ํ•จ์ˆ˜์™€ produce ๋นŒ๋” ๋‘˜๋‹ค capacity ๋งค๊ฐœ๋ณ€์ˆ˜๋ฅผ ๋ฒ„ํผ์‚ฌ์ด์ฆˆ๋กœ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค. ๋ฒ„ํผ๋Š” ์ค‘๋‹จ ์ „์— ์—ฌ๋Ÿฌ ์š”์†Œ๋“ค์„ sender๊ฐ€ ์ „์†กํ•˜๊ฒŒ ํ•˜๊ณ , ๋ฒ„ํผ๊ฐ€ ๊ฐ€๋“์ฐฐ ๋•Œ ์ฐจ๋‹จ๋˜๋Š” ๊ฒƒ์€ ์ง€์ •๋œ capacity๋ฅผ ๊ฐ€์ง„ BlockingQueue์™€ ๋น„์Šทํ•˜๋‹ค. 

 

fun <E> Channel(

    capacity: Int = RENDEZVOUS,

    onBufferedOverflow: BufferOverflow = BufferOverflow.SUSPEND,

    pnUndeliveredElement: (E) -> Unit = null

): Channel<E>

 

capacity - ์–‘์ˆ˜์˜ ์ฑ„๋„ ์šฉ๋Ÿ‰ ๋˜๋Š” Channel.Facotry์—์„œ ์ •์˜๋œ ์ƒ์ˆ˜ ์ค‘ ํ•˜๋‚˜

onBufferOverflow - ๋ฒ„ํผ ์˜ค๋ฒ„ํ”Œ๋กœ์šฐ์—์„œ์˜ ์•ก์…˜. ์„ ํƒ์‚ฌํ•ญ. ๊ธฐ๋ณธ๊ฐ’์€ ์ „์†ก ์‹œ๋„๋ฅผ ์ผ์‹œ์ค‘๋‹จํ•œ๋‹ค. capacity >=0์ด๊ฑฐ๋‚˜ capacity == Channel.BUFFERED ์ผ ๋•Œ๋งŒ ์ง€์›๋œ๋‹ค. ์ตœ์†Œ ํ•˜๋‚˜์˜ ๋ฒ„ํผ๋ง๋œ ์š”์†Œ๊ฐ€ ์žˆ์œผ๋ฉด ์ฑ„๋„์„ ์•”์‹œ์ ์œผ๋กœ ์ƒ์„ฑํ•œ๋‹ค.

onUndeliveredElement - ์„ ํƒ์‚ฌํ•ญ์ธ ํ•จ์ˆ˜. ์š”์†Œ๊ฐ€ ์ „์†ก๋˜์—ˆ์ง€๋งŒ ์†Œ๋น„์ž๋กœ ์š”์†Œ๊ฐ€ ์ „์†ก๋˜์ง€ ๋ชปํ•  ๋•Œ ํ˜ธ์ถœ๋œ๋‹ค.

capacity < -2 ๊ฒฝ์šฐ์— IllegalArgumentException ์„ ํ˜ธ์ถœํ•œ๋‹ค.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            println("Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
            println("Sending $it done")
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine    
}

๊ฒฐ๊ณผ

Sending 0
Sending 0 done
Sending 1
Sending 1 done
Sending 2
Sending 2 done
Sending 3
Sending 3 done
Sending 4

์ฒ˜์Œ 4๊ฐœ์˜ ์š”์†Œ๋“ค์€ ๋ฒ„ํผ์— ์ถ”๊ฐ€๋˜์—ˆ๊ณ , sender๋Š” ๋‹ค์„ฏ๋ฒˆ์งธ ์š”์†Œ๋ฅผ sendํ•˜๊ธฐ ์œ„ํ•ด ์ค‘๋‹จ๋œ๋‹ค.

 

Channel๋“ค์€ ๊ณตํ‰ํ•˜๋‹ค

์—ฌ๋Ÿฌ ์ฝ”๋ฃจํ‹ด๋“ค์—์„œ ํ˜ธ์ถœ ์ˆœ์„œ์— ๊ด€๋ จํ•ด ์ฑ„๋„์˜ ์ „์†ก(send)๊ณผ ์ˆ˜์‹ (receive) ์—ฐ์‚ฐ์€ ๊ณตํ‰ํ•˜๋‹ค. FIFO๋กœ ์ง„ํ–‰๋œ๋‹ค. receive๋ฅผ ํ˜ธ์ถœํ•œ ์ฒซ๋ฒˆ์งธ ์ฝ”๋ฃจํ‹ด์ด ์š”์†Œ๋ฅผ ๊ฐ€์ ธ๊ฐ„๋‹ค. 

์•„๋ž˜ ์˜ˆ์‹œ์—์„œ ๋‘ ์ฝ”๋ฃจํ‹ด "ping"๊ณผ "pong"์€ "ball" ๊ฐ์ฒด๋ฅผ ๊ณต์šฉ "table" ์ฑ„๋„์—์„œ ์ˆ˜์‹ ํ•˜๊ณ  ์žˆ๋‹ค.

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

๊ฒฐ๊ณผ

ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

"ping" ์ฝ”๋ฃจํ‹ด์ด ์ฒ˜์Œ ์‹œ์ž‘๋˜์—ˆ๊ณ , ๊ณต์„ ์ˆ˜์‹ ํ•˜๋Š” ์ฒซ ์ฝ”๋ฃจํ‹ด์ด๋‹ค. ๋น„๋ก "ping" ์ฝ”๋ฃจํ‹ด์ด ํ…Œ์ด๋ธ”์— ๊ณต์„ ์ „์†กํ•œ ํ›„์— ๋ฐ”๋กœ ๊ณต์„ ๋ฐ›๋Š” ๊ฒƒ์„ ์‹œ์ž‘ํ–ˆ์œผ๋‚˜, ๊ณต์€ "pong" ์ฝ”๋ฃจํ‹ด์—๊ฒŒ ์ˆ˜์‹ ๋˜์—ˆ๋‹ค. ์™œ๋ƒํ•˜๋ฉด ์ด๋ฏธ "pong"์ด ๊ณต์„ ๊ธฐ๋‹ค๋ฆฌ๊ณ  ์žˆ์—ˆ๊ธฐ ๋•Œ๋ฌธ์ด๋‹ค.

 

์ข…์ข… channel๋“ค์˜ ์‹คํ–‰์ด ์‹คํ–‰ ํ”„๋กœ๊ทธ๋žจ์˜ ํŠน์„ฑ์— ๋”ฐ๋ผ unfairํ•œ ๊ฒฝ์šฐ๋„ ์žˆ๋‹ค. 

 

Ticker channels

Ticker channel์€ ์ด ์ฑ„๋„์˜ ๋งˆ์ง€๋ง‰ ์š”์†Œ์˜ ์†Œ๋น„ ์ดํ›„์— delay๊ฐ€ ๋ฐœ์ƒํ•  ๋•Œ๋งˆ๋‹ค Unit์„ ์ƒ์„ฑํ•˜๋Š” ํŠน๋ณ„ํ•œ ๋ž‘๋ฐ๋ทฐ ์ฑ„๋„(๋ฒ„ํผ๋ง์—†๋Š” ์ฑ„๋„)์ด๋‹ค.

์“ธ๋ฐ์—†๋Š” ๋…๋ฆฝํ˜• ์ฑ„๋„๋กœ ๋ณด์ด์ง€๋งŒ, ๋ณต์žกํ•œ ์‹œ๊ฐ„ ๊ธฐ๋ฐ˜ produce ํŒŒ์ดํ”„๋ผ์ธ์„ ์ƒ์„ฑํ•˜๊ฑฐ๋‚˜ windowing(? ์ดํ•ด ๋ชปํ–ˆ์Šต๋‹ˆ๋‹ค.), ์‹œ๊ฐ„์ข…์† ์ฒ˜๋ฆฌ๋ฅผ ํ•˜๋Š” ์—ฐ์‚ฐ๋“ค์„ ํ•˜๋Š”๋ฐ ์œ ์šฉํ•˜๋‹ค.

Ticker channel์„ ์ƒ์„ฑํ•˜๋ ค๋ฉด ํŒฉํ† ๋ฆฌ ํ•จ์ˆ˜ ticker๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค. ๋”์ด์ƒ์˜ ์š”์†Œ๋“ค์ด ํ•„์š”ํ•˜์ง€ ์•Š๋‹ค๋ฉด ReceiveChannel.cancel ํ•จ์ˆ˜๋ฅผ ์‚ฌ์šฉํ•œ๋‹ค.

 

@ObsoleteCoroutinesApi fun ticker(

    delayMillis: Long,

    initialDelayMillis: Long = delayMillis,

    context: CoroutineContext = EmptyCoroutineContext,

    mode: TickerMode = TickerMode.FIXED_PERIOD

): ReceiveChannel<Unit>

 

์ฒ˜์Œ ์ง€์—ฐ ์‹œ๊ฐ„ ํ›„์— ์ฒซ ์•„์ดํ…œ(Unit)์„ ์ƒ์‚ฐํ•˜๊ณ  ์ง€์—ฐ์‹œ๊ฐ„์ด ์žˆ๋Š” ํ›„์† ์•„์ดํ…œ๋“ค์„ ์ƒ์‚ฐํ•˜๋Š” ์ฑ„๋„์„ ์ƒ์„ฑํ•œ๋‹ค. 

๊ฒฐ๊ณผ ์ฑ„๋„์€ ๋ž‘๋ฐ๋ทฐ ์ฑ„๋„์ด๋‹ค. ์ด ์ฑ„๋„์˜ receiver๊ฐ€ ์ฑ„๋„์˜ element๋“ค์˜ ์ˆ˜์‹ ์„ ๋”ฐ๋ผ๊ฐ€์ง€ ๋ชปํ•˜๋ฉด, element๋“ค์€ backpressure ๋•Œ๋ฌธ์— send๋˜์ง€ ๋ชปํ•œ๋‹ค. ์ด๋Ÿฌํ•œ ๊ฒฝ์šฐ์˜ ticker์˜ ์‹ค์ œ ํƒ€์ด๋ฐ ํ–‰๋™์€ ๊ธฐ๋ณธ๊ฐ’์ด TickerMode.FIXED_PERIOD์ธ mode ๋งค๊ฐœ๋ณ€์ˆ˜์— ์˜ํ•ด ํ†ต์ œ๋œ๋‹ค. 

cancel ํ˜ธ์ถœ ํ›„์— ๋ฐ”๋กœ element ์ƒ์‚ฐ์„ ์ค‘๋‹จํ•œ๋‹ค.

ticker๋Š” consumer์˜ ๊ฐ€๋Šฅํ•œ ์ผ์‹œ์ค‘์ง€๋ฅผ ์ธ์ง€ํ•˜๊ณ , ๊ธฐ๋ณธ์ ์œผ๋กœ ์ผ์‹œ์ค‘์ง€๊ฐ€ ๋ฐœ์ƒํ•˜๋ฉด ๋‹ค์Œ์— ์ƒ์„ฑ๋˜๋Š” ์š”์†Œ์˜ ์ง€์—ฐ ์‹œ๊ฐ„์„ ์กฐ์ •ํ•ด์„œ ์ƒ์„ฑ๋œ ์š”์†Œ๋“ค์˜ ๊ณ ์ •๋œ ๋น„์œจ์„ ์œ ์ง€ํ•˜๋ ค๊ณ  ํ•œ๋‹ค. 

 

delayMillis - ๊ฐ ์š”์†Œ๋“ค ์‚ฌ์ด์˜ ๋ฐ€๋ฆฌ์ดˆ ์ง€์—ฐ ์‹œ๊ฐ„

initialDelayMillis - ๋ฐ€๋ฆฌ ์ดˆ์˜ ์ง€์—ฐ ์‹œ๊ฐ„ ํ›„์— ์ฒซ ์š”์†Œ๊ฐ€ ์ƒ์„ฑ๋  ๊ฒƒ์ด๋‹ค. ๊ธฐ๋ณธ ๊ฐ’์€ delayMillis์™€ ๊ฐ™๋‹ค.

context - producing ์ฝ”๋ฃจํ‹ด์˜ context

mode - ์š”์†Œ๋“ค์ด ์ˆ˜์‹ ๋˜์ง€ ์•Š์„ ๋•Œ์˜ ํŠน์ • ํ–‰๋™. ๊ธฐ๋ณธ ๊ฐ’์ด TickerMode.FIXED_PERIOD

 

-> Backpressure ํ™•์ธํ•˜๊ธฐ

 

@ObsoleteCoroutinesApi enum classTickerMode

 

ticker ํ•จ์ˆ˜๋ฅผ ์œ„ํ•œ ๋ชจ๋“œ.

์ฐธ๊ณ : ticker ์ฑ„๋„์€ ํ˜„์žฌ ๊ตฌ์กฐํ™”๋œ ๋™์‹œ์„ฑ๊ณผ ํ†ตํ•ฉ๋˜์ง€ ์•Š์•˜๊ณ , ์ด api๋Š” ํ–ฅํ›„์— ๋ณ€๊ฒฝ๋  ๊ฒƒ์ด๋‹ค.

 

Enum value

FIXED_PERIOD - cosumer๊ฐ€ ๋”ฐ๋ผ์žก์ง€ ๋ชปํ•˜๊ฑฐ๋‚˜ ๋Š๋ฆฌ๋‹ค๋ฉด ๊ณ ์ •๋œ ๊ธฐ๊ฐ„์„ ์œ ์ง€ํ•˜๊ธฐ ์œ„ํ•ด ์ง€์—ฐ ์‹œ๊ฐ„์„ ์กฐ์ •ํ•œ๋‹ค.

FIXED_DELAY -  cosumer๊ฐ€ ๋”ฐ๋ผ์žก์ง€ ๋ชปํ•˜๊ฑฐ๋‚˜ ๋Š๋ฆฌ๋ฉด ์ƒ์„ฑ๋œ element๋“ค ์‚ฌ์ด์˜ ๊ณ ์ •๋œ ์ง€์—ฐ ์‹œ๊ฐ„์„ ์œ ์ง€ํ•œ๋‹ค.

 

-> @ObsoleteCoroutinesApi ํ™•์ธํ•˜๊ธฐ

 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

fun main() = runBlocking<Unit> {
    //ticker ์ฑ„๋„ ์ƒ์„ฑ. ์ฒ˜์Œ 0์ดˆ ๋”œ๋ ˆ์ด๋กœ Unit ์•„์ดํ…œ์„ produceํ•˜๊ณ 
    //๊ทธ ํ›„๋กœ๋Š” ๋ชจ๋“  Unit ์•„์ดํ…œ์ด 100์ดˆ ๊ฐ„๊ฒฉ์œผ๋กœ(์ง€์—ฐ ์‹œ๊ฐ„์œผ๋กœ) produceํ•œ๋‹ค.
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
    //1์ดˆ ํƒ€์ž„์•„์›ƒ receive -> ๋ฐ˜ํ™˜ ๊ฐ’ Unit. ์ƒ์„ฑํ•˜์ž๋งˆ๋‹ค ์•„์ดํ…œ์„ produceํ–ˆ๊ธฐ์—
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    //50์ดˆ ํƒ€์ž„์•„์›ƒ receive -> ๋ฐ˜ํ™˜ ๊ฐ’ null. ๋Œ€๋žต 51์ดˆ๊นŒ์ง€ produce๋œ ์•„์ดํ…œ์ด ์—†์Œ
    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
    println("Next element is not ready in 50 ms: $nextElement")

    //60์ดˆ ํƒ€์ž„์•„์›ƒ receive -> ๋ฐ˜ํ™˜ ๊ฐ’ Unit. 100์ดˆ ๊ฒฝ์— ์•„์ดํ…œ์ด produce ๋์„ ๊ฒƒ.
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    // cosumer์˜ 150์ดˆ ๋”œ๋ ˆ์ด.
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    // 1์ดˆ ํƒ€์ž„์•„์›ƒ receive -> ๋ฐ˜ํ™˜ ๊ฐ’ Unit.200์ดˆ์— ์ด๋ฏธ ์•„์ดํ…œ์ด produce ๋์„ ๊ฒƒ.
    // 250์ดˆ ๊ฒฝ์— receive.
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    // 60์ดˆ ํƒ€์ž„์•„์›ƒ receiver -> ๋ฐ˜ํ™˜ ๊ฐ’ Unit. 
    // receive ํ˜ธ์ถœ ์‚ฌ์ด์˜ ์ผ์‹œ์ •์ง€(delay)๋ฅผ ๊ณ ๋ คํ•ด์„œ Unit ์š”์†Œ๊ฐ€ ๋นจ๋ฆฌ produce ๋˜์—ˆ๋‹ค.
    // 50์ดˆ๋งŒ์— ์•„์ดํ…œ์ด produce ๋๋‹ค.  
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}

print ๊ฒฐ๊ณผ

Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

 


๋ฐœ๋ฒˆ์—ญ์œผ๋กœ ๊ทธ๋ž˜๋„ ์—ฌ๊ธฐ๊นŒ์ง€ ์ดํ•ดํ–ˆ๋‹ค.