Spring boot + Async + Stomp Websocket을 이용한 랜덤 채팅 구현 예제
> 데모
- server : spring boot, stomp websocket
- client : bootstrap, jquery, handlebars, sockjs
채팅 요청 ~ 채팅 형성까지는 비동기로 대기하고 있고, 채팅방이 형성되면 websocket으로 메시지를 주고받는 식으로 간단하게 구현해봤습니다.
간단한 로직은 아래와 같습니다
1. 채팅 요청
1. 채팅 요청 (Async : DeferredResult로 응답)
2. 유저 등록 (대기 큐에 등록)
3. 채팅 가능 체크
3-1) 대기 큐에 2명 이상
=> UUID로 채팅방 이름 생성 + Success (+ 채팅방 이름 포함)
=> UUID로 subscribe 요청
3-2) 대기 큐에 2명 미만이면 대기
3-2-1) 새로운 유저 요청
=> 3-1
3-2-1) 타임아웃
=> 실패 처리
2. 채팅 진행
1. 웹소켓 연결 (+ nativeHeader에 chatRoomId(UUID) 포함)
1.1 웹소켓 이벤트 리스너에 chatRoomId, session id 저장
2. /topic/chat/{chatRoomId} subscribe
3. /app/chat.message/{chatRoomId}로 메시지 전송
=> /topic/chat/{chatRoomId}로 메시지 전송
4. 상대방 연결 종료
=> 웹소켓 이벤트 리스너에서 해당 session id의 chatRoomId를 구함
=> /topic/chat/{chatRoomId}로 연결종료 메시지 전송
입니당
3. 소스코드
1. config 설정
Async 설정
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class AsyncConfiguration {
@Bean
public Executor asyncThreadPool() {
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(3);
taskExecutor.setMaxPoolSize(30);
taskExecutor.setQueueCapacity(10);
taskExecutor.setThreadNamePrefix("Async-Executor-");
taskExecutor.setDaemon(true);
taskExecutor.initialize();
return taskExecutor;
}
}
> Stomp 설정
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
@Configuration
@EnableWebSocketMessageBroker
public class WebsocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic/chat");
config.setApplicationDestinationPrefixes("/app");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/chat-websocket").withSockJS();
}
}
> Join 비동기 처리
@GetMapping("/join")
@ResponseBody
public DeferredResult joinRequest() {
String sessionId = ServletUtil.getSession().getId();
logger.info(">> Join request. session id : {}", sessionId);
final ChatRequest user = new ChatRequest(sessionId);
final DeferredResult deferredResult = new DeferredResult<>(null);
chatService.joinChatRoom(user, deferredResult);
deferredResult.onCompletion(() -> chatService.cancelChatRoom(user));
deferredResult.onError((throwable) -> chatService.cancelChatRoom(user));
deferredResult.onTimeout(() -> chatService.cancelChatRoom(user));
return deferredResult;
}
@Async("asyncThreadPool")
public void joinChatRoom(ChatRequest request, DeferredResult deferredResult) {
logger.info("## Join chat room request. {}[{}]", Thread.currentThread().getName(), Thread.currentThread().getId());
if (request == null || deferredResult == null) {
return;
}
try {
lock.writeLock().lock();
waitingUsers.put(request, deferredResult);
} finally {
lock.writeLock().unlock();
establishChatRoom();
}
}
> 웹소켓 연결 이벤트 처리
import demo.service.ChatService;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.SimpMessageHeaderAccessor;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.messaging.support.NativeMessageHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
@Component
public class WebSocketEventListener {
private static final Logger logger = LoggerFactory.getLogger(WebSocketEventListener.class);
@Autowired
private ChatService chatService;
@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
MessageHeaderAccessor accessor = NativeMessageHeaderAccessor.getAccessor(event.getMessage(), SimpMessageHeaderAccessor.class);
GenericMessage generic = (GenericMessage) accessor.getHeader("simpConnectMessage");
Map nativeHeaders = (Map) generic.getHeaders().get("nativeHeaders");
String chatRoomId = ((List) nativeHeaders.get("chatRoomId")).get(0);
String sessionId = (String) generic.getHeaders().get("simpSessionId");
/*StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
System.out.println("## headerAccessor :: " + headerAccessor);
String chatRoomId = headerAccessor.getNativeHeader("chatRoomId").get(0);
String sessionId = headerAccessor.getSessionId();*/
logger.info("[Connected] room id : {} | websocket session id : {}", chatRoomId, sessionId);
chatService.connectUser(chatRoomId, sessionId);
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = headerAccessor.getSessionId();
logger.info("[Disconnected] websocket session id : {}", sessionId);
chatService.disconnectUser(sessionId);
}
}
> 웹소켓 @MessageMapping & @SendTo
@MessageMapping("/chat.message/{chatRoomId}")
public void sendMessage(@DestinationVariable("chatRoomId") String chatRoomId, @Payload ChatMessage chatMessage) {
logger.info("Request message. roomd id : {} | chat message : {} | principal : {}", chatRoomId, chatMessage);
if (!StringUtils.hasText(chatRoomId) || chatMessage == null) {
return;
}
if (chatMessage.getMessageType() == MessageType.CHAT) {
chatService.sendMessage(chatRoomId, chatMessage);
}
}
> ChatService (@SendTo)
@Service
public class ChatService {
private static final Logger logger = LoggerFactory.getLogger(ChatService.class);
private Map> waitingUsers;
// {key : websocket session id, value : chat room id}
private Map connectedUsers;
private ReentrantReadWriteLock lock;
@Autowired
private SimpMessagingTemplate messagingTemplate;
...
public void sendMessage(String chatRoomId, ChatMessage chatMessage) {
String destination = getDestination(chatRoomId);
messagingTemplate.convertAndSend(destination, chatMessage);
}
private String getDestination(String chatRoomId) {
return "/topic/chat/" + chatRoomId;
}
- Client side (JS)
> Websocket connect
ChatManager.connectAndSubscribe = function () {
if (ChatManager.stompClient == null || !ChatManager.stompClient.connected) {
var socket = new SockJS('/chat-websocket');
ChatManager.stompClient = Stomp.over(socket);
ChatManager.stompClient.connect({chatRoomId: ChatManager.chatRoomId}, function (frame) {
console.log('Connected: ' + frame);
ChatManager.subscribeMessage();
});
} else {
ChatManager.subscribeMessage();
}
};
> Websocket send message
ChatManager.sendMessage = function () {
console.log('Check.. >>\n', ChatManager.stompClient);
console.log('send message.. >> ');
var $chatTarget = $('#chat-message-input');
var message = $chatTarget.val();
$chatTarget.val('');
var payload = {
messageType : 'CHAT',
senderSessionId: ChatManager.sessionId,
message : message
};
ChatManager.stompClient.send('/app/chat.message/' + ChatManager.chatRoomId, {}, JSON.stringify(payload));
};
> 웹소켓 Subscribe
ChatManager.subscribeMessage = function () {
ChatManager.stompClient.subscribe('/topic/chat/' + ChatManager.chatRoomId, function (resultObj) {
console.log('>> success to receive message\n', resultObj.body);
var result = JSON.parse(resultObj.body);
var message = '';
if (result.messageType == 'CHAT') {
if (result.senderSessionId === ChatManager.sessionId) {
message += '[Me] : ';
} else {
message += '[Anonymous] : ';
}
message += result.message + '\n';
} else if (result.messageType == 'DISCONNECTED') {
message = '>> Disconnected user :(';
ChatManager.disconnect();
}
ChatManager.updateText(message, true);
});
};
전체 코드는 github 에서 확인할 수 있습니다
(스타는 사랑입니다)
'Spring Boot > APPLIED' 카테고리의 다른 글
| [Heartbeat] Heartbeat로 서버 헬스체크하기 (Spring boot + Slack Web Hooks + Bot) (0) | 2019.01.18 |
|---|