Random chat example with Spring Boot, Async, and STOMP WebSocket




> Demo





- Server: Spring Boot, STOMP WebSocket

- Client: Bootstrap, jQuery, Handlebars, SockJS


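A chat request waits asynchronously until a chat room is formed. Once the room exists, the two users exchange messages over WebSocket. I kept the implementation deliberately simple.

The basic logic is as follows.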

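1. Chat request


1. Chat request (async: answered through a DeferredResult)

2. Register the user (add to the waiting queue)

3. Check whether a chat can start

3-1) Two or more users in the waiting queue

=> Generate a chat room name from a UUID and respond with Success (including the chat room name)

=> The client subscribes using that UUID

3-2) Fewer than two users in the waiting queue: wait

3-2-1) A new user sends a request

=> Go to 3-1

3-2-2) Timeout

=> Handle as a failure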
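The matching steps above can be sketched in plain Java. This is a minimal sketch, not the project's actual service: `CompletableFuture` stands in for Spring's `DeferredResult` so it runs without Spring, and `MatchingSketch`, `Waiter`, `cancel`, and `waitingCount` are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

/** Pairs waiting users two at a time (hypothetical stand-in for the real service). */
final class MatchingSketch {

    /** One pending join request; the future plays the role of DeferredResult. */
    private static final class Waiter {
        final String sessionId;
        final CompletableFuture<String> result = new CompletableFuture<>();

        Waiter(String sessionId) {
            this.sessionId = sessionId;
        }
    }

    private final Deque<Waiter> waiting = new ArrayDeque<>();

    /** Steps 1-3: register the user in the waiting queue, then try to form a room. */
    synchronized CompletableFuture<String> join(String sessionId) {
        Waiter waiter = new Waiter(sessionId);
        waiting.addLast(waiter);
        establishChatRoom();
        return waiter.result;
    }

    /** Timeout or cancellation: the user leaves the waiting queue. */
    synchronized void cancel(String sessionId) {
        waiting.removeIf(w -> w.sessionId.equals(sessionId));
    }

    synchronized int waitingCount() {
        return waiting.size();
    }

    /** Step 3-1: with two or more waiters, both receive the same UUID room name. */
    private void establishChatRoom() {
        while (waiting.size() >= 2) {
            Waiter first = waiting.pollFirst();
            Waiter second = waiting.pollFirst();
            String chatRoomId = UUID.randomUUID().toString();
            first.result.complete(chatRoomId);
            second.result.complete(chatRoomId);
        }
    }

    public static void main(String[] args) {
        MatchingSketch matcher = new MatchingSketch();
        CompletableFuture<String> a = matcher.join("session-a");
        System.out.println("a matched alone? " + a.isDone());
        CompletableFuture<String> b = matcher.join("session-b");
        System.out.println("same room? " + a.join().equals(b.join()));
    }
}
```

The first user's future stays incomplete until a second user joins, which is the "wait" branch of step 3-2.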



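2. Chat session

1. Connect the WebSocket (with chatRoomId (UUID) in the native headers)

1.1 The WebSocket event listener stores the chatRoomId and session id

2. Subscribe to /topic/chat/{chatRoomId}

3. Send a message to /app/chat.message/{chatRoomId}

=> The message is forwarded to /topic/chat/{chatRoomId}

4. The other user disconnects

=> The WebSocket event listener looks up the chatRoomId for that session id

=> A disconnect message is sent to /topic/chat/{chatRoomId}


That is the whole flow.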
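The session bookkeeping in steps 1.1 and 4 can be sketched as a small registry. The method names `connectUser` and `disconnectUser` match the calls made by the event listener later in this post, but their bodies here are an assumption, and `SessionRegistrySketch` is a hypothetical class name.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Tracks which chat room each websocket session belongs to (illustrative only). */
final class SessionRegistrySketch {

    // key: websocket session id, value: chat room id
    private final Map<String, String> connectedUsers = new ConcurrentHashMap<>();

    /** Step 1.1: remember the room for a newly connected session. */
    void connectUser(String chatRoomId, String sessionId) {
        connectedUsers.put(sessionId, chatRoomId);
    }

    /**
     * Step 4: forget the session and return the topic that should receive
     * the disconnect notice, or empty if the session was unknown.
     */
    Optional<String> disconnectUser(String sessionId) {
        String chatRoomId = connectedUsers.remove(sessionId);
        return Optional.ofNullable(chatRoomId).map(id -> "/topic/chat/" + id);
    }

    public static void main(String[] args) {
        SessionRegistrySketch registry = new SessionRegistrySketch();
        registry.connectUser("room-1", "ws-session-1");
        System.out.println(registry.disconnectUser("ws-session-1"));
        System.out.println(registry.disconnectUser("ws-session-1"));
    }
}
```

In the real service the returned topic would be passed to the messaging template together with a DISCONNECTED message.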



3. Source code


1. Configuration



> Async configuration

import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

    @Bean
    public Executor asyncThreadPool() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();

        taskExecutor.setCorePoolSize(3);
        taskExecutor.setMaxPoolSize(30);
        taskExecutor.setQueueCapacity(10);
        taskExecutor.setThreadNamePrefix("Async-Executor-");
        taskExecutor.setDaemon(true);
        taskExecutor.initialize();

        return taskExecutor;
    }
}
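How the three sizes above interact is easy to misread. `ThreadPoolTaskExecutor` delegates to a `java.util.concurrent.ThreadPoolExecutor`, which only starts threads beyond the core size once the queue is full. The sketch below uses the JDK class directly with the same 3/30/10 values; `PoolSizingSketch` and `threadsStartedFor` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Shows when a core=3, max=30, queue=10 pool starts extra threads. */
final class PoolSizingSketch {

    /** Submits the given number of blocking tasks and reports how many threads exist. */
    static int threadsStartedFor(int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                3, 30, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                // Every task blocks until released, so no thread becomes free.
                pool.execute(() -> {
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsStartedFor(3));  // 3: core threads only
        System.out.println(threadsStartedFor(13)); // 3: ten tasks sit in the queue
        System.out.println(threadsStartedFor(14)); // 4: queue full, one extra thread
    }
}
```

With these settings, a fourth thread appears only once 3 tasks are running and 10 more are queued, so a burst of join requests is mostly queued rather than run in parallel.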

> STOMP configuration

import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic/chat");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/chat-websocket").withSockJS();
    }
}
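The two prefixes configured above decide where a client message goes: destinations under /app are handed to the application's @MessageMapping methods, and destinations under /topic/chat are handled by the simple broker and delivered to subscribers. The sketch below only illustrates that split with plain string checks. It is not how Spring implements routing, and `DestinationRoutingSketch`, `Target`, `route`, and `handlerPath` are hypothetical names.

```java
/** Illustrates which component handles a STOMP destination under this configuration. */
final class DestinationRoutingSketch {

    enum Target { APPLICATION_HANDLER, SIMPLE_BROKER, UNHANDLED }

    private static final String APP_PREFIX = "/app";
    private static final String BROKER_PREFIX = "/topic/chat";

    /** Classifies a destination by the prefixes from the configuration class. */
    static Target route(String destination) {
        if (destination.startsWith(APP_PREFIX + "/")) {
            return Target.APPLICATION_HANDLER;
        }
        if (destination.startsWith(BROKER_PREFIX)) {
            return Target.SIMPLE_BROKER;
        }
        return Target.UNHANDLED;
    }

    /** The part after the /app prefix, which is what a @MessageMapping pattern describes. */
    static String handlerPath(String destination) {
        return destination.substring(APP_PREFIX.length());
    }

    public static void main(String[] args) {
        System.out.println(route("/app/chat.message/room-1"));
        System.out.println(handlerPath("/app/chat.message/room-1"));
        System.out.println(route("/topic/chat/room-1"));
    }
}
```

This is why the client sends to /app/chat.message/{chatRoomId} but subscribes to /topic/chat/{chatRoomId}.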



> Asynchronous join handling

// Controller: answers the join request through a DeferredResult.
// ChatResponse is an assumed name for the response type.
@GetMapping("/join")
@ResponseBody
public DeferredResult<ChatResponse> joinRequest() {
    String sessionId = ServletUtil.getSession().getId();
    logger.info(">> Join request. session id : {}", sessionId);

    final ChatRequest user = new ChatRequest(sessionId);
    // null timeout: fall back to the configured default async timeout
    final DeferredResult<ChatResponse> deferredResult = new DeferredResult<>(null);
    chatService.joinChatRoom(user, deferredResult);

    // Remove the user from the waiting queue however the request ends.
    deferredResult.onCompletion(() -> chatService.cancelChatRoom(user));
    deferredResult.onError((throwable) -> chatService.cancelChatRoom(user));
    deferredResult.onTimeout(() -> chatService.cancelChatRoom(user));

    return deferredResult;
}

// ChatService: runs on the "asyncThreadPool" executor.
@Async("asyncThreadPool")
public void joinChatRoom(ChatRequest request, DeferredResult<ChatResponse> deferredResult) {
    logger.info("## Join chat room request. {}[{}]", Thread.currentThread().getName(), Thread.currentThread().getId());
    if (request == null || deferredResult == null) {
        return;
    }

    // Acquire the lock before the try block so unlock() only runs if lock() succeeded.
    lock.writeLock().lock();
    try {
        waitingUsers.put(request, deferredResult);
    } finally {
        lock.writeLock().unlock();
    }

    establishChatRoom();
}
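The cleanup callbacks above exist so that a user who times out does not linger in the waiting queue. The sketch below reproduces that idea without Spring, under the assumption that `CompletableFuture.orTimeout` (Java 9+) is an acceptable stand-in for `DeferredResult`'s timeout. `JoinTimeoutSketch` and its methods are hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Removes a waiting user once the pending request completes or times out. */
final class JoinTimeoutSketch {

    // key: session id, value: pending result (stand-in for DeferredResult)
    private final Map<String, CompletableFuture<String>> waitingUsers = new ConcurrentHashMap<>();

    /** Registers a waiting user; the request fails after timeoutMillis if nobody matches. */
    CompletableFuture<String> join(String sessionId, long timeoutMillis) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        waitingUsers.put(sessionId, pending);
        return pending
                .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                // Runs on success, error, and timeout alike, like the three callbacks above.
                .whenComplete((roomId, error) -> waitingUsers.remove(sessionId));
    }

    int waitingCount() {
        return waitingUsers.size();
    }

    public static void main(String[] args) {
        JoinTimeoutSketch sketch = new JoinTimeoutSketch();
        CompletableFuture<String> result = sketch.join("session-a", 100);
        try {
            result.join();
        } catch (CompletionException e) {
            System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
        }
        System.out.println("still waiting: " + sketch.waitingCount());
    }
}
```

Registering one cleanup action for every outcome keeps the waiting queue consistent without the caller having to distinguish timeout from error.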


> WebSocket connection event handling

import demo.service.ChatService;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;

@Component
public class WebSocketEventListener {

    private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class);

    @Autowired
    private ChatService chatService;

    @EventListener
    public void handleWebSocketConnectListener(SessionConnectedEvent event) {
        MessageHeaderAccessor accessor = NativeMessageHeaderAccessor.getAccessor(event.getMessage(), SimpMessageHeaderAccessor.class);
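        GenericMessage<?> generic = (GenericMessage<?>) accessor.getHeader("simpConnectMessage");
        // Native headers map each header name to a list of values.
        @SuppressWarnings("unchecked")
        Map<String, List<String>> nativeHeaders = (Map<String, List<String>>) generic.getHeaders().get("nativeHeaders");
        String chatRoomId = nativeHeaders.get("chatRoomId").get(0);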
        String sessionId = (String) generic.getHeaders().get("simpSessionId");

        /*StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
        System.out.println("## headerAccessor :: " + headerAccessor);
        String chatRoomId = headerAccessor.getNativeHeader("chatRoomId").get(0);
        String sessionId = headerAccessor.getSessionId();*/

        logger.info("[Connected] room id : {} | websocket session id : {}", chatRoomId, sessionId);

        chatService.connectUser(chatRoomId, sessionId);
    }

    @EventListener
    public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
        StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());

        String sessionId = headerAccessor.getSessionId();

        logger.info("[Disconnected] websocket session id : {}", sessionId);

        chatService.disconnectUser(sessionId);
    }
}


> WebSocket @MessageMapping & @SendTo

@MessageMapping("/chat.message/{chatRoomId}")
public void sendMessage(@DestinationVariable("chatRoomId") String chatRoomId, @Payload ChatMessage chatMessage) {
    logger.info("Request message. room id : {} | chat message : {}", chatRoomId, chatMessage);
    if (!StringUtils.hasText(chatRoomId) || chatMessage == null) {
        return;
    }

    // Only CHAT messages are forwarded to the room.
    if (chatMessage.getMessageType() == MessageType.CHAT) {
        chatService.sendMessage(chatRoomId, chatMessage);
    }
}
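The handler depends on a `ChatMessage` payload that this post does not show. A rough guess at its shape can be inferred from the fields the client sends (`messageType`, `senderSessionId`, `message`) and the two message types the client checks for. Everything in the sketch below is an assumption, including the `shouldForward` helper, which only approximates the handler's guard (it uses `trim()` instead of `StringUtils.hasText`).

```java
/** Hypothetical payload model inferred from the client-side JSON; not the project's class. */
final class ChatMessageSketch {

    enum MessageType { CHAT, DISCONNECTED }

    static final class ChatMessage {
        private MessageType messageType;
        private String senderSessionId;
        private String message;

        MessageType getMessageType() { return messageType; }
        void setMessageType(MessageType messageType) { this.messageType = messageType; }

        String getSenderSessionId() { return senderSessionId; }
        void setSenderSessionId(String senderSessionId) { this.senderSessionId = senderSessionId; }

        String getMessage() { return message; }
        void setMessage(String message) { this.message = message; }
    }

    /** Approximates the handler's guard: a non-blank room id and a CHAT message. */
    static boolean shouldForward(String chatRoomId, ChatMessage chatMessage) {
        return chatRoomId != null
                && !chatRoomId.trim().isEmpty()
                && chatMessage != null
                && chatMessage.getMessageType() == MessageType.CHAT;
    }

    public static void main(String[] args) {
        ChatMessage chat = new ChatMessage();
        chat.setMessageType(MessageType.CHAT);
        chat.setSenderSessionId("session-a");
        chat.setMessage("hello");
        System.out.println(shouldForward("room-1", chat));
    }
}
```

Keeping the field names identical to the JSON keys lets the default JSON converter bind the payload without extra annotations.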

> ChatService (@SendTo)

@Service
public class ChatService {

    private static final Logger logger = LoggerFactory.getLogger(ChatService.class);
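    // {key : waiting request, value : its pending response}; ChatResponse is an assumed type name
    private Map<ChatRequest, DeferredResult<ChatResponse>> waitingUsers;
    // {key : websocket session id, value : chat room id}
    private Map<String, String> connectedUsers;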
    private ReentrantReadWriteLock lock;

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    ...

    public void sendMessage(String chatRoomId, ChatMessage chatMessage) {
        String destination = getDestination(chatRoomId);
        messagingTemplate.convertAndSend(destination, chatMessage);
    }

    private String getDestination(String chatRoomId) {
        return "/topic/chat/" + chatRoomId;
    }
}


- Client side (JS)  



> Websocket connect

    ChatManager.connectAndSubscribe = function () {
      if (ChatManager.stompClient == null || !ChatManager.stompClient.connected) {
        var socket = new SockJS('/chat-websocket');
        ChatManager.stompClient = Stomp.over(socket);
        ChatManager.stompClient.connect({chatRoomId: ChatManager.chatRoomId}, function (frame) {
          console.log('Connected: ' + frame);
          ChatManager.subscribeMessage();
        });
      } else {
        ChatManager.subscribeMessage();
      }
    };


> Websocket send message

    ChatManager.sendMessage = function () {
      console.log('Check.. >>\n', ChatManager.stompClient);
      console.log('send message.. >> ');
      var $chatTarget = $('#chat-message-input');
      var message = $chatTarget.val();
      $chatTarget.val('');

      var payload = {
        messageType    : 'CHAT',
        senderSessionId: ChatManager.sessionId,
        message        : message
      };

      ChatManager.stompClient.send('/app/chat.message/' + ChatManager.chatRoomId, {}, JSON.stringify(payload));
    };

> WebSocket subscribe

    ChatManager.subscribeMessage = function () {
      ChatManager.stompClient.subscribe('/topic/chat/' + ChatManager.chatRoomId, function (resultObj) {
        console.log('>> success to receive message\n', resultObj.body);
        var result = JSON.parse(resultObj.body);
        var message = '';

        if (result.messageType === 'CHAT') {
          if (result.senderSessionId === ChatManager.sessionId) {
            message += '[Me] : ';
          } else {
            message += '[Anonymous] : ';
          }

          message += result.message + '\n';
        } else if (result.messageType === 'DISCONNECTED') {
          message = '>> Disconnected user :(';
          ChatManager.disconnect();
        }
        ChatManager.updateText(message, true);
      });
    };


The full code is available on GitHub.

(Stars are much appreciated.)