λ³Έλ¬Έ λ°”λ‘œκ°€κΈ°

빈 ꡬ멍 μ±„μš°κΈ°

[λ””μžμΈ νŒ¨ν„΄][Java] μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄ Producer-Consumer Pattern

좜처

ChatGPT


μƒμ‚°μž-μ†ŒλΉ„μž(Producer-Consumer) νŒ¨ν„΄μ€ λ©€ν‹°μŠ€λ ˆλ“œ ν™˜κ²½μ—μ„œ μƒμ‚°μž(Producer)와 μ†ŒλΉ„μž(Consumer) μŠ€λ ˆλ“œ κ°„μ˜ 데이터λ₯Ό μ•ˆμ „ν•˜κ²Œ κ³΅μœ ν•˜κΈ° μœ„ν•œ λ””μžμΈ νŒ¨ν„΄μ΄λ‹€. 이 νŒ¨ν„΄μ€ 곡유 μžμ›μ„ 두고 μ—¬λŸ¬ μŠ€λ ˆλ“œκ°€ λ™μ‹œμ— μ ‘κ·Όν•  λ•Œ λ°œμƒν•  수 μžˆλŠ” 문제λ₯Ό ν•΄κ²°ν•˜κΈ° μœ„ν•΄ μ‚¬μš©λ˜λ©°, 일반적으둜 버퍼(큐)λ₯Ό μ‚¬μš©ν•΄ 데이터λ₯Ό μ£Όκ³ λ°›λŠ”λ‹€.

 

1. μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ˜ κ°œλ…

μƒμ‚°μž Producer

데이터λ₯Ό μƒμ„±ν•˜λŠ” μŠ€λ ˆλ“œλ‘œ, μƒμ‚°λœ 데이터λ₯Ό λ²„νΌλ‚˜ 큐에 μ €μž₯ν•œλ‹€.

μ†ŒλΉ„μž Consumer

μƒμ‚°μžκ°€ μƒμ„±ν•œ 데이터λ₯Ό λ²„νΌμ—μ„œ 가져와 μ²˜λ¦¬ν•˜λŠ” μŠ€λ ˆλ“œ

 

 

λ²„νΌλŠ” μƒμ‚°μžκ°€ 데이터λ₯Ό μ €μž₯ν•˜κ³ , μ†ŒλΉ„μžκ°€ 데이터λ₯Ό κ°€μ Έμ˜€λŠ” 곡유된 μžμ›μ΄λ‹€. 버퍼가 가득 μ°¬ 경우, μƒμ‚°μžλŠ” λ°κΈ°ν•˜κ³ , 버퍼가 λΉ„μ–΄ μžˆλŠ” 경우, μ†ŒλΉ„μžλŠ” λŒ€κΈ°ν•΄μ•Ό ν•œλ‹€. μƒμ‚°μžμ™€ μ†ŒλΉ„μžκ°€ 동깃에 버퍼에 μ ‘κ·Όν•˜λŠ” 경우 동기화가 ν•„μš”ν•˜λ‹€.

 

2. μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ˜ λ™μž‘ 원리

이 νŒ¨ν„΄μ€ 동기화 λ§€μ»€λ‹ˆμ¦˜μ„ 톡해 데이터λ₯Ό μ•ˆμ „ν•˜κ²Œ κ³΅μœ ν•˜κ³ , μƒμ‚°μžκ°€ 데이터λ₯Ό 더 이상 μƒμ„±ν•˜μ§€ μ•Šλ„λ‘ ν•˜κ³ , μ†ŒλΉ„μžκ°€ 데이터λ₯Ό μ²˜λ¦¬ν•  수 μžˆλŠ” 상황을 λ§Œλ“€μ–΄μ€€λ‹€.

  • μƒμ‚°μžλŠ” 버퍼에 데이터λ₯Ό μΆ”κ°€ν•˜κ³ , 버퍼가 가득 μ°¨λ©΄ 더 이상 데이터λ₯Ό μΆ”κ°€ν•  수 μ—†κΈ° λ•Œλ¬Έμ— λŒ€κΈ°ν•œλ‹€.
  • μ†ŒλΉ„μžλŠ” λ²„νΌμ—μ„œ 데이터λ₯Ό κ°€μ Έκ°€λ©°, 버퍼가 λΉ„μ–΄ 있으면 데이터λ₯Ό κ°€μ Έμ˜¬ 수 μ—†κΈ° λ•Œλ¬Έμ— λŒ€κΈ°ν•œλ‹€.
  • 이 κ³Όμ •μ—μ„œ λ²„νΌμ˜ 크기λ₯Ό κ³ λ €ν•œ 동기화가 ν•„μš”ν•˜λ©°, μƒμ‚°μž-μ†ŒλΉ„μž λ¬Έμ œλŠ” λŒ€κ°œ 버퍼λ₯Ό κΈ°μ€€μœΌλ‘œ 데이터λ₯Ό μ•ˆμ „ν•˜κ²Œ κ³΅μœ ν•˜λŠ” λ°©μ‹μœΌλ‘œ ν•΄κ²°λœλ‹€.

 

