본문 바로가기

카테고리 없음

Publish / Subscribe 패턴 알아보기

Publish / Subscribe 패턴 알아보기

모든 소스 코드는 Github에 있습니다.

스타는 사랑입니다 :)

publish / subscribe 관련 패턴은


BlockingQueue를 이용하여 Publish / Subscribe 패턴 구현하기

자바의 java.util.concurrent.BlockingQueue 인터페이스 구현체를 통해서 Publish / Subscribe를 구현할 수 있습니다.


우선 Publisher와 Subscriber가 공통의 BlockingQueue 구현체 인스턴스를 가지고 메시지 Publish(publisher::offer())


메시지 Subcribe(subscriber::take())를 통해서 메시지를 전달할 수 있습니다.


아래는 간단한 Publisher, Subscriber, Runner 예제 입니다.

BlockingQueueSubscriber.java

import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BlockingQueueSubscriber extends Thread {

    // 메시지를 담을 큐
    private BlockingQueue<String> queue;
    private Optional<Consumer<String>> consumerOptional;

    public BlockingQueueSubscriber(BlockingQueue<String> queue, Optional<Consumer<String>> consumerOptional) {
        Objects.requireNonNull(queue, "queue must be not null");
        this.queue = queue;
        this.consumerOptional = consumerOptional;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // 큐에 담긴 메시지를 꺼내온다.
                // 메시지가 존재하지 않으면 블로킹 되어 있다가 메시지가
                // 생산되면 메시지를 반환한다.
                final String message = queue.take();                
                logger.info("[Consumer] {}", message);
                consumerOptional.ifPresent(consumer -> consumer.accept(message));
            }
        } catch (Exception e) {
            logger.error("Exception occur while taking messages", e);
            return;
        }

    }
}

BlockingQueuePublisher.java

package blog.publishsubscribe.blockingqueue;

import com.sun.media.jfxmedia.logging.Logger;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BlockingQueuePublisher extends Thread {

    // 메시지를 담을 큐
    private BlockingQueue<String> queue;
    private DateTimeFormatter formatter;
    private int messageCount;

    public BlockingQueuePublisher(BlockingQueue<String> queue) {
        Objects.requireNonNull(queue, "queue must be not null");
        this.queue = queue;
        this.formatter = DateTimeFormatter.ofPattern("[HH:mm:ss]");
        super.setDaemon(true);
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // 임시 메시지 생성
                String message = new StringBuilder()
                    .append(LocalDateTime.now().format(formatter))
                    .append("-message-")
                    .append(++messageCount)
                    .toString();

                logger.info("[Producer] {}", message);
                // 메시지 생산
                queue.offer(message);
                TimeUnit.SECONDS.sleep(1L);
            }
        } catch (InterruptedException e) {
            return;
        }
    }
}

BlockingQueueRunner

package blog.publishsubscribe.blockingqueue;

import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

public class BlockingQueueRunner {

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch readCount = new CountDownLatch(3);
        BlockingQueue queue = new LinkedBlockingQueue();
        BlockingQueueSubscriber subscriber = new BlockingQueueSubscriber(queue,
            Optional.of(message -> readCount.countDown()));
        BlockingQueuePublisher publisher = new BlockingQueuePublisher(queue);

        subscriber.start();
        publisher.start();

        readCount.await();

        publisher.interrupt();
        subscriber.interrupt();
        // Output
        // 00:13:46.534 [Thread-1] INFO  b.p.b.BlockingQueuePublisher - [Producer] 00:13:46-message-1
        // 00:13:46.539 [Thread-0] INFO  b.p.b.BlockingQueueSubscriber - [Consumer] 00:13:46-message-1
        // 00:13:47.540 [Thread-1] INFO  b.p.b.BlockingQueuePublisher - [Producer] 00:13:47-message-2
        // 00:13:47.540 [Thread-0] INFO  b.p.b.BlockingQueueSubscriber - [Consumer] 00:13:47-message-2
        // 00:13:48.541 [Thread-1] INFO  b.p.b.BlockingQueuePublisher - [Producer] 00:13:48-message-3
        // 00:13:48.541 [Thread-0] INFO  b.p.b.BlockingQueueSubscriber - [Consumer] 00:13:48-message-3
    }
}

=> BlockingQueue 인터페이스는 상황에 따라 아래와 같은 큐를 쓸 수 있습니다.

  • java.util.concurrent.ArrayBlockingQueue (Array List 기반 FIFO)
  • java.util.concurrent.LinkedBlockingQueue (Linked List 기반 FIFO)
  • java.util.concurrent.PriorityBlockingQueue (우선순위큐)

=> 위의 Subscriber에서 queue.take() 메소드로 가져와야지만 메시지가 없을 때 해당 스레드가 블로킹 되어 있고
poll() 메소드를 호출 할 경우에는 poll() 메소드 호출 시점에서 큐가 비었으면 null을 반환합니다.


=> LinkedBlockingQueue를 간략하게 분석해보면, 아래와 같이 count.get() == 0 이면 (큐가 비어있으면) notEmpty.await() 함수를 호출해서 blocking 상태에 있게 되고, offer 메소드에서 보듯이 맨 마지막에 signalNotEmpty(); 메소드 호출을 통해서 blocking 되어 있는 스레드를 깨워줍니다.
> java.util.concurrent.LinkedBlockingQueue.java
    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }