kotlinlang.org/docs/channels.html#channel-basics
kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/
kotlinx-coroutines-core / kotlinx.coroutines.channels / Channel
interface Channel<E> : SendChannel<E>, ReceiveChannel<E> (source)
Channel์ ๊ฐ์ stream์ ์ ๋ฌํ๋ค. sender๊ณผ reciver์ฌ์ด์ ํต์ ์ ์ํ non-blocking ์์ํ์ .
Channel ๊ธฐ๋ณธ
val channel = Channel<Int>()
launch {
// this might be heavy CPU-consuming computation or async logic, we'll just send five squares
for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
abstract suspend fun send(element: E): Unit
element๋ฅผ ์ฑ๋๋ก ๋ณด๋ธ๋ค.
์ด channel์ ๋ฒํผ๊ฐ ๊ฐ๋์ฐจ๊ฑฐ๋ ์กด์ฌํ์ง ์์ผ๋ฉด ํธ์ถ๋ถ๋ฅผ ์ผ์์ค์ง์ํจ๋ค(suspend).
channel์ด close ๋์ ๊ฒฝ์ฐ์ ํธ์ถ๋๋ฉด exception์ ๋ฐ์์ํจ๋ค.
abstract val isClosedForSend: Boolean
close ํจ์์ ์คํ์ ์ํด channel์ด close ๋๋ฉด true๋ฅผ ๋ฐํํ๋ค. true๋ฅผ ๋ฐํํ ๋ send ํจ์, offer ํจ์๋ฅผ ํธ์ถํ๋ฉด exception์ด ๋ฐ์ํ๋ค.
abstract suspend fun receive() : E
channel์ด ๋น์ด์์ง ์๋ค๋ฉด channel์ element ํ๋๋ฅผ ๋ฐํํ๊ณ ์ ๊ฑฐํ๋ค.
channel์ด ๋น์ด์์ผ๋ฉด caller๋ฅผ ์ค๋จํ๋ค.
channel์ด close ๋ ์ํ์ด๋ฉด ClosedReceiveChannelException์ ๋ฐ์์ํจ๋ค.
๋ค๋ฅธ exception์ผ๋ก ์ธํด channel์ด close ๋๋ฉด, ์ด channel์ failed channel์ด๋ผ ๋ถ๋ฅธ๋ค.
abstract val isClosedForReceive: Boolean
SendChannel ์ธก์์ close ํจ์๊ฐ ์คํ๋์ด channel์ด close ๋๊ณ ์ด์ ์ ๋ฐ์ ๋ชจ๋ element๋ค์ด ์ด๋ฏธ ๋ค ๋ณด๋ด์ง๋ฉด true๋ฅผ ๋ฐํํ๋ค. true๋ฅผ ๋ฐํํ ๋ receive ํจ์๋ฅผ ํธ์ถํ๋ฉด ClosedReceiveChannelException์ด ๋ฐ์ํ๋ค. ๋ค๋ฅธ exception์ ์ํด channel์ด close ๋ ๊ฒฝ์ฐ๋ channel์ด ๋ซํ ๊ฒ์ผ๋ก ๊ฐ์ฃผํ๋, ๊ทธ channel์ failed channel๋ก ๋ถ๋ฅธ๋ค.
abstract fun offer(element: E) : Boolean
์ฉ๋์ ์ด๊ณผํ์ง ์์ผ๋ฉด element๋ฅผ ์ด channel์ ์ฆ์ ์ถ๊ฐํ๊ณ true๋ฅผ ๋ฐํํ๋ค.
์ฉ๋์ ์ด๊ณผํ๋ฉด false๋ฅผ ๋ฐํํ๋ค.
send์ ๋๊ธฐ์ ์ธ ๋ณํ์ด๋ค.
channel์ด send์ ๋ซํ์ ธ ์๋ค๋ฉด (isClosedForSend ๊ฐ์ด true) exception์ ๋ฐ์์ํจ๋ค.
offer๋ก false๋ฅผ ๋ฐํ๋ฐ์ผ๋ฉด, ์ด element๊ฐ consumer์๊ฒ ์ ๋ฌ๋์ง ์์๋ค๋ ๊ฒ์ ๋ณด์ฅํ๊ณ onUndeliveredElement๋ฅผ ํธ์ถํ์ง ์๋๋ค.
channel์ด ๋ซํ์๋ค๋ฉด, exception์ ํธ์ถํ๊ธฐ ์ ์ onUndeliveredElement๋ฅผ ํธ์ถํ๋ค.
Channel ๋ซ๊ธฐ(close)์ ๋ฐ๋ณต(iteration)
val channel = Channel<Int>()
launch {
for (x in 1..5) channel.send(x * x)
channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")
abstract suspend fun close(cause: Throwable? = null): Boolean
channel์ ๋ ์ด์ ์ ์ก์ํฌ(send) element๊ฐ ์๋ค๊ณ ์๋ฆฌ๊ธฐ ์ํด channel์ ๋ซ์(close) ์ ์๋ค.
close๋ ๋ฉฑ๋ฑ ์ฐ์ฐ์๋ผ์ ์ด ํจ์๋ฅผ ์ต์ด ์คํํ ์ดํ์ ์คํ ํ์๋ ์๋ฌด ๋ณํ๋ ์๊ณ false๋ฅผ ๋ฆฌํด์ํจ๋ค.
๊ฐ๋ ์ ์ผ๋ก ์ด ํจ์๋ ํน์ "close token"๋ฅผ channel์ ๋ณด๋ธ๋ค. ์ด close token์ iteration์ ๋ฐ์๋ง์ ์ค๋จํ๊ธฐ ๋๋ฌธ์, close ์ ์ ๋ฐ์ ๋ชจ๋ element๋ค์ด receive๋ก ํธ์ถ๋์ด ๋๊ฐ๋ค๋ ๊ฒ์ด ๋ณด์ฅ๋๋ค.
→๋ฉฑ๋ฑ์ ๊ดํด ์ ๋ฆฌํ ๋ด์ฉ ํ์ธ
* Channel์ ํจ์
abstract operator fun iterator(): ChannelIterator<E>
for loop๋ฅผ ์ด์ฉํด ์ด channel์์์ element๋ค์ ๋ฐ๋ ์ iterator๋ฅผ ๋ฐํํ๋ค.
๋ณดํต exception cause ๋ฐ์์ด ์์ด isClosedForReceived ํจ์์ ๋ฐํ๊ฐ์ด true์ผ ๋ Iteration์ ์๋ฃํ๊ณ , channel์ด fail๋ ๊ฒฝ์ฐ close๋ฅผ ๋ฐ์์ํจ exception์ throw ํ๋ค.
-> Channel close() ํธ์ถ ํ for ๋ฃจํ ๋๋ฆด ์ ์๋ค.
channel ์์ฑ์(producer)๋ค ๋ง๋ค๊ธฐ
์ฝ๋ฃจํด์ด ์ฐ์๋ element๋ค์ ์์ฐํ๋ ํจํด(producer-consumer ํจํด์ ์ผ๋ถ)์ ํํ ์ฐ์ธ๋ค. ์ด producer๋ channel์ ๋งค๊ฐ๋ณ์๋ก ํ๋ ํจ์๋ก ๋ง๋ค ์๋ ์ ์์ง๋ง, ์ด๋ ํจ์๊ฐ ๋ฐ๋์ ๊ฐ์ ๋ฐํํด์ผ ํ๋ค๋ ์์๊ณผ ์ด๊ธ๋๋ค.(... ๊ทธ๋์ ๊ทธ๋ ๊ฒ ํ๋ฉด ์๋๋ค๊ณ ํ๋ ๊ฑฐ์ฃ ? (๏ผ _๏ผ ;) ์๋๊ธฐ ๋๋ฌธ์ produce ํจ์๋ฅผ ์๊ฐํด์ฃผ๋ ๊ฑฐ์ฃ ?)
-> producer-consumer ํจํด ํ์ธ
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
for (x in 1..5) send(x * x)
}
fun main() = runBlocking {
val squares = produceSquares()
squares.consumeEach { println(it) }
println("Done!")
}
@ExperimentalCoroutinesApi fun <E> CoroutineScope.produce(
context: CoroutineContext = EmptyCoroutineContext,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E>
producer๋ฅผ ์ฝ๊ฒ ๋ง๋๋ produce ๋ผ๋ ์ด๋ฆ์ coroutine builder๊ฐ ์๋ค. ๋ฌธ์๋ณด๊ธฐ
๋ฐ์ดํฐ ์คํธ๋ฆผ์ ์์ฑํด channel๋ก ๋ณด๋ด๋ ์ ์ฝ๋ฃจํด์ ์์ํ๋ค.
์ฝ๋ฃจํด์ ์ํด ์์ฑ๋ element๋ค์ receiveํ๋ ๋ฐ ์ฐ์ด๋ ReceiveChannel์ด ๋ฐํ๋๋ค.
context : CoroutineScope.coroutineContext
capacity : channel ๋ฒํผ์ ์ฉ๋ - ๊ฒฐ๊ณผ channel์ด ํน์ capacity(์ฉ๋) ๊ฐ์ ๋ฐ๋ผ ๋ฌ๋ผ์ง๋ค.
block : coroutine ์ฝ๋ - ProducerScope๋ผ์ ์ฝ๋ฃจํด์ด ๋ฐ๋ก send()๋ฅผ ํธ์ถํ ์ ์๋ค. ์ฝ๋ฃจํด์ด ์๋ฃ๋๋ฉด channel์ ๋ซํ๋ค. receive channel์ด ์ทจ์๋๋ฉด ์คํ ์ค์ด๋ coroutine์ด ์ทจ์๋๋ค.
exception ๋ฐ์ ์ coroutine์ channel์ ๋ซ๊ณ ๊ฒฐ๊ณผ channel์ fail ๋๋ค.
@ExperimentalCoroutinesApi interface ProducerScope<in E> :
CoroutineScope,
SendChannel<E>
produce coroutine builder๋ฅผ ์ํ scope. ์ด coroutine์ด elemet๋ค์ ๋ณด๋ด๋(send) channel์ ์ฐธ์กฐ.
suspend fun <E> ReceiveChannel<E>.consueEach(action: (E) -> Unit) : Unit
cosumer ์ชฝ์ธ channel์์ for ๋ฃจํ๋ฅผ ๋์ฒดํ๋ extension function.
receive๋ ๊ฐ element๋ฅผ ๋์์ผ๋ก action์ ์ํํ๊ณ action ์คํ ํ์ channel์ cancelํ๋ค.
Pipelines
ํ์ดํ๋ผ์ธ ํ๋๋ ์ฝ๋ฃจํด ํ๋๊ฐ ๋ฌดํ๋์ ์คํธ๋ฆผ ๊ฐ์ ์์ฑํ๋ ํจํด์ด๋ค.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // 1๋ถํฐ ์์ํ๋ ์ ์์ ๋ฌดํ๋ ์คํธ๋ฆผ
}
๋ ๋ค๋ฅธ ์ฝ๋ฃจํด๋ค์ ๊ทธ ์คํธ๋ฆผ์ ์๋นํ๊ณ , ๋ช ๊ฐ์ง ์ฒ๋ฆฌ๋ฅผ ํ๊ณ ๋ค๋ฅธ ๊ฒฐ๊ณผ๊ฐ๋ค์ ์์ฑํ๋ค.
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x) // ์ ๊ณฑ
}
๋ฉ์ธ ์ฝ๋๋ ์ ์ฒด ํ์ดํ๋ผ์ธ์ ์์ํ๊ณ ์ฐ๊ฒฐํ๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val numbers = produceNumbers() // 1 ๋ถํฐ ์ฆ๊ฐํด ์์ฑ๋๋ ์ ์๋ค
val squares = square(numbers) // ์ ์๋ค ์ ๊ณฑ
repeat(5) {
println(squares.receive()) // ์ฒ์ 5๊ฐ๋ง ํ๋ฆฐํธ
}
println("Done!") // we are done
coroutineContext.cancelChildren() // cancel children coroutines
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1
while (true) send(x++) // infinite stream of integers starting from 1
}
fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
for (x in numbers) send(x * x)
}
๊ฒฐ๊ณผ
1
4
9
16
25
Done!
-> ํ ์ฝ๋ฃจํด์ด ์ฑ๋ ํ๋์ stream producting ํ๋ ์ค์ ๋ค๋ฅธ ์ฝ๋ฃจํด์ด ๊ทธ ์ฑ๋์ stream์ cosuming ํด์ ๋ค๋ฅธ channel์ ์์ฑํ๋ค.
์์์ ํ์ดํ๋ผ์ธ
์ฝ๋ฃจํด๋ค์ด ํ๋์ ํ์ดํ๋ผ์ธ์ ์ด์ฉํด์ ์์๋ฅผ ์์ฑํ๋ ์์ ๋ฅผ ํตํด ํ์ดํ๋ผ์ธ๋ค์ ๊ทนํ๊น์ง ์ดํด ๋ณด๊ฒ ๋ค.
๋ฌดํ๋์ ์ซ์๋ค๋ก ์์ํด๋ณด์.
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
์๋์ ํ์ดํ๋ผ์ธ ๋จ๊ณ๋ ๋ค์ด์ค๋ ์ซ์ ์คํธ๋ฆผ์ ํํฐ๋งํ๋๋ฐ, ์ฃผ์ด์ง ์์๋ก ๋๋ ์ง๋ ๋ชจ๋ ์ซ์๋ค์ ์ญ์ ํ๋ค.
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
์ซ์ 2๋ถํฐ ์์ํ๋ ์ซ์ ์คํธ๋ฆผ์ผ๋ก ์์ํ๋ ํ์ดํ๋ผ์ธ์ ์์ฑํ๊ณ , ํ์ฌ ์ฑ๋์์ ์์๋ฅผ ๊ฐ์ ธ๋ค๊ฐ, ์ฐพ์ ๊ฐ ์์๋ง๋ค ์๋ก์ด ํ์ดํ ๋จ๊ณ๋ฅผ ์์ํ๋ค.
numbersFrom(2) -> filter(2) -> filter(3) -> filter(5) -> filter(7) ...
์๋ ์์๋ ์ฒซ 10๊ฐ์ ์์๋ค์ ํ๋ฆฐํธํ๊ณ , ๋ฉ์ธ ์ฐ๋ ๋์ ์ฝํ ์คํธ์์ ์ ์ฒด ํ์ดํ๋ผ์ธ์ ์คํํ๋ค. ๋ชจ๋ ์ฝ๋ฃจํด๋ค์ด ๋ฉ์ธ runBlocking ์ฝ๋ฃจํด์ ์ค์ฝํ ์์์ ์์๋๊ธฐ ๋๋ฌธ์ ์ฐ๋ฆฌ๋ ์์๋ ๋ชจ๋ ์ฝ๋ฃจํด๋ค์ ๋ช ์์ ์ธ ๋ฆฌ์คํธ๋ฅผ ์ ์งํ ํ์๊ฐ ์๋ค.
์ฐ๋ฆฌ๋ cancelChildren ํ์ฅํจ์๋ฅผ ์ฌ์ฉํด ์ฒ์ 10๊ฐ์ ์์๋ฅผ ํ๋ฆฐํธ ํ ํ์ ๋ชจ๋ ์๋ ์ฝ๋ฃจํด๋ค์ ์ทจ์ํ๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.receive()
println(prime)
cur = filter(cur, prime)
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
var x = start
while (true) send(x++) // infinite stream of integers from start
}
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
for (x in numbers) if (x % prime != 0) send(x)
}
๊ฒฐ๊ณผ
2
3
5
7
11
13
17
19
23
29
์ฐ๋ฆฌ๋ ๋๊ฐ์ ํ์ดํ๋ผ์ธ์ standard library์ iterator ์ฝ๋ฃจํด ๋น๋๋ฅผ ์ด์ฉํด ๋ง๋ค ์ ์๋ค.
produce -> iterator ๊ต์ฒด
send -> yield ๊ต์ฒด
receive -> next ๊ต์ฒด
ReceiveChannel -> Iterator ๊ต์ฒด
-> ์ฝ๋ฃจํด ์ค์ฝํ ์ ๊ฑฐ
runBlocking์ ์ฌ์ฉํ ํ์์๋ค.
๊ต์ฒดํ ์ฝ๋. ๊ฒฐ๊ณผ ๊ฐ์ ์์ ๊ฒฐ๊ณผ ๊ฐ๊ณผ ๊ฐ๋ค.
fun main() {
var cur = numbersFrom(2)
repeat(10) {
val prime = cur.next()
println(prime)
cur = filter(cur, prime)
}
}
fun numbersFrom(start: Int) = iterator<Int> {
var x = start
while (true) yield(x++) // infinite stream of integers from start
}
fun filter(numbers: Iterator<Int>, prime: Int) = iterator<Int> {
for (x in numbers) if (x % prime != 0) yield(x)
}
๊ทธ๋ฌ๋ ์์ ๊ฐ์ด ์ฑ๋๋ค์ ์ด์ฉํ ํ์ดํ๋ผ์ธ์ ์ด์ ์ Dispatchers.Default context์์ ์ฝ๋๋ฅผ ์คํํ ๋ multiple CPU ์ฝ์ด๋ค์ ์ค์ ๋ก ์ฌ์ฉํ ์ ์๋ค๋ ๊ฒ์ด๋ค.
์ด์จ๋ , ์์๋ค์ ์ฐพ๋ ๊ทน๋จ์ ์ด๊ณ ์ค์ฉ์ ์ด์ง๋ ์๋ ์์ ์๋ค. ์ค์ ๋ก ํ์ดํ๋ผ์ธ๋ค์ ์ ๋ง๋ก ๋ช๊ฐ์ง ๋ค๋ฅธ suspending invocation(ํธ์ถ, ์๊ฒฉ ์๋น์ค๋ก ๋น๋๊ธฐ ํธ์ถ ๊ฐ์ด)์ ํฌํจํ๊ณ , ์ด ํ์ดํ๋ผ์ธ๋ค์ sequence/iterator๋ฅผ ์ฌ์ฉํด์ ๋ง๋ค์ด์ง ์๋ ์๋ค. ์๋ํ๋ฉด seqeunce/iterator๋ ์์ ํ ๋น๋๊ธฐ์ธ produce์ ๋ค๋ฅด๊ฒ ์์์ ์ธ ์ค๋จ์ ํ์ฉํ์ง ์๊ธฐ ๋๋ฌธ์ด๋ค.
fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): T
context: ์ฝ๋ฃจํด์ ์ฝํ ์คํธ. ๊ธฐ๋ณธ๊ฐ์ ํ์ฌ ์ฐ๋ ๋์ ์ด๋ฒคํธ ๋ฃจํ์ด๋ค.
block: ์ฝ๋ฃจํด ์ฝ๋
์๋ก์ด ์ฝ๋ฃจํด ํ๋๋ฅผ ์คํํ๊ณ ์ด ์ฝ๋ฃจํด์ด ์๋ฃ๋ ๋๊น์ง ํ์ฌ ์ฐ๋ ๋๋ฅผ ์ธํฐ๋ฝํธ๋ก ๋ง๋๋ค. ์ด ํจ์๋ฅผ ์ฝ๋ฃจํด์์ ์ฌ์ฉํด์๋ ์๋๋ค. ์ด๊ฒ์ ์ผ๋ฐ ์ฐจ๋จ ์ฝ๋์ ์ผ์์ค๋จ ์คํ์ผ๋ก ์์ฑ๋ ๋ผ์ด๋ธ๋ฌ๋ฆฌ์ ์ฐ๊ฒฐํด์ ์ฃผ์ ๊ธฐ๋ฅ๋ค๊ณผ ํ ์คํธ์์ ์ฌ์ฉ๋๋๋ก ์ค๊ณ๋์๋ค.
์ด ๋น๋์ ๊ธฐ๋ณธ CoroutineDispatcher๋ ์ด ์ฝ๋ฃจํด์ด ์์ฑ๋ ๋๊น์ง ์ค๋จ๋ ์ฐ๋ ๋์์ ์ฐ์์ ์ผ๋ก ์งํํ๋ ์ด๋ฒคํธ ๋ฃจํ์ ๋ด๋ถ ๊ตฌํ์ด๋ค.
context ๋ณ์์ CoroutineDispatcher๊ฐ ๋ช ์์ ์ผ๋ก ์ง์ ๋๋ฉด ํ์ฌ ์ฐ๋ ๋๊ฐ ๋งํ์ ๋์ ์ง์ ๋ dispatcher์ context๊ฐ ์๋ก์ด ์ฝ๋ฃจํด์ ์คํํ๋ค. ๋ง์ฝ ์ง์ ๋ dispatcher๊ฐ ๋ ๋ค๋ฅธ runBlocking์ ์ด๋ฒคํธ ๋ฃจํ๋ผ๋ฉด, ์ด ํธ์ถ์ ๋ฐ๊นฅ ๋ฃจํ๋ฅผ ์ฌ์ฉํ๋ค.
์ด ๋งํ ์ฐ๋ ๋๊ฐ ์ธํฐ๋ฝํธ๋๋ฉด, ์ฝ๋ฃจํด job์ ์ทจ์๋๊ณ , runBlocking์ InterruptedException์ ํธ์ถํ๋ค.
fun CoroutineConetext.cancelChildren(
cause: CancellationException? = null
): Unit
์ ํ์ ์ธ ์ทจ์ ์์ธ์ ๊ฐ์ง๊ณ job ์ค์ค๋ก์ ์ํ๋ฅผ ๊ฑด๋๋ฆฌ์ง ์๊ณ , ์ด context ์์ Job์ ๋ชจ๋ ์๋ ๋ค์ ์ทจ์ํ๋ค.
*Standard library์ ํจ์
fun <T> iterator(
block: suspend SeqeunceScope<T>.() -> Unit
): Iterator<T>
๊ฐ์ ํ๋์ฉ ๋๋ฆฌ๊ฒ ์์ฑํ๋ Iterator๋ฅผ ๋น๋ํ๋ค.
์์
import kotlin.test.*
fun main(args: Array<String>) {
val collection = listOf(1, 2, 3)
val wrappedCollection = object : AbstractCollection<Any>() {
override val size: Int = collection.size + 2
override fun iterator(): Iterator<Any> = iterator {
yield("first")
yieldAll(collection)
yield("last")
}
}
println(wrappedCollection) // [first, 1, 2, 3, last]
}
์์
fun main(args: Array<String>) {
val iterable = Iterable {
iterator {
yield(42)
yieldAll(1..5 step 2)
}
}
val result = iterable.mapIndexed { index, value -> "$index: $value" }
println(result) // [0: 42, 1: 1, 2: 3, 3: 5]
// can be iterated many times
repeat(2) {
val sum = iterable.sum()
println(sum) // 51
}
}
Fan-out
ํ๋์ producer ์ฝ๋ฃจํด์ channel 1๊ฐ, ์ฒ๋ฆฌ ์ฝ๋ฃจํด ์ฌ๋ฌ๊ฐ.
์ฌ๋ฌ ์ฝ๋ฃจํด๋ค์ ๋์ผํ ์ฑ๋์ ์์ (receive)ํด์ ์๋ก๊ฐ์ ์์ ์ ๋ถ๋ฐฐํ ์ ์๋ค.
๋งค ์ด๋ง๋ค ๊ฐ์ด ์ฆ๊ฐํ๋ ์ ์๋ฅผ ์์ฐํ๋ producer ์ฝ๋ฃจํด์ผ๋ก ์์ํ๋ค.
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
์ฌ๋ฌ processor ์ฝ๋ฃจํด๋ค์ ๊ฐ์ง ์ ์๋ค. ์์ ์ฝ๋์์๋ ์ฝ๋ฃจํด์ id์ ์์ ํ ์ซ์๋ฅผ printํ๋ค.
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
์ด์ 5๊ฐ์ ํ๋ก์ธ์๋ค์ ์์ํ๊ณ ์ฝ 1์ด ๋์ ๋์์ํจ๋ค. ์ด๋ค ์ผ์ด ์ผ์ด๋๋์ง ๋ณด์.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val producer = produceNumbers()
repeat(5) { launchProcessor(it, producer) }
delay(950)
producer.cancel() // cancel producer coroutine and thus kill them all
}
fun CoroutineScope.produceNumbers() = produce<Int> {
var x = 1 // start from 1
while (true) {
send(x++) // produce next
delay(100) // wait 0.1s
}
}
fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
for (msg in channel) {
println("Processor #$id received $msg")
}
}
์ถ๋ ฅ์ ๋ค์๊ณผ ๋น์ทํ ๊ฒ์ด๋ค. ๊ฐ ์ ์๋ฅผ ์์ ํ ํ๋ก์ธ์์ id๋ ๋ค๋ฅผ ์ ์์ง๋ง ๋ง์ด๋ค.
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10
producer ์ฝ๋ฃจํด์ ์ทจ์ํ๋ฉด ํด๋น ์ฑ๋๋ ๋ซํ๋ค. ๋ฐ๋ผ์ ๊ฒฐ๊ณผ์ ์ผ๋ก ํ๋ก์ธ์ ์ฝ๋ฃจํด๋ค์ด ์์
ํ๊ณ ์๋ ์ฑ๋์ iteration์ ์ข
๋ฃ๋๋ค.
launchProcessor ์ฝ๋์์ fan-out์ ์ํํ๊ธฐ ์ํด for ๋ฃจํ๋ก channel์ ๋ช
์์ ์ผ๋ก ๋ฐ๋ณตํ๋ ๋ฐฉ๋ฒ์ ์ฃผ๋ชฉํด๋ผ. consumEach์ ๋ค๋ฅด๊ฒ, ์ด for ๋ฃจํ ํจํด์ ์ฌ๋ฌ ์ฝ๋ฃจํด๋ค์ ์ฌ์ฉํ๊ธฐ์ ์๋ฒฝํ๊ฒ ์์ ํ๋ค. ๋ง์ฝ ํ๋ก์ธ์ ์ฝ๋ฃจํด๋ค ์ค ํ๋๊ฐ ์คํจํ๋ค๋ฉด, ๋ค๋ฅธ ์ฝ๋ฃจํด๋ค์ ์ฌ์ ํ ๊ณ์ ์ฑ๋์ ์ฒ๋ฆฌํ ๊ฒ์ธ ๋ฐ๋ฉด, consumeEach๋ฅผ ํตํด ์์ฑ๋ ํ๋ก์ธ์๋ ํญ์ ๊ธฐ๋ณธ์ด ๋๋ ์ฑ๋์ ์ ์์ ์ด๊ฑฐ๋ ๋น์ ์์ ์ธ ์๋ฃ์ ์๋น(์ทจ์)๋ฅผ ํ๋ค.
-> fan-out์์ ์ฌ๋ฌ processor ์ฝ๋ฃจํด๋ค ์ค ํ๋๊ฐ ์คํจํด๋ ๋ค๋ฅธ ์ฝ๋ฃจํด๋ค์ด ์์ ์ ์ ์์ ์ผ๋ก ์งํํ ์ ์์ผ๋, consumeEach๋ก ์์ฑ๋ processor ์ฝ๋ฃจํด์ ์ฝ๋ฃจํด์ด ์คํจํ๋ฉด ์์ ์ค์ธ channel์ด ๋ซํ๊ธฐ ๋๋ฌธ์ ๋ค๋ฅธ ์ฝ๋ฃจํด๋ค์ ์์ ์ด ์ค๋จ๋๋ค.
Fan-in
์ฌ๋ฌ ์ฝ๋ฃจํด๋ค์ด ํ๋์ channel์ ์ ๊ทผ ๊ฐ๋ฅ.
์ฌ๋ฌ ์ฝ๋ฃจํด๋ค์ด ๋์ผํ ํ๋์ channel์ ๊ฐ์ sendํ ์ ์๋ค.
์๋ฅผ ๋ค์ด, string๊ฐ๋ค์ channel ํ๋๋ฅผ ๊ฐ์ง๊ณ ์๊ณ , ์ง์ฐ ์๊ฐ์ ๊ฐ์ง๊ณ ํน์ string์ ์ด channel์ ๋ฐ๋ณต์ ์ผ๋ก sendํ๋ suspending function์ด ์๋ค๊ณ ํ์.
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
๋ง์ฝ string๊ฐ๋ค์ ๋ณด๋ด๋ ์ฝ๋ฃจํด ๋ ๊ฐ๋ฅผ ์์ํ๋ค๋ฉด ์ด๋ค ์ผ์ด ์ผ์ด๋๋ ์ง ๋ณด์.(์ด ์์ ์์ ๋ฉ์ธ ์ฝ๋ฃจํด์ ์๋ ๋ค๋ก์ ๋ฉ์ธ ์ฐ๋ ๋์ context์์ ์ด ์ฝ๋ฃจํด๋ค์ ์คํํ๋ค.)
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking {
val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
}
suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
while (true) {
delay(time)
channel.send(s)
}
}
์ถ๋ ฅ์ ๋ค์๊ณผ ๊ฐ๋ค.
foo
foo
BAR!
foo
foo
BAR!
Buffered channels
์ง๊ธ๊น์ง ์ดํด๋ณธ channel๋ค์ ๋ฒํผ๊ฐ ์์๋ค. ๋ฒํผ๊ฐ ์๋ ์ฑ๋๋ค์ sender์ receiver๊ฐ ๋ง๋ ๋ ์์๋ค์ ์ ์กํ๋ค.(์ผ๋ช ๋๋ฐ๋ทฐ. rendezvous ์ฑ๋์ด๋ผ ๋ถ๋ฅด๋๋ผ). send๊ฐ ๋จผ์ ์คํ๋๋ฉด receive๋ ํธ์ถ๋ ๋๊น์ง ๊ธฐ๋ค๋ฆฌ๊ณ , receive๊ฐ ๋จผ์ ํธ์ถ๋๋ฉด send๊ฐ ํธ์ถ๋ ๋๊น์ง ์ค๋จ๋๋ค.
Channel ํฉํ ๋ฆฌ ํจ์์ produce ๋น๋ ๋๋ค capacity ๋งค๊ฐ๋ณ์๋ฅผ ๋ฒํผ์ฌ์ด์ฆ๋ก ์ฌ์ฉํ ์ ์๋ค. ๋ฒํผ๋ ์ค๋จ ์ ์ ์ฌ๋ฌ ์์๋ค์ sender๊ฐ ์ ์กํ๊ฒ ํ๊ณ , ๋ฒํผ๊ฐ ๊ฐ๋์ฐฐ ๋ ์ฐจ๋จ๋๋ ๊ฒ์ ์ง์ ๋ capacity๋ฅผ ๊ฐ์ง BlockingQueue์ ๋น์ทํ๋ค.
fun <E> Channel(
capacity: Int = RENDEZVOUS,
onBufferedOverflow: BufferOverflow = BufferOverflow.SUSPEND,
pnUndeliveredElement: (E) -> Unit = null
): Channel<E>
capacity - ์์์ ์ฑ๋ ์ฉ๋ ๋๋ Channel.Facotry์์ ์ ์๋ ์์ ์ค ํ๋
onBufferOverflow - ๋ฒํผ ์ค๋ฒํ๋ก์ฐ์์์ ์ก์ . ์ ํ์ฌํญ. ๊ธฐ๋ณธ๊ฐ์ ์ ์ก ์๋๋ฅผ ์ผ์์ค๋จํ๋ค. capacity >=0์ด๊ฑฐ๋ capacity == Channel.BUFFERED ์ผ ๋๋ง ์ง์๋๋ค. ์ต์ ํ๋์ ๋ฒํผ๋ง๋ ์์๊ฐ ์์ผ๋ฉด ์ฑ๋์ ์์์ ์ผ๋ก ์์ฑํ๋ค.
onUndeliveredElement - ์ ํ์ฌํญ์ธ ํจ์. ์์๊ฐ ์ ์ก๋์์ง๋ง ์๋น์๋ก ์์๊ฐ ์ ์ก๋์ง ๋ชปํ ๋ ํธ์ถ๋๋ค.
capacity < -2 ๊ฒฝ์ฐ์ IllegalArgumentException ์ ํธ์ถํ๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
repeat(10) {
println("Sending $it") // print before sending each element
channel.send(it) // will suspend when buffer is full
println("Sending $it done")
}
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
}
๊ฒฐ๊ณผ
Sending 0
Sending 0 done
Sending 1
Sending 1 done
Sending 2
Sending 2 done
Sending 3
Sending 3 done
Sending 4
์ฒ์ 4๊ฐ์ ์์๋ค์ ๋ฒํผ์ ์ถ๊ฐ๋์๊ณ , sender๋ ๋ค์ฏ๋ฒ์งธ ์์๋ฅผ sendํ๊ธฐ ์ํด ์ค๋จ๋๋ค.
Channel๋ค์ ๊ณตํํ๋ค
์ฌ๋ฌ ์ฝ๋ฃจํด๋ค์์ ํธ์ถ ์์์ ๊ด๋ จํด ์ฑ๋์ ์ ์ก(send)๊ณผ ์์ (receive) ์ฐ์ฐ์ ๊ณตํํ๋ค. FIFO๋ก ์งํ๋๋ค. receive๋ฅผ ํธ์ถํ ์ฒซ๋ฒ์งธ ์ฝ๋ฃจํด์ด ์์๋ฅผ ๊ฐ์ ธ๊ฐ๋ค.
์๋ ์์์์ ๋ ์ฝ๋ฃจํด "ping"๊ณผ "pong"์ "ball" ๊ฐ์ฒด๋ฅผ ๊ณต์ฉ "table" ์ฑ๋์์ ์์ ํ๊ณ ์๋ค.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
data class Ball(var hits: Int)
fun main() = runBlocking {
val table = Channel<Ball>() // a shared table
launch { player("ping", table) }
launch { player("pong", table) }
table.send(Ball(0)) // serve the ball
delay(1000) // delay 1 second
coroutineContext.cancelChildren() // game over, cancel them
}
suspend fun player(name: String, table: Channel<Ball>) {
for (ball in table) { // receive the ball in a loop
ball.hits++
println("$name $ball")
delay(300) // wait a bit
table.send(ball) // send the ball back
}
}
๊ฒฐ๊ณผ
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)
"ping" ์ฝ๋ฃจํด์ด ์ฒ์ ์์๋์๊ณ , ๊ณต์ ์์ ํ๋ ์ฒซ ์ฝ๋ฃจํด์ด๋ค. ๋น๋ก "ping" ์ฝ๋ฃจํด์ด ํ ์ด๋ธ์ ๊ณต์ ์ ์กํ ํ์ ๋ฐ๋ก ๊ณต์ ๋ฐ๋ ๊ฒ์ ์์ํ์ผ๋, ๊ณต์ "pong" ์ฝ๋ฃจํด์๊ฒ ์์ ๋์๋ค. ์๋ํ๋ฉด ์ด๋ฏธ "pong"์ด ๊ณต์ ๊ธฐ๋ค๋ฆฌ๊ณ ์์๊ธฐ ๋๋ฌธ์ด๋ค.
์ข ์ข channel๋ค์ ์คํ์ด ์คํ ํ๋ก๊ทธ๋จ์ ํน์ฑ์ ๋ฐ๋ผ unfairํ ๊ฒฝ์ฐ๋ ์๋ค.
Ticker channels
Ticker channel์ ์ด ์ฑ๋์ ๋ง์ง๋ง ์์์ ์๋น ์ดํ์ delay๊ฐ ๋ฐ์ํ ๋๋ง๋ค Unit์ ์์ฑํ๋ ํน๋ณํ ๋๋ฐ๋ทฐ ์ฑ๋(๋ฒํผ๋ง์๋ ์ฑ๋)์ด๋ค.
์ธ๋ฐ์๋ ๋ ๋ฆฝํ ์ฑ๋๋ก ๋ณด์ด์ง๋ง, ๋ณต์กํ ์๊ฐ ๊ธฐ๋ฐ produce ํ์ดํ๋ผ์ธ์ ์์ฑํ๊ฑฐ๋ windowing(? ์ดํด ๋ชปํ์ต๋๋ค.), ์๊ฐ์ข ์ ์ฒ๋ฆฌ๋ฅผ ํ๋ ์ฐ์ฐ๋ค์ ํ๋๋ฐ ์ ์ฉํ๋ค.
Ticker channel์ ์์ฑํ๋ ค๋ฉด ํฉํ ๋ฆฌ ํจ์ ticker๋ฅผ ์ฌ์ฉํ๋ค. ๋์ด์์ ์์๋ค์ด ํ์ํ์ง ์๋ค๋ฉด ReceiveChannel.cancel ํจ์๋ฅผ ์ฌ์ฉํ๋ค.
@ObsoleteCoroutinesApi fun ticker(
delayMillis: Long,
initialDelayMillis: Long = delayMillis,
context: CoroutineContext = EmptyCoroutineContext,
mode: TickerMode = TickerMode.FIXED_PERIOD
): ReceiveChannel<Unit>
์ฒ์ ์ง์ฐ ์๊ฐ ํ์ ์ฒซ ์์ดํ (Unit)์ ์์ฐํ๊ณ ์ง์ฐ์๊ฐ์ด ์๋ ํ์ ์์ดํ ๋ค์ ์์ฐํ๋ ์ฑ๋์ ์์ฑํ๋ค.
๊ฒฐ๊ณผ ์ฑ๋์ ๋๋ฐ๋ทฐ ์ฑ๋์ด๋ค. ์ด ์ฑ๋์ receiver๊ฐ ์ฑ๋์ element๋ค์ ์์ ์ ๋ฐ๋ผ๊ฐ์ง ๋ชปํ๋ฉด, element๋ค์ backpressure ๋๋ฌธ์ send๋์ง ๋ชปํ๋ค. ์ด๋ฌํ ๊ฒฝ์ฐ์ ticker์ ์ค์ ํ์ด๋ฐ ํ๋์ ๊ธฐ๋ณธ๊ฐ์ด TickerMode.FIXED_PERIOD์ธ mode ๋งค๊ฐ๋ณ์์ ์ํด ํต์ ๋๋ค.
cancel ํธ์ถ ํ์ ๋ฐ๋ก element ์์ฐ์ ์ค๋จํ๋ค.
ticker๋ consumer์ ๊ฐ๋ฅํ ์ผ์์ค์ง๋ฅผ ์ธ์งํ๊ณ , ๊ธฐ๋ณธ์ ์ผ๋ก ์ผ์์ค์ง๊ฐ ๋ฐ์ํ๋ฉด ๋ค์์ ์์ฑ๋๋ ์์์ ์ง์ฐ ์๊ฐ์ ์กฐ์ ํด์ ์์ฑ๋ ์์๋ค์ ๊ณ ์ ๋ ๋น์จ์ ์ ์งํ๋ ค๊ณ ํ๋ค.
delayMillis - ๊ฐ ์์๋ค ์ฌ์ด์ ๋ฐ๋ฆฌ์ด ์ง์ฐ ์๊ฐ
initialDelayMillis - ๋ฐ๋ฆฌ ์ด์ ์ง์ฐ ์๊ฐ ํ์ ์ฒซ ์์๊ฐ ์์ฑ๋ ๊ฒ์ด๋ค. ๊ธฐ๋ณธ ๊ฐ์ delayMillis์ ๊ฐ๋ค.
context - producing ์ฝ๋ฃจํด์ context
mode - ์์๋ค์ด ์์ ๋์ง ์์ ๋์ ํน์ ํ๋. ๊ธฐ๋ณธ ๊ฐ์ด TickerMode.FIXED_PERIOD
@ObsoleteCoroutinesApi enum classTickerMode
ticker ํจ์๋ฅผ ์ํ ๋ชจ๋.
์ฐธ๊ณ : ticker ์ฑ๋์ ํ์ฌ ๊ตฌ์กฐํ๋ ๋์์ฑ๊ณผ ํตํฉ๋์ง ์์๊ณ , ์ด api๋ ํฅํ์ ๋ณ๊ฒฝ๋ ๊ฒ์ด๋ค.
Enum value
FIXED_PERIOD - cosumer๊ฐ ๋ฐ๋ผ์ก์ง ๋ชปํ๊ฑฐ๋ ๋๋ฆฌ๋ค๋ฉด ๊ณ ์ ๋ ๊ธฐ๊ฐ์ ์ ์งํ๊ธฐ ์ํด ์ง์ฐ ์๊ฐ์ ์กฐ์ ํ๋ค.
FIXED_DELAY - cosumer๊ฐ ๋ฐ๋ผ์ก์ง ๋ชปํ๊ฑฐ๋ ๋๋ฆฌ๋ฉด ์์ฑ๋ element๋ค ์ฌ์ด์ ๊ณ ์ ๋ ์ง์ฐ ์๊ฐ์ ์ ์งํ๋ค.
-> @ObsoleteCoroutinesApi ํ์ธํ๊ธฐ
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
fun main() = runBlocking<Unit> {
//ticker ์ฑ๋ ์์ฑ. ์ฒ์ 0์ด ๋๋ ์ด๋ก Unit ์์ดํ
์ produceํ๊ณ
//๊ทธ ํ๋ก๋ ๋ชจ๋ Unit ์์ดํ
์ด 100์ด ๊ฐ๊ฒฉ์ผ๋ก(์ง์ฐ ์๊ฐ์ผ๋ก) produceํ๋ค.
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0)
//1์ด ํ์์์ receive -> ๋ฐํ ๊ฐ Unit. ์์ฑํ์๋ง๋ค ์์ดํ
์ produceํ๊ธฐ์
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Initial element is available immediately: $nextElement") // no initial delay
//50์ด ํ์์์ receive -> ๋ฐํ ๊ฐ null. ๋๋ต 51์ด๊น์ง produce๋ ์์ดํ
์ด ์์
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() }
println("Next element is not ready in 50 ms: $nextElement")
//60์ด ํ์์์ receive -> ๋ฐํ ๊ฐ Unit. 100์ด ๊ฒฝ์ ์์ดํ
์ด produce ๋์ ๊ฒ.
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 100 ms: $nextElement")
// Emulate large consumption delays
// cosumer์ 150์ด ๋๋ ์ด.
println("Consumer pauses for 150ms")
delay(150)
// Next element is available immediately
// 1์ด ํ์์์ receive -> ๋ฐํ ๊ฐ Unit.200์ด์ ์ด๋ฏธ ์์ดํ
์ด produce ๋์ ๊ฒ.
// 250์ด ๊ฒฝ์ receive.
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
println("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account and next element arrives faster
// 60์ด ํ์์์ receiver -> ๋ฐํ ๊ฐ Unit.
// receive ํธ์ถ ์ฌ์ด์ ์ผ์์ ์ง(delay)๋ฅผ ๊ณ ๋ คํด์ Unit ์์๊ฐ ๋นจ๋ฆฌ produce ๋์๋ค.
// 50์ด๋ง์ ์์ดํ
์ด produce ๋๋ค.
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
print ๊ฒฐ๊ณผ
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
๋ฐ๋ฒ์ญ์ผ๋ก ๊ทธ๋๋ ์ฌ๊ธฐ๊น์ง ์ดํดํ๋ค.
'๋น ๊ตฌ๋ฉ ์ฑ์ฐ๊ธฐ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Kotlin] Delegation (0) | 2021.03.05 |
---|---|
[Programming] Backpressure (0) | 2021.03.02 |
[Android] java.lang.IllegalStateException: Can't access the Fragment View's LifecycleOwner when getView() is null i.e., before onCreateView() or after onDestroyView() (0) | 2021.03.02 |
[Kotlin Coroutines] ObsoleteCoroutinesApi (0) | 2021.02.24 |
[Android] AndroidManifest <receiver> (0) | 2021.02.23 |