3. μ£Όμš” 문제점 및 ν•΄κ²° 방법

1. λ™μ‹œμ„± 문제

μ—¬λŸ¬ μŠ€λ ˆλ“œκ°€ λ™μ‹œμ— 버퍼에 μ ‘κ·Όν•˜λŠ” μƒν™©μ—μ„œ, λ°μ΄ν„°μ˜ 일관성을 μœ μ§€ν•˜κΈ° μœ„ν•΄ 동기화가 ν•„μš”ν•˜λ‹€. μžλ°”μ—μ„œλŠ” synchronized(), wait(), notify(), notifyAll()κ³Ό 같은 동기화 λ§€μ»€λ‹ˆμ¦˜μ„ 톡해 이 문제λ₯Ό ν•΄κ²°ν•œλ‹€.

2. 버퍼가 가득 μ°¬ 경우 Overflow

버퍼가 가득 μ°¨λ©΄ μƒμ‚°μžλŠ” 더 이상 데이터λ₯Ό μΆ”κ°€ν•  수 μ—†κΈ° λ•Œλ¬Έμ— λŒ€κΈ°ν•œλ‹€. 이 μƒνƒœλŠ” 버퍼가 λΉ„μ›Œμ§ˆ λ•ŒκΉŒμ§€ μ§€μ†λœλ‹€.

3. 버퍼가 λΉ„μ–΄ μžˆλŠ” 경우 Underflow

버퍼가 λΉ„μ–΄ 있으면 μ†ŒλΉ„μžλŠ” 데이터λ₯Ό κ°€μ Έμ˜¬ 수 μ—†μœΌλ―€λ‘œ, μƒμ‚°μžκ°€ μƒˆλ‘œμš΄ 데이터λ₯Ό μΆ”κ°€ν•  λ•ŒκΉŒμ§€ λŒ€κΈ°ν•΄μ•Ό ν•œλ‹€.

 

4. μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ˜ κ·œν˜„ 방법

μžλ°”μ—μ„œ μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ„ κ΅¬ν˜„ν•  λ•ŒλŠ” 일반적으둜  BlockinigQueueλ₯Ό μ‚¬μš©ν•˜κ±°λ‚˜, synchronized, wait(), notify() λ©”μ†Œλ“œλ₯Ό 톡해 직접 동기화λ₯Ό μ œμ–΄ν•  수 μžˆλ‹€.

1. BlockingQueueλ₯Ό μ‚¬μš©ν•œ μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄

