스프링 부트, SseEmitter를 사용한 알림 서비스 만들기
작성일
서버-클라이언트의 SSE 통신 과정
- 클라이언트에서 서버 쪽으로
SSE 연결 요청
을 한다. (연결 요청) - 서버에서 클라이언트와 매핑되는
SSE 통신 객체를 생성
한다. - 서버에서 SSE 연결 성공 메시지를 클라이언트에게 전송하게 된다. (연결 완료)
이제 서버에서 이벤트가 발생할 때마다 해당 SSE 객체를 통해 클라이언트로 데이터를 전달하게 된다. (ex. SseEmitter.send(”데이터”))
이제 SSE가 구현된 코드를 살펴보자.
예제 코드
User 엔티티
package com.me.notify.entity;
import jakarta.persistence.*;
import lombok.*;
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Users {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "user_id")
private Long id;
@Column(unique = true)
private String username;
private String password;
private String name;
public static Users of(String username, String password, String name) {
Users users = new Users();
users.username = username;
users.password = password;
users.name = name;
return users;
}
}
Alarm 엔티티
package com.me.notify.entity;
import jakarta.persistence.*;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.NoArgsConstructor;
@Entity
@Getter
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Alarm {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
@Column(name = "alarm_id")
private Long id;
@ManyToOne
@JoinColumn(name = "from_user_id")
private Users fromUser; // 댓글 또는 좋아요를 누른 사람 (알림을 발생시킨 사람)
@ManyToOne
@JoinColumn(name = "target_id")
private Users targetUser; // 알림을 받는 사람
@Enumerated(EnumType.STRING)
private AlarmType alarmType;
public static Alarm of(Users fromUser, Users targetUser, AlarmType type) {
Alarm alarm = new Alarm();
alarm.fromUser = fromUser;
alarm.targetUser = targetUser;
alarm.alarmType = type;
return alarm;
}
}
- fromUser: 댓글, 좋아요를 누른 사람
- targetUser: 알림을 받는 사람
package com.me.notify.entity;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@Getter
@RequiredArgsConstructor
public enum AlarmType {
NEW_COMMENT_ON_POST("new comment"),
NEW_LIKE_ON_POST("new like"),
;
private final String alarmText;
}
- 알림을 받는 이벤트가 발생하는 경우는 아래와 같이 2가지이다.
- 새로운 댓글을 작성했을 때
- 좋아요를 눌렀을 때
EmitterRepository
EmitterRepository는 SseEmitter를 관리하기 위한 코드이다.
- SSE 객체를 메모리에 저장/조회/삭제 기능
- 로그인한 사용자 별로 SseEmitter를 조회
- 이를 통해, 서버는 특정 사용자에게 실시간으로 알림 이벤트를 push 할 수 있다.
package com.me.notify.repository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Repository;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@Slf4j
@Repository
public class EmitterRepository {
private Map<String, SseEmitter> emitterMap = new HashMap<>();
public SseEmitter save(Long userId, SseEmitter sseEmitter) {
String key = getKey(userId);
emitterMap.put(key, sseEmitter);
log.info("[SseEmitter] Set sseEmitter {}", userId);
return sseEmitter;
}
public Optional<SseEmitter> get(Long userId) {
String key = getKey(userId);
log.info("[SseEmitter] Get sseEmitter {}", userId);
return Optional.ofNullable(emitterMap.get(key));
}
public void delete(Long userId) {
log.info("[SseEmitter] Remove sseEmitter {}", userId);
emitterMap.remove(getKey(userId));
}
private String getKey(Long userId) {
return "Emitter:UID" + userId;
}
}
- Map<String, SseEmitter>
- SseEmitter를 저장하는 Map이다.
- 동시성 처리가 필요한 경우에는 ConcurrentHashMap으로 변경해야 안전하다.
- save(Long userId, SseEmitter sseEmitter)
- 사용자 아이디를 기준으로 SseEmitter를 저장한다.
- get(Long userId)
- 특정 사용자에게 연결된 SseEmitter가 있는지 가져온다.
- delete(Long userId)
- 사용자의 SSE 연결이 끊기거나 오류가 나면 삭제한다.
- 메모리 누수 방지를 위해 필요하다.
- getKey(Long userId)
- userId를 기반으로 고유한 문자열 키를 생성한다.
AlarmService
AlarmService는 SSE 연결을 생성하고, 특정 사용자에게 알림 이벤트를 전송하는 서비스이다.
package com.me.notify.service;
import com.me.notify.repository.EmitterRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
@Slf4j
@Service
@RequiredArgsConstructor
public class AlarmService {
private final static Long DEFAULT_TIMEOUT = 60L * 1000 * 60; // SSE 연결 유지 시간
private final static String ALARM_NAME = "alarm"; // 이벤트 이름
private final EmitterRepository emitterRepository;
public SseEmitter connectAlarm(Long userId) {
SseEmitter sseEmitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(userId, sseEmitter);
sseEmitter.onCompletion(() -> emitterRepository.delete(userId));
sseEmitter.onTimeout(() -> emitterRepository.delete(userId));
try {
sseEmitter.send(
SseEmitter.event()
.id("id")
.name(ALARM_NAME)
.data("SseEmitter Connect Complete")
);
} catch (IOException e) {
throw new RuntimeException(e);
}
return sseEmitter;
}
// SSE(Server-Sent Events)를 사용하여 특정 사용자에게 실시간 알림을 전송하는 역할
public void send(Long alarmId, Long targetId) {
emitterRepository.get(targetId).ifPresentOrElse(sseEmitter -> {
try {
sseEmitter.send(
SseEmitter.event()
.id(alarmId.toString())
.name(ALARM_NAME)
.data("New Alarm")
);
} catch (IOException e) {
emitterRepository.delete(targetId);
throw new RuntimeException(e);
}
}, () -> log.info("No Emitter Founded."));
}
}
- connectAlarm(Long userId)
- 클라이언트가 SSE 연결 요청을 할 때, 해당 유저 ID에 대해 emitter를 생성하고 저장한다.
- emitterRepository.save(userId, sseEmitter)를 통해 사용자 아이디를 기준으로 emitter를 저장한다.
- 정상적으로 emitter를 생성하게 되면 클라이언트에게 “연결 완료” 메시지를 전송한다.
- 연결 종료(completion)나 타임아웃(timeout) 발생 시 emitter를 삭제한다.
- 클라이언트가 SSE 연결 요청을 할 때, 해당 유저 ID에 대해 emitter를 생성하고 저장한다.
- send(Long alarmId, Long targetId)
- 알림이 발생했을 때, 해당 사용자의 emitter가 존재하면 이벤트를 전송한다. (알림을 발생 시킴)
- 연결 오류 시 emitter를 제거한다.
참고 Optional.ifPresentOrElse()
optional.ifPresentOrElse( value -> { /* 값이 존재할 경우 실행 */ }, () -> { /* 값이 없을 경우 실행 */ } );
지금까지 SseEmitter를 구현하는 코드를 작성했다.
EmitterRepository는 사용자별 SseEmitter를 저장하고 관리하는 저장소 역할을 하고, AlarmService는 클라이언트의 SSE 연결을 설정하고 사용자에게 알림을 전송하는 핵심 로직을 담당한다.
이제 실제로 SseEmitter를 사용하기 위한 코드들을 작성해보겠다.
UserController
package com.me.notify.controller;
import com.me.notify.controller.response.AlarmResponse;
import com.me.notify.service.AlarmService;
import com.me.notify.service.UserService;
import lombok.RequiredArgsConstructor;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.util.List;
@Controller
@RequiredArgsConstructor
@RequestMapping("/user")
public class UserController {
private final UserService userService;
private final AlarmService alarmService;
...
@GetMapping("/alarm")
public String alarm() {
return "alarm/list";
}
@GetMapping("/alarm/list")
@ResponseBody
public List<AlarmResponse> alarmsJson(Authentication authentication) {
return userService.alarmList(authentication.getName());
}
@GetMapping("/alarm/subscribe")
public SseEmitter subscribe(Authentication authentication) {
Long userId = userService.findUserId(authentication.getName());
return alarmService.connectAlarm(userId);
}
}
- subscribe()는 SSE 연결을 생성하기 위해 사용한다.
- alarmService.connectAlarm(userId)를 호출해 SseEmitter 객체를 만들어 클라이언트와의 실시간 연결을 유지한다.
- alarmsJson()은 현재 로그인한 사용자의 알림 리스트를 JSON으로 반환한다.
PostService
이제 마지막이다. SseEmitter를 사용해 알림을 전송하는 방법을 아래 코드를 통해 확인할 수 있다.
package com.me.notify.service;
import com.me.notify.entity.*;
import com.me.notify.repository.AlarmRepository;
import com.me.notify.repository.CommentRepository;
import com.me.notify.repository.PostRepository;
import com.me.notify.repository.UserRepository;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
@Service
@Transactional(readOnly = true)
@RequiredArgsConstructor
public class PostServiceImpl implements PostService {
private final UserRepository userRepository;
private final PostRepository postRepository;
private final CommentRepository commentRepository;
private final AlarmRepository alarmRepository;
private final AlarmService alarmService;
...
@Transactional
@Override
public void commentCreate(Long postId, String writer, String comment) {
Users user = userRepository.findByUsername(writer).orElseThrow();
Post post = postRepository.findById(postId).orElseThrow();
commentRepository.save(Comment.createComment(user, post, comment));
// Sse를 사용해 알림 전송
Alarm alarm = alarmRepository.save(Alarm.of(post.getUser(), user, AlarmType.NEW_COMMENT_ON_POST));
alarmService.send(alarm.getId(), post.getUser().getId());
}
}
- commentCreate()는 게시글에 댓글을 저장하고, 게시글 작성자에게 알림을 전송하는 코드이다.
- 이전에 작성했던 AlarmService.send()를 여기서 사용한다.
- 알림을 받은 사람(post 작성자)의 SseEmitter가 존재하면 실기간으로 알림 메시지를 전송한다.
[참고] alarm/list.html
<!DOCTYPE html>
<html lang="ko">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h2>🔔 실시간 알림</h2>
<!-- 알림 목록 -->
<ul id="alarmList"></ul>
</body>
<script>
// 알림 리스트를 비동기로 다시 불러오기
function fetchAlarms() {
fetch('/user/alarm/list')
.then(res => res.json())
.then(data => {
const alarmList = document.getElementById("alarmList");
alarmList.innerHTML = ''; // 기존 알림 지움
data.forEach(alarm => {
const li = document.createElement('li');
li.textContent = alarm.text;
alarmList.appendChild(li);
});
})
.catch(err => {
console.error('알림 로드 실패', err);
});
}
// SSE 연결
const sse = new EventSource('/user/alarm/subscribe');
sse.addEventListener("open", () => {
console.log("✅ SSE 연결 성공");
fetchAlarms(); // 연결된 시점에 최초 호출
});
sse.addEventListener("alarm", (event) => {
console.log("📨 새로운 알림 수신:", event.data);
fetchAlarms(); // 현재 페이지 다시 불러오기
});
sse.addEventListener("error", (event) => {
console.error("❌ SSE 오류", event);
sse.close();
});
fetchAlarms();
</script>
</html>