1. Redis 채팅을 위한 pub/sub 개념
- 따로 Notion 페이지에 정리를 해두었다.
https://bitter-gambler-dbe.notion.site/Redis-1dcd91f7116d47f694de31ac71f8b4ab?pvs=4
이 글은 기본적인 Redis의 Pub/Sub 개념만을 사용하여 채팅을 구현하였으며 하나의 세션이 아닌 다른 세션에서 참가해도 함께 채팅이 가능한 멀티 서버이다.
이 글을 다 작성하고 바로 채팅 기록을 MongoDB를 사용해서 저장하려고한다. 그 이후에는 Jwt토큰으로 유효성 검사를 하는 코드까지 추가할 것이니 하나씩 따라해보면 좋을 것 같다.
2. 필요한 환경
- Java 17
- Spring 3.x
- Redis(Docker로 띄워서 사용하였다)
// redis(채팅방 저장용)
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//freemarker
implementation 'org.springframework.boot:spring-boot-starter-freemarker'
//web socket
implementation 'org.springframework.boot:spring-boot-starter-websocket'
implementation 'org.springframework.boot:spring-boot-starter-web'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
implementation 'mysql:mysql-connector-java:8.0.15'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.security:spring-security-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
implementation 'org.webjars.bower:bootstrap:4.3.1'
implementation 'org.webjars.bower:vue:2.5.16'
implementation 'org.webjars.bower:axios:0.17.1'
implementation 'org.webjars:sockjs-client:1.1.2'
implementation 'org.webjars:stomp-websocket:2.3.3-1'
implementation 'com.google.code.gson:gson:2.8.0'
3. 설정
application.yml 설정
- redis의 포트는 기본적으로 6379을 쓰는 것 같다.
spring:
data:
redis:
host : localhost
port : 6379
Config파일
설명은 주석으로 각 객체의 역할을 적어두었다.
여기서 주의할 점!
listenerAdapterChatMessage의 메서드는 뒤에서 만들 RedisSubscriber 메서드명과 일치해야 한다.
@Configuration
@EnableRedisRepositories
public class RedisConfig {
// Redis 서버의 호스트 주소를 가져오는 변수
@Value("${spring.data.redis.host}")
private String redisHost;
// Redis 서버의 포트 번호를 가져오는 변수
@Value("${spring.data.redis.port}")
private int redisPort;
// RedisConnectionFactory 빈을 생성하는 메서드
// Redis 서버와의 연결을 설정하고 관리하는데 사용됩니다.
@Bean
public RedisConnectionFactory redisConnectionFactory() {
return new LettuceConnectionFactory(redisHost, redisPort);
}
// ChannelTopic 빈을 생성하는 메서드
// Redis의 pub/sub 메시징을 위한 채널 토픽을 설정합니다.
@Bean
public ChannelTopic channelTopic() {
return new ChannelTopic("chatroom");
}
// RedisMessageListenerContainer 빈을 생성하는 메서드
// Redis 메시지를 수신하고 리스너에 전달하는 컨테이너를 설정합니다.
@Bean
public RedisMessageListenerContainer redisMessage(
MessageListenerAdapter listenerAdapterChatMessage,
ChannelTopic channelTopic
){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory());
container.addMessageListener(listenerAdapterChatMessage, channelTopic);
return container;
}
// 실제 메시지를 처리하는 subscriber 설정 추가
// RedisSubscriber 클래스를 사용하여 메시지를 처리하는 어댑터를 설정합니다.
@Bean
public MessageListenerAdapter listenerAdapterChatMessage(RedisSubscriber subscriber) {
return new MessageListenerAdapter(subscriber, "onMessage");
}
// RedisTemplate 빈을 생성하는 메서드
// Redis 데이터를 직렬화하고 역직렬화하는 데 사용됩니다.
@Bean
public RedisTemplate<String, Object> chatRoomRedisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
// 키를 위한 직렬화 설정 - StringRedisSerializer 사용
template.setKeySerializer(new StringRedisSerializer());
template.setHashKeySerializer(new StringRedisSerializer());
// 값을 위한 직렬화 설정 - Jackson2JsonRedisSerializer 사용
Jackson2JsonRedisSerializer<ChatRoom> serializer = new Jackson2JsonRedisSerializer<>(ChatRoom.class);
template.setValueSerializer(serializer);
template.setHashValueSerializer(serializer);
return template;
}
}
WebSocketConfig
메시지 브로커란 의미가 생소할 수 있는데 우리가 아는 그 브로커의 의미와 동일하다. 중간에서 메시지를 처리해주는 중개역할이라고 생각하면 된다. 우리가 채팅을 치면 /pub로 시작하는 요청으로 진행되며 채팅은 리스너를 통해 감지하고 브로커를 통해 Subscriber들에게 전달된다.
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
// 메시지 브로커를 구성하는 메서드
// 클라이언트로부터의 메시지를 처리하고 응답을 전달하는 방법을 설정합니다.
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
// 간단한 메모리 기반 메시지 브로커를 활성화하고, 해당 브로커의 목적지 접두사를 "/sub"로 설정
registry.enableSimpleBroker("/sub");
// 애플리케이션에서 처리할 메시지의 접두사를 "/pub"로 설정
registry.setApplicationDestinationPrefixes("/pub");
}
// STOMP 엔드포인트를 등록하는 메서드
// 클라이언트가 웹소켓 서버에 연결할 수 있는 엔드포인트를 설정합니다.
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
// "/ws-stomp" 엔드포인트를 추가하고, 모든 도메인에서의 요청을 허용하며 SockJS 지원을 활성화
registry.addEndpoint("/ws-stomp").setAllowedOriginPatterns("*").withSockJS();
}
}
4. 구현
ChatRoom Entity
채팅방을 개설하고 채팅방 정보를 저장하기 위한 엔티티다. 따로 테이블을 생성하지 않은 상태이지만 임시적으로 엔티티라고 하겠다.
@Data
public class ChatRoom implements Serializable {
private static final long serialVersionUID = 6494678977089006639L;
private String roomId;
private String name;
public static ChatRoom create(String name) {
ChatRoom chatRoom = new ChatRoom();
chatRoom.roomId = UUID.randomUUID().toString();
chatRoom.name = name;
return chatRoom;
}
}
ChatMessage
메세지를 전송하고 채팅기록을 저장할 수 있는 엔티티다. Type은 Enum타입으로 ENTER,TALK가 있다.
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ChatMessage {
private Type type;
private String roomId;
private String sender;
private String message;
}
public enum Type {
ENTER, TALK
}
RedisPublisher
채팅창의 유저가 채팅을 치면 publish 서비스를 수행하게 된다.
Spring-Data-Redis에서 제공하는 redisTemplate을 사용해서 받은 메시지를 해당 채널에 뿌리게된다. 여기서 말하는 채널은 Topic과 동일한 의미이다.
@RequiredArgsConstructor
@Service
@Slf4j
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, ChatMessage message) {
log.info("published topic = {}", topic.getTopic());
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
RedisSubscriber
onMessage는 리스너에서 감지하여 받아온 메시지를 역직렬화하고 역직렬화된 메시지를 해당 채널을 구독하고 있는 모든 유저에게 전달하게 된다.
채팅이 잘 전달되고 있는지 확인하기 위해 각 컨트롤러, 서비스단에 로깅을 추가해두었다. 과정을 천천히 보고 이해하길 바란다.
@RequiredArgsConstructor
@Service
@Slf4j
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate redisTemplate;
private final SimpMessageSendingOperations messagingTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
log.info("Received message: {}", publishMessage); // 수신된 메시지를 로깅
ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
log.info("Deserialized message: {}", roomMessage.getMessage()); // 역직렬화된 메시지를 로깅
messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
} catch (Exception e) {
log.error(e.getMessage());
}
}
ChatRoomController
채팅방 조회, 생성, 입장을 다루는 Controller단이다.
@RequiredArgsConstructor
@Controller
@RequestMapping("/chat")
public class ChatRoomController {
private final ChatRoomRepository chatRoomRepository;
@GetMapping("/room")
public String rooms(Model model) {
return "/chat/room";
}
@GetMapping("/rooms")
@ResponseBody
public List<ChatRoom> room() {
return chatRoomRepository.findAllRoom();
}
@PostMapping("/room")
@ResponseBody
public ChatRoom createRoom(@RequestParam String name) {
return chatRoomRepository.createChatRoom(name);
}
@GetMapping("/room/enter/{roomId}")
public String roomDetail(Model model, @PathVariable String roomId) {
model.addAttribute("roomId", roomId);
return "/chat/roomdetail";
}
@GetMapping("/room/{roomId}")
@ResponseBody
public ChatRoom roomInfo(@PathVariable String roomId) {
return chatRoomRepository.findRoomById(roomId);
}
}
ChatController
Type이 Enter로 들어오면 입장메시지를 publish하고 Talk 인경우에는 입력한 메세지를 publishing하도록 합니다.
@RequiredArgsConstructor
@Controller
@Slf4j
public class ChatController {
private final RedisPublisher redisPublisher;
private final ChatRoomRepository chatRoomRepository;
/**
* websocket "/pub/chat/message"로 들어오는 메시징 처리
*/
@MessageMapping("/chat/message")
public void message(ChatMessage message) {
log.info("Received message: {}", message);
if (Type.ENTER.equals(message.getType())) {
chatRoomRepository.enterChatRoom(message.getRoomId());
message.setMessage(message.getSender()+"님이 입장하셨습니다.");
log.info("User {} entered room {}", message.getSender(), message.getRoomId());
}
redisPublisher.publish(chatRoomRepository.getTopic(message.getRoomId()),message);
}
}
ChatRepository
DB를 연결한 것이 아니기 때문에 따로 Service단을 만들지 않고 Repository에서 처리를 합니다.
ChatRoomController에서 수행하는 로직들입니다.
@RequiredArgsConstructor
@Repository
@Slf4j
public class ChatRoomRepository {
// 채팅방(topic)에 발행되는 메시지를 처리할 Listener
private final RedisMessageListenerContainer redisMessageListener;
// 구독 처리 서비스
private final RedisSubscriber redisSubscriber;
// Redis
private static final String CHAT_ROOMS = "CHAT_ROOM";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChatRoom> opsHashChatRoom;
// 채팅방의 대화 메시지를 발행하기 위한 redis topic 정보. 서버별로 채팅방에 매치되는 topic정보를 Map에 넣어 roomId로 찾을수 있도록 한다.
private Map<String, ChannelTopic> topics;
@PostConstruct
private void init() {
opsHashChatRoom = redisTemplate.opsForHash();
topics = new HashMap<>();
log.info("ChatRoomRepository initialized.");
}
public List<ChatRoom> findAllRoom() {
log.info("Fetching all chat rooms.");
return opsHashChatRoom.values(CHAT_ROOMS);
}
public ChatRoom findRoomById(String id) {
log.info("Fetching chat room with id={}", id);
return opsHashChatRoom.get(CHAT_ROOMS, id);
}
/**
* 채팅방 생성 : 서버간 채팅방 공유를 위해 redis hash에 저장한다.
*/
public ChatRoom createChatRoom(String name) {
ChatRoom chatRoom = ChatRoom.create(name);
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
log.info("Created chat room with id={} and name={}", chatRoom.getRoomId(), chatRoom.getName());
return chatRoom;
}
/**
* 채팅방 입장 : redis에 topic을 만들고 pub/sub 통신을 하기 위해 리스너를 설정한다.
*/
public void enterChatRoom(String roomId) {
ChannelTopic topic = topics.get(roomId);
if (topic == null) {
topic = new ChannelTopic(roomId);
redisMessageListener.addMessageListener(redisSubscriber, topic);
topics.put(roomId, topic);
log.info("Created and stored new topic for roomId={}", roomId);
} else {
log.info("Topic already exists for roomId={}", roomId);
}
}
public ChannelTopic getTopic(String roomId) {
ChannelTopic topic = topics.get(roomId);
if (topic == null) {
log.warn("Topic not found for roomId={}", roomId);
}
return topic;
}
}
5. 테스트
채팅 입장 페이지
vue 문법을 사용하고 백엔드와 통신하는 문법이 담겨있습니다.
<!doctype html>
<html lang="en">
<head>
<title>Websocket Chat</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no">
<!-- CSS -->
<link rel="stylesheet" href="/webjars/bootstrap/4.3.1/dist/css/bootstrap.min.css">
<style>
[v-cloak] {
display: none;
}
</style>
</head>
<body>
<div class="container" id="app" v-cloak>
<div class="row">
<div class="col-md-12">
<h3>채팅방 리스트</h3>
</div>
</div>-
<div class="input-group">
<div class="input-group-prepend">
<label class="input-group-text">방제목</label>
</div>
<input type="text" class="form-control" v-model="room_name" v-on:keyup.enter="createRoom">
<div class="input-group-append">
<button class="btn btn-primary" type="button" @click="createRoom">채팅방 개설</button>
</div>
</div>
<ul class="list-group">
<li class="list-group-item list-group-item-action" v-for="item in chatrooms" v-bind:key="item.roomId" v-on:click="enterRoom(item.roomId)">
{{item.name}}
</li>
</ul>
</div>
<!-- JavaScript -->
<script src="/webjars/vue/2.5.16/dist/vue.min.js"></script>
<script src="/webjars/axios/0.17.1/dist/axios.min.js"></script>
<script>
var vm = new Vue({
el: '#app',
data: {
room_name : '',
chatrooms: [
]
},
created() {
this.findAllRoom();
},
methods: {
findAllRoom: function() {
axios.get('/chat/rooms').then(response => { this.chatrooms = response.data; });
},
createRoom: function() {
if("" === this.room_name) {
alert("방 제목을 입력해 주십시요.");
return;
} else {
var params = new URLSearchParams();
params.append("name",this.room_name);
axios.post('/chat/room', params)
.then(
response => {
alert(response.data.name+"방 개설에 성공하였습니다.")
this.room_name = '';
this.findAllRoom();
}
)
.catch( response => { alert("채팅방 개설에 실패하였습니다."); } );
}
},
enterRoom: function(roomId) {
var sender = prompt('대화명을 입력해 주세요.');
if(sender != "") {
localStorage.setItem('wschat.sender',sender);
localStorage.setItem('wschat.roomId',roomId);
location.href="/chat/room/enter/"+roomId;
}
}
}
});
</script>
</body>
</html>
채팅 페이지
주의할 점은 아래 자바스크립트에서 사용하는 속성들의 이름과 백엔드 서버의 파라미터들이 일치해야 한다는 점입니다.
실행이 안될 경우 api 호출 uri가 일치하는지, 파라미터들이 동일한 이름을 갖는지 꼭 확인하시기 바랍니다.
<!doctype html>
<html lang="en">
<head>
<title>Websocket ChatRoom</title>
<!-- Required meta tags -->
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1, maximum-scale=1, user-scalable=no">
<!-- Bootstrap CSS -->
<link rel="stylesheet" href="/webjars/bootstrap/4.3.1/dist/css/bootstrap.min.css">
<style>
[v-cloak] {
display: none;
}
</style>
</head>
<body>
<div class="container" id="app" v-cloak>
<div>
<h2>{{room.name}}</h2>
</div>
<div class="input-group">
<div class="input-group-prepend">
<label class="input-group-text">내용</label>
</div>
<input type="text" class="form-control" v-model="message" v-on:keypress.enter="sendMessage">
<div class="input-group-append">
<button class="btn btn-primary" type="button" @click="sendMessage">보내기</button>
</div>
</div>
<ul class="list-group">
<li class="list-group-item" v-for="message in messages">
{{message.sender}} - {{message.message}}</a>
</li>
</ul>
<div></div>
</div>
<!-- JavaScript -->
<script src="/webjars/vue/2.5.16/dist/vue.min.js"></script>
<script src="/webjars/axios/0.17.1/dist/axios.min.js"></script>
<script src="/webjars/sockjs-client/1.1.2/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/2.3.3-1/stomp.min.js"></script>
<script>
//alert(document.title);
// websocket & stomp initialize
var sock = new SockJS("/ws-stomp");
var ws = Stomp.over(sock);
var reconnect = 0;
// vue.js
var vm = new Vue({
el: '#app',
data: {
roomId: '',
room: {},
sender: '',
message: '',
messages: []
},
created() {
this.roomId = localStorage.getItem('wschat.roomId');
this.sender = localStorage.getItem('wschat.sender');
this.findRoom();
},
methods: {
findRoom: function () {
axios.get('/chat/room/' + this.roomId).then(response => {
this.room = response.data;
});
},
sendMessage: function () {
ws.send("/pub/chat/message", {}, JSON.stringify({
type: 'TALK',
roomId: this.roomId,
sender: this.sender,
message: this.message
}));
this.message = '';
},
recvMessage: function (recv) {
this.messages.unshift({
"type": recv.type,
"sender": recv.type == 'ENTER' ? '[알림]' : recv.sender,
"message": recv.message
})
}
}
});
function connect() {
// pub/sub event
ws.connect({}, function (frame) {
console.log('Connected: ' + frame);
ws.subscribe("/sub/chat/room/" + vm.$data.roomId, function (message) {
console.log('Message received: ', message);
var recv = JSON.parse(message.body);
vm.recvMessage(recv);
});
ws.send(
"/pub/chat/message",
{},
JSON.stringify({
type: 'ENTER',
roomId: vm.$data.roomId,
sender: vm.$data.sender
}));
}, function (error) {
console.log('Connection error: ', error);
if (reconnect++ < 5) {
setTimeout(function () {
sock = new SockJS("/ws-stomp");
ws = Stomp.over(sock);
connect();
}, 10 * 1000);
}
});
ws.onclose = function (event) {
console.log('Connection closed: ', event);
};
ws.onerror = function (error) {
console.log('Connection error: ', error);
};
}
connect();
</script>
</body>
</html>
파일 Build
정상적으로 build가 되다면 해당 jar 파일을 가지고 두 개의 포트로 애플리케이션을 실행시킵니다.
그 와중에 승상싱 유튜브가 알람이 울렸네요...;;
프로젝트 실행
이렇게 cmd창을 두개 켜서 실행을 시켜줍니다. 물론 window 기준입니다.
정상적으로 애플리케이션이 실행됐다면 이제 웹에서 창을 열어줍니다.
각각의 다른 포트로 웹을 열어 채팅방을 들어가줍니다.
보시다 시피 8080이 먼저 들어가고 8081이 들어가면 입장 아내문이 출력되며 그 이후로는 정상적으로 채팅이 진행되는 것을 볼 수 있습니다.
로그를 통해 실행되는 순서도 콘솔에서 확인할 수 있습니다.
'Redis > Redis 채팅' 카테고리의 다른 글
Redis Pub/Sub을 활용한 채팅 구현의 여정 - Service 레이어 (0) | 2024.12.19 |
---|---|
Redis Pub/Sub을 활용한 채팅 구현의 여정 - Chat Domain(Entity ~ Controller) (0) | 2024.12.08 |
Redis Pub/Sub을 활용한 채팅 구현의 여정 - 환경설정 (0) | 2024.11.29 |
Redis Pub/Sub을 활용한 채팅 구현의 여정 - 개념편 (0) | 2024.11.28 |