ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Redis Pub-Sub 사용하기
    극락코딩 2023. 8. 19. 11:41
    Redis의 사용용도는 무궁무진하다. 캐싱, 큐, 분산락, 그리고 pub-sub 등 다양한 기능으로 사용될 수 있다.
    이번에는 redis pub-sub에 대해 정리해볼려고 한다.

     

     

     

     

    Pub-Sub 구조란?

    분산 시스템에서 사용되는 메시징 아키텍처 패턴 중 하나이다. 시스템 내의 다른 컴포넌트나 서비스 간에 데이터나 이벤트를 비동기적으로 교환하는데 사용한다. --> 컴포넌트들을 느슨한 결합으로 연결할 수 있다.

     

    pubsub-microsoft example

     

     

     

     

    구조

    1. Publisher (발행자)
      1. Publisher는 메시지나 이벤트를 생성하고 PubSub 시스템에 발행
      2. 데이터 생산자 역할
      3. Publisher는 어떤 이벤트가 발생했을 때 해당 이벤트를 PubSub 시스템에 알리는 역할
    2. Subscriber (구독자)
      1. Subscriber는 PubSub 시스템에서 발행된 메시지나 이벤트를 구독하는 역할
      2. Subscriber는 원하는 주제나 토픽을 구독하고, 해당 주제와 관련된 이벤트가 발생하면 이를 수신하고 처리
      3. Subscriber는 이벤트나 메시지를 소비자 역할
    3. Topic (주제)
      1. Topic은 메시지를 관리하고 전달하는 중개자 역할
      2. Publisher는 특정 주제에 메시지를 발행하고, Subscriber는 해당 주제를 구독
      3. Topic은 메시지를 발행한 Publisher에서 모든 Subscriber로 메시지를 전달하는 중간 역할
      4. 각 주제는 메시지 카테고리 또는 토픽으로 생각할 수 있음

     

     

     

     

    PUB-SUB의 핵심은?

    PubSub 시스템의 핵심 아이디어는 Publisher와 Subscriber가 서로 직접 통신하지 않는다는 것이다. 대신, 중간에서 메시지 브로커나 메시지 큐와 같은 시스템이 이벤트를 관리하고 전달한다. 이를 통해 시스템의 컴포넌트는 서로 독립적으로 동작하며, 쉽게 확장 가능하며 유연하게 구성 가능하다.

     

     

    Code로 보는 Pub-Sub Example

     

    Publisher

    @Component
    class RedisPublisher(
        private val redisTemplate: RedisTemplate<String, Any>
    ) {
        /** 특정 채널에 Message 발행 */
        fun publish(channel: String, message: Any) {
            redisTemplate.convertAndSend(channel, message)
        }
    
        /** 특정 채널에 Message 발행 */
        fun publish(channel: ChannelTopic, message: Any) {
            redisTemplate.convertAndSend(channel.topic, message)
        }
    }

    RedisTemplate의 convertAndSend를 통해 메세지를 publish할 수 있다.

    RedisTemplate

    @Override
    public void convertAndSend(String channel, Object message) {
    
       Assert.hasText(channel, "a non-empty channel is required");
    
       byte[] rawChannel = rawString(channel);
       byte[] rawMessage = rawValue(message);
    
       execute(connection -> {
          connection.publish(rawChannel, rawMessage);
          return null;
       }, true);
    }

    퍼블리시 하는 라이브러리 코드를 보면 알 수 있는데, connection을 열고 특정 채널에 요청된 메세지를 발송한다.

    여기서 connection은 Redis Connection이다. 현재 Reids Pub-Sub을 보는거니까~

    여기서 주의할 것은 message의 타입이 Object이다.. 직렬화 고통을 받지 않을려면... 조심해야 한다.

    또, RedisTemplate만 지원하고 있으니, 다른 Template Library는... 못쓴다는.. 허점..

     

     

     

     

     

    Subscriber

    지정된 채널에서 데이터를 받는 로직이다.

    @Component
    class RedisSubscriber(
        private val redisTemplate: RedisTemplate<String, Any>
    ) : MessageListener {
        /** 특정 채널의 메세지 구독 */
        override fun onMessage(message: Message, pattern: ByteArray?) {
            val channel = redisTemplate.getChannel(message)
            val content = redisTemplate.getMessage(message)
    
            run {
                // TODO : 별도의 로직을 수행한다.
            }
        }
    }
    
    /** 발행 이벤트 메세지 */
    data class EventModel(
        val id: Long,
        val description: String
    )
    
    /** 채널 정보 조회 */
    fun RedisTemplate<String, Any>.getChannel(message: Message): String {
        return this.stringSerializer.deserialize(message.channel)
            ?: throw RedisPubSubException("channel is null or empty")
    }
    
    /** 메세지 조회 */
    fun RedisTemplate<String, Any>.getMessage(message: Message): EventModel {
        val messageBody = this.stringSerializer.deserialize(message.body)
            ?: throw RedisPubSubException("message is null or empty")
    
        return mapper.readValue(messageBody)
    }
    
    class RedisPubSubException(
        override val message: String
    ) : RuntimeException(message)
    

     

     

     

    여기서 중요한 것은 MessageListener이다. 아래와 같이 구성되어 있는데,

    /**
     * Listener of messages published in Redis. A MessageListener can implement {@link SubscriptionListener} to receive
     * notifications for subscription states.
     *
     * @author Costin Leau
     * @author Christoph Strobl
     * @see SubscriptionListener
     */
    @FunctionalInterface
    public interface MessageListener {
    
       /**
        * Callback for processing received objects through Redis.
        *
        * @param message message must not be {@literal null}.
        * @param pattern pattern matching the channel (if specified) - can be {@literal null}.
        */
       void onMessage(Message message, @Nullable byte[] pattern);
    }
    

    해당 코드만 봤을 때는 어떻게 동작하는지 모르니 조금 더 깊게 가보면,

     

    RedisMessageListenerContainer

    @Override
    public void onMessage(Message message, @Nullable byte[] pattern) {
       Collection<MessageListener> listeners = null;
    
       // if it's a pattern, disregard channel
       if (pattern != null && pattern.length > 0) {
          listeners = patternMapping.get(new ByteArrayWrapper(pattern));
       } else {
          pattern = null;
          // do channel matching first
          listeners = channelMapping.get(new ByteArrayWrapper(message.getChannel()));
       }
    
       if (!CollectionUtils.isEmpty(listeners)) {
          dispatchMessage(listeners, message, pattern);
       }
    }

    패턴 기반으로, 채널 매칭 작업을 진행한다. (역시 ByteArrayWrapper가 쓰이네)

    발견된 리스너가 있는 경우에, subscribe를 위한 dispatchMessage 함수를 실행한다.

     

     

    private void dispatchMessage(Collection<MessageListener> listeners, Message message, @Nullable byte[] pattern) {
    
       byte[] source = (pattern != null ? pattern.clone() : message.getChannel());
    
       Executor executor = getRequiredTaskExecutor();
       for (MessageListener messageListener : listeners) {
          executor.execute(() -> processMessage(messageListener, message, source));
       }
    }
    

     

     

    리스너는 Executor를 만들고, 해당 Executer를 통해 Redis로 부터 메세지를 받아오는 작업을 수행한다는 것을 알 수 있다.

    아래의 코드가 Redis에서 만든 default executor이다.

     

    /**
     * Creates a default TaskExecutor. Called if no explicit TaskExecutor has been specified.
     * <p>
     * The default implementation builds a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} with the
     * specified bean name (or the class name, if no bean name specified) as thread name prefix.
     *
     * @see org.springframework.core.task.SimpleAsyncTaskExecutor#SimpleAsyncTaskExecutor(String)
     */
    protected TaskExecutor createDefaultTaskExecutor() {
       String threadNamePrefix = (beanName != null ? beanName + "-" : DEFAULT_THREAD_NAME_PREFIX);
       return new SimpleAsyncTaskExecutor(threadNamePrefix);
    }
    

     

    기본적으로 위와 같이 thread name 설정이 진행된다.

     

     

     

    다음으로 주의 깊게 봐야 할 것은 Message Interface이다.

    /**
     * Class encapsulating a Redis message body and its properties.
     *
     * @author Costin Leau
     * @author Christoph Strobl
     */
    public interface Message extends Serializable {
    
       /**
        * Returns the body (or the payload) of the message.
        *
        * @return message body. Never {@literal null}.
        */
       byte[] getBody();
    
       /**
        * Returns the channel associated with the message.
        *
        * @return message channel. Never {@literal null}.
        */
       byte[] getChannel();
    }
    

    Message Interface에 channel 정보다 message 정보를 담고있다.

     

     

     

     

     

     

    전체 코드

    @Component
    class RedisPublisher(
        private val redisTemplate: RedisTemplate<String, Any>
    ) {
        /** 특정 채널에 Message 발행 */
        fun publish(channel: String, message: Any) {
            redisTemplate.convertAndSend(channel, message)
        }
    
        /** 특정 채널에 Message 발행 */
        fun publish(channel: ChannelTopic, message: Any) {
            redisTemplate.convertAndSend(channel.topic, message)
        }
    }
    
    @Component
    class RedisSubscriber(
        private val redisTemplate: RedisTemplate<String, Any>
    ) : MessageListener {
        /** 특정 채널의 메세지 구독 */
        override fun onMessage(message: Message, pattern: ByteArray?) {
            val channel = redisTemplate.getChannel(message)
            val content = redisTemplate.getMessage(message)
    
            run {
                // TODO : 별도의 로직을 수행한다.
            }
        }
    }
    
    /** 발행 이벤트 메세지 */
    data class EventModel(
        val id: Long,
        val description: String
    )
    
    /** 채널 정보 조회 */
    fun RedisTemplate<String, Any>.getChannel(message: Message): String {
        return this.stringSerializer.deserialize(message.channel)
            ?: throw RedisPubSubException("channel is null or empty")
    }
    
    /** 메세지 조회 */
    fun RedisTemplate<String, Any>.getMessage(message: Message): EventModel {
        val messageBody = this.stringSerializer.deserialize(message.body)
            ?: throw RedisPubSubException("message is null or empty")
    
        return mapper.readValue(messageBody)
    }
    
    class RedisPubSubException(
        override val message: String
    ) : RuntimeException(message)

     

    GitHub Repo

극락코딩