Spring boot + Async + Stomp Websocket을 이용한 랜덤 채팅 구현 예제
> 데모
- server : spring boot, stomp websocket
- client : bootstrap, jquery, handlebars, sockjs
채팅 요청 ~ 채팅 형성까지는 비동기로 대기하고 있고, 채팅방이 형성되면 websocket으로 메시지를 주고받는 식으로 간단하게 구현해봤습니다.
간단한 로직은 아래와 같습니다
1. 채팅 요청
1. 채팅 요청 (Async : DeferredResult로 응답)
2. 유저 등록 (대기 큐에 등록)
3. 채팅 가능 체크
3-1) 대기 큐에 2명 이상
=> UUID로 채팅방 이름 생성 + Success (+ 채팅방 이름 포함)
=> UUID로 subscribe 요청
3-2) 대기 큐에 2명 미만이면 대기
3-2-1) 새로운 유저 요청
=> 3-1
3-2-1) 타임아웃
=> 실패 처리
2. 채팅 진행
1. 웹소켓 연결 (+ nativeHeader에 chatRoomId(UUID) 포함)
1.1 웹소켓 이벤트 리스너에 chatRoomId, session id 저장
2. /topic/chat/{chatRoomId} subscribe
3. /app/chat.message/{chatRoomId}로 메시지 전송
=> /topic/chat/{chatRoomId}로 메시지 전송
4. 상대방 연결 종료
=> 웹소켓 이벤트 리스너에서 해당 session id의 chatRoomId를 구함
=> /topic/chat/{chatRoomId}로 연결종료 메시지 전송
입니당
3. 소스코드
1. config 설정
Async 설정
import java.util.concurrent.Executor; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @Configuration @EnableAsync public class AsyncConfiguration { @Bean public Executor asyncThreadPool() { ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); taskExecutor.setCorePoolSize(3); taskExecutor.setMaxPoolSize(30); taskExecutor.setQueueCapacity(10); taskExecutor.setThreadNamePrefix("Async-Executor-"); taskExecutor.setDaemon(true); taskExecutor.initialize(); return taskExecutor; } }
> Stomp 설정
import org.springframework.context.annotation.Configuration; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; @Configuration @EnableWebSocketMessageBroker public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer { @Override public void configureMessageBroker(MessageBrokerRegistry config) { config.enableSimpleBroker("/topic/chat"); config.setApplicationDestinationPrefixes("/app"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/chat-websocket").withSockJS(); } }
> Join 비동기 처리
@GetMapping("/join") @ResponseBody public DeferredResultjoinRequest() { String sessionId = ServletUtil.getSession().getId(); logger.info(">> Join request. session id : {}", sessionId); final ChatRequest user = new ChatRequest(sessionId); final DeferredResult deferredResult = new DeferredResult<>(null); chatService.joinChatRoom(user, deferredResult); deferredResult.onCompletion(() -> chatService.cancelChatRoom(user)); deferredResult.onError((throwable) -> chatService.cancelChatRoom(user)); deferredResult.onTimeout(() -> chatService.cancelChatRoom(user)); return deferredResult; }
@Async("asyncThreadPool") public void joinChatRoom(ChatRequest request, DeferredResultdeferredResult) { logger.info("## Join chat room request. {}[{}]", Thread.currentThread().getName(), Thread.currentThread().getId()); if (request == null || deferredResult == null) { return; } try { lock.writeLock().lock(); waitingUsers.put(request, deferredResult); } finally { lock.writeLock().unlock(); establishChatRoom(); } }
> 웹소켓 연결 이벤트 처리
import demo.service.ChatService; import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.event.EventListener; import org.springframework.messaging.simp.SimpMessageHeaderAccessor; import org.springframework.messaging.simp.stomp.StompHeaderAccessor; import org.springframework.messaging.support.GenericMessage; import org.springframework.messaging.support.MessageHeaderAccessor; import org.springframework.messaging.support.NativeMessageHeaderAccessor; import org.springframework.stereotype.Component; import org.springframework.web.socket.messaging.SessionConnectedEvent; import org.springframework.web.socket.messaging.SessionDisconnectEvent; @Component public class WebSocketEventListener { private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class); @Autowired private ChatService chatService; @EventListener public void handleWebSocketConnectListener(SessionConnectedEvent event) { MessageHeaderAccessor accessor = NativeMessageHeaderAccessor.getAccessor(event.getMessage(), SimpMessageHeaderAccessor.class); GenericMessage generic = (GenericMessage) accessor.getHeader("simpConnectMessage"); MapnativeHeaders = (Map ) generic.getHeaders().get("nativeHeaders"); String chatRoomId = ((List ) nativeHeaders.get("chatRoomId")).get(0); String sessionId = (String) generic.getHeaders().get("simpSessionId"); /*StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage()); System.out.println("## headerAccessor :: " + headerAccessor); String chatRoomId = headerAccessor.getNativeHeader("chatRoomId").get(0); String sessionId = headerAccessor.getSessionId();*/ logger.info("[Connected] room id : {} | websocket session id : {}", chatRoomId, sessionId); chatService.connectUser(chatRoomId, sessionId); } @EventListener public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) { StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage()); String sessionId = headerAccessor.getSessionId(); logger.info("[Disconnected] websocket session id : {}", sessionId); chatService.disconnectUser(sessionId); } }
> 웹소켓 @MessageMapping & @SendTo
@MessageMapping("/chat.message/{chatRoomId}") public void sendMessage(@DestinationVariable("chatRoomId") String chatRoomId, @Payload ChatMessage chatMessage) { logger.info("Request message. roomd id : {} | chat message : {} | principal : {}", chatRoomId, chatMessage); if (!StringUtils.hasText(chatRoomId) || chatMessage == null) { return; } if (chatMessage.getMessageType() == MessageType.CHAT) { chatService.sendMessage(chatRoomId, chatMessage); } }
> ChatService (@SendTo)
@Service public class ChatService { private static final Logger logger = LoggerFactory.getLogger(ChatService.class); private Map> waitingUsers; // {key : websocket session id, value : chat room id} private Map connectedUsers; private ReentrantReadWriteLock lock; @Autowired private SimpMessagingTemplate messagingTemplate; ... public void sendMessage(String chatRoomId, ChatMessage chatMessage) { String destination = getDestination(chatRoomId); messagingTemplate.convertAndSend(destination, chatMessage); } private String getDestination(String chatRoomId) { return "/topic/chat/" + chatRoomId; }
- Client side (JS)
> Websocket connect
ChatManager.connectAndSubscribe = function () { if (ChatManager.stompClient == null || !ChatManager.stompClient.connected) { var socket = new SockJS('/chat-websocket'); ChatManager.stompClient = Stomp.over(socket); ChatManager.stompClient.connect({chatRoomId: ChatManager.chatRoomId}, function (frame) { console.log('Connected: ' + frame); ChatManager.subscribeMessage(); }); } else { ChatManager.subscribeMessage(); } };
> Websocket send message
ChatManager.sendMessage = function () { console.log('Check.. >>\n', ChatManager.stompClient); console.log('send message.. >> '); var $chatTarget = $('#chat-message-input'); var message = $chatTarget.val(); $chatTarget.val(''); var payload = { messageType : 'CHAT', senderSessionId: ChatManager.sessionId, message : message }; ChatManager.stompClient.send('/app/chat.message/' + ChatManager.chatRoomId, {}, JSON.stringify(payload)); };
> 웹소켓 Subscribe
ChatManager.subscribeMessage = function () { ChatManager.stompClient.subscribe('/topic/chat/' + ChatManager.chatRoomId, function (resultObj) { console.log('>> success to receive message\n', resultObj.body); var result = JSON.parse(resultObj.body); var message = ''; if (result.messageType == 'CHAT') { if (result.senderSessionId === ChatManager.sessionId) { message += '[Me] : '; } else { message += '[Anonymous] : '; } message += result.message + '\n'; } else if (result.messageType == 'DISCONNECTED') { message = '>> Disconnected user :('; ChatManager.disconnect(); } ChatManager.updateText(message, true); }); };
전체 코드는 github 에서 확인할 수 있습니다
(스타는 사랑입니다)
'Spring Boot > APPLIED' 카테고리의 다른 글
[Heartbeat] Heartbeat로 서버 헬스체크하기 (Spring boot + Slack Web Hooks + Bot) (0) | 2019.01.18 |
---|