BlockingQueueλŠ” 큐가 가득 찼을 λ•Œ μžλ™μœΌλ‘œ μƒμ‚°μžλ₯Ό λŒ€κΈ°μ‹œν‚€κ³ , 큐가 λΉ„μ—ˆμ„ λ•Œ μžλ™μœΌλ‘œ μ†ŒλΉ„μžλ₯Ό λŒ€κΈ°μ‹œν‚€λŠ” 동기화 큐이닀.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ProducerConsumerExample {
    public static void main(String[] args) {
        // 크기가 5인 BlockingQueue 생성
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(5);

        // μƒμ‚°μž μŠ€λ ˆλ“œ
        Thread producer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    System.out.println("Produced: " + i);
                    queue.put(i);  // 큐가 가득 μ°¨λ©΄ λŒ€κΈ°
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // μ†ŒλΉ„μž μŠ€λ ˆλ“œ
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 1; i <= 10; i++) {
                    Integer value = queue.take();  // 큐가 λΉ„μ–΄ 있으면 λŒ€κΈ°
                    System.out.println("Consumed: " + value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

 

좜λ ₯ μ˜ˆμ‹œ

Produced: 1
Produced: 2
Produced: 3
Consumed: 1
Consumed: 2
Produced: 4
Consumed: 3
...

 

  • BlockingQueueλŠ” μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ—μ„œ 동기화λ₯Ό λͺ…μ‹œμ μœΌλ‘œ μ²˜λ¦¬ν•˜μ§€ μ•Šμ•„λ„ λœλ‹€. 큐가 가득 μ°¨λ©΄ μžλ™μœΌλ‘œ μƒμ‚°μžκ°€ λŒ€κΈ°ν•˜κ³ , λΉ„μ–΄ 있으면 μ†ŒλΉ„μžκ°€ λŒ€κΈ°ν•œλ‹€.
  • put() λ©”μ†Œλ“œλŠ” 큐가 가득 μ°¨λ©΄ λŒ€κΈ°ν•˜κ³ , take() λ©”μ†Œλ“œλŠ” 큐가 λΉ„μ–΄ 있으면 λŒ€κΈ°ν•œλ‹€.

2. synchronized, wait(), notify()λ₯Ό μ΄μš©ν•œ μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄

λ‹€μŒμ€ synchronized ν‚€μ›Œλ“œμ™€ wait(), notify() λ©”μ†Œλ“œλ₯Ό μ‚¬μš©ν•œ μ˜ˆμ‹œμ΄λ‹€. wait()λŠ” μŠ€λ ˆλ“œλ₯Ό λŒ€κΈ° μƒνƒœλ‘œ λ§Œλ“€κ³ , notify()λŠ” λŒ€κΈ° 쀑인 μŠ€λ ˆλ“œλ₯Ό κΉ¨μš°λŠ” λ™μž‘μ„ μˆ˜ν–‰ν•œλ‹€.

import java.util.LinkedList;
import java.util.Queue;

class ProducerConsumer {
    private final Queue<Integer> queue = new LinkedList<>();
    private final int MAX_SIZE = 5;

    // μƒμ‚°μž λ©”μ„œλ“œ
    public void produce() throws InterruptedException {
        int value = 0;
        while (true) {
            synchronized (this) {
                while (queue.size() == MAX_SIZE) {
                    wait();  // 큐가 가득 μ°¨λ©΄ λŒ€κΈ°
                }
                System.out.println("Produced: " + value);
                queue.add(value++);
                notify();  // μ†ŒλΉ„μžμ—κ²Œ μ‹ ν˜Έλ₯Ό 보내 λŒ€κΈ° μƒνƒœμ—μ„œ 깨움
            }
        }
    }

    // μ†ŒλΉ„μž λ©”μ„œλ“œ
    public void consume() throws InterruptedException {
        while (true) {
            synchronized (this) {
                while (queue.isEmpty()) {
                    wait();  // 큐가 λΉ„μ–΄ 있으면 λŒ€κΈ°
                }
                int value = queue.poll();
                System.out.println("Consumed: " + value);
                notify();  // μƒμ‚°μžμ—κ²Œ μ‹ ν˜Έλ₯Ό 보내 λŒ€κΈ° μƒνƒœμ—μ„œ 깨움
            }
        }
    }
}

public class ProducerConsumerExample {
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        // μƒμ‚°μž μŠ€λ ˆλ“œ
        Thread producer = new Thread(() -> {
            try {
                pc.produce();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // μ†ŒλΉ„μž μŠ€λ ˆλ“œ
        Thread consumer = new Thread(() -> {
            try {
                pc.consume();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
    }
}

 

좜λ ₯ μ˜ˆμ‹œ

Produced: 0
Produced: 1
Produced: 2
Produced: 3
Produced: 4
Consumed: 0
Consumed: 1
Produced: 5
Consumed: 2
Produced: 6
...

 

  • wait() : μƒμ‚°μžκ°€ 큐가 가득 찼을 λ•Œ λ˜λŠ” μ†ŒλΉ„μžκ°€ 큐가 λΉ„μ—ˆμ„ λ•Œ, μŠ€λ ˆλ“œλ₯Ό λŒ€κΈ° μƒνƒœλ‘œ λ§Œλ“ λ‹€.
  • notify() : μƒμ‚°μž λ˜λŠ” μ†ŒλΉ„μžκ°€ λŒ€κΈ° μƒνƒœμ—μ„œ κΉ¨μ–΄λ‚˜κ²Œ ν•˜μ—¬ μž‘μ—…μ„ 계속 μˆ˜ν–‰ν•  수 μžˆλ„λ‘ ν•œλ‹€.

 

5. μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ˜ μž₯점

λ™μ‹œμ„± μ œμ–΄

μƒμ‚°μžμ™€ μ†ŒλΉ„μžκ°€ λ™μ‹œμ— 데이터λ₯Ό μ²˜λ¦¬ν•˜λ”λΌλ„ 동기화λ₯Ό 톡해 λ°μ΄ν„°μ˜ 무결성을 보μž₯ν•  수 μžˆλ‹€.

효율적 μžμ› μ‚¬μš©

λ²„νΌμ˜ 크기λ₯Ό μ œν•œν•΄ λ©”λͺ¨λ¦¬ μžμ›μ˜ μ‚¬μš©μ„ μ œμ–΄ν•  수 있으며, μƒμ‚°μžμ™€ μ†ŒλΉ„μžκ°€ 효율적으둜 ν˜‘λ ₯ν•˜μ—¬ μž‘μ—…μ„ μ²˜λ¦¬ν•  수 μžˆλ‹€.

λ©€λ¦¬μŠ€λ ˆλ“œ ν™˜κ²½μ—μ„œμ˜ μ•ˆμ •μ„±

μƒμ‚°μžμ™€ μ†ŒλΉ„μžκ°€ λ™κΈ°ν™”λœ λ°©μ‹μœΌλ‘œ 데이터λ₯Ό μ²˜λ¦¬ν•˜λ―€λ‘œ, μŠ€λ ˆλ“œ κ°„μ˜ μΆ©λŒμ„ 방지할 수 μžˆλ‹€.

 

6. μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ˜ μ‘μš©

이 νŒ¨ν„΄μ€ 주둜 닀쀑 μŠ€λ ˆλ“œ ν”„λ‘œκ·Έλž˜λ°μ—μ„œ μ‚¬μš©λ˜λ©°, λ‹€μ–‘ν•œ μƒν™©μ—μ„œ μ‘μš©λ  수 μžˆλ‹€.

μž‘μ—… λŒ€κΈ°μ—΄

μƒμ‚°μžκ°€ μž‘μ—…μ„ μƒμ„±ν•˜κ³  μ†ŒλΉ„μžκ°€ μ²˜λ¦¬ν•˜λŠ” μž‘μ—… νμ—μ„œ μ‚¬μš©λœλ‹€.

λ©”μ‹œμ§€ 처리 μ‹œμŠ€ν…œ

λ©”μ‹œμ§€λ₯Ό μƒμ„±ν•˜λŠ” μƒμ‚°μžμ™€ κ·Έ λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•˜λŠ” μ†ŒλΉ„μžλ₯Ό λΆ„λ¦¬ν•˜μ—¬, λ©”μ‹œμ§€ 큐λ₯Ό μ‚¬μš©ν•΄ μ•ˆμ „ν•˜κ²Œ λ©”μ‹œμ§€λ₯Ό μ²˜λ¦¬ν•œλ‹€.

데이터 슀트리밍

슀트리밍 λ°μ΄ν„°μ˜ μƒμ‚°μž(데이터 제곡자)와 μ†ŒλΉ„μž(데이터 처리자)κ°€ ν˜‘λ ₯ν•˜μ—¬ 데이터λ₯Ό μ‹€μ‹œκ°„μœΌλ‘œ μ²˜λ¦¬ν•  수 μžˆλ‹€.

 

κ²°λ‘ 

μƒμ‚°μž-μ†ŒλΉ„μž νŒ¨ν„΄μ€ λ©€ν‹°μŠ€λ ˆλ“œ ν™˜κ²½μ—μ„œ 데이터λ₯Ό μ•ˆμ „ν•˜κ²Œ κ³΅μœ ν•˜λŠ” λ°©λ²•μœΌλ‘œ, μƒμ‚°μž μŠ€λ ˆλ“œ 데이터λ₯Ό μƒμ„±ν•˜κ³  μ†ŒλΉ„μž μŠ€λ ˆλ“œκ°€ κ·Έ 데이터λ₯Ό μ²˜λ¦¬ν•  λ•Œ 자주 μ‚¬μš©λœλ‹€. 이 νŒ¨ν„΄μ€ λ™μ‹œμ„± 문제λ₯Ό ν•΄κ²°ν•˜λ©°, BlockingQueue와 같은 λ™κΈ°ν™”λœ 큐λ₯Ό μ‚¬μš©ν•˜λ©΄ λ³΅μž‘ν•œ 동기화 μ½”λ“œλ₯Ό ν”Όν•  수 μžˆλ‹€.