SSE(Server-Sent Events)
2024. 11. 6. 17:14ㆍJava
SSE 개념
서버 -> 클라이언트 단방향 실시간 통신
WebSocket과 달리 서버에서 클라이언트로만 데이터를 보냄
HTTP 프로토콜 사용으로 별도 프로토콜 필요 없음
자동 재접속 지원
Repository 구조
Key-Value 구조
Key(String): "memberId_timeStamp" // 예: "123_1698985544"
Value: SseEmitter 객체
public interface EmitterRepository {
void save(String emitterId, SseEmitter sseEmitter);
void deleteById(String emitterId);
Map<String, SseEmitter> findAllByIdStartWith(Long memberId);
}
@Repository
public class EmitterMemoryRepository implements EmitterRepository {
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
@Override
public void save(String emitterId, SseEmitter sseEmitter) {
emitters.put(emitterId, sseEmitter);
}
@Override
public void deleteById(String emitterId) {
emitters.remove(emitterId);
}
@Override
public Map<String, SseEmitter> findAllByIdStartWith(Long memberId) {
String emitterIdPrefix = memberId + "_";
return emitters.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(emitterIdPrefix))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
주요 메서드 흐름
1. connectNotification() - SSE 연결 설정
↓
2. sendNotification() - 실제 알림 전송
↓
3. send() - 개별 이벤트 전송 처리
\[SSE 연결 프로세스\]
1. 클라이언트 연결 요청
↓
2. connectNotification() 실행
- emitterId 생성: "memberId\_timestamp"
- SseEmitter 객체 생성 (timeout: 2시간)
- Repository에 저장
↓
3. 이벤트 핸들러 등록
- onCompletion: 연결 종료시 처리
- onTimeout: 타임아웃 발생시 처리
- onError: 에러 발생시 처리
↓
4. 초기 연결 메시지 전송
- \[connected\] MemberId={memberId}
↓
5. 이전 미수신 이벤트 처리
- lastEventId 기준으로 누락된 이벤트 재전송
서비스 클래스 구현
@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationService {
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 120;
private final EmitterRepository emitterRepository;
private final MemberRepository memberRepository;
public SseEmitter connectNotification(ConnectNotificationCommand connectNotificationCommand) {
Long memberId = connectNotificationCommand.memberId();
String lastEventId = connectNotificationCommand.lastEventId();
String emitterId = format("{0}_{1}", memberId, System.currentTimeMillis());
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(emitterId, emitter);
emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
emitter.onError(e -> emitterRepository.deleteById(emitterId));
send(emitter, emitterId, format("[connected] MemberId={0}", memberId));
if (!connectNotificationCommand.lastEventId().isEmpty()) {
Map<String, SseEmitter> events = emitterRepository.findAllByIdStartWith(memberId);
events.entrySet().stream().filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> send(emitter, emitterId, entry.getValue()));
}
return emitter;
}
// 실제 알림 전송
public void sendNotification(SendNotificationCommand sendNotificationCommand) {
Long memberId = sendNotificationCommand.memberId();
String title = sendNotificationCommand.title();
String content = sendNotificationCommand.content();
NotificationType notificationType = sendNotificationCommand.notificationType();
verifyExistsUser(memberId);
Notification notification = Notification.builder()
.title(title)
.content(content)
.memberId(memberId)
.notificationType(notificationType)
.build();
Map<String, SseEmitter> emitters = emitterRepository.findAllByIdStartWith(memberId);
emitters.forEach((key, emitter) -> {
send(emitter, key, NotificationResponse.from(notification));
});
}
private void verifyExistsUser(Long memberId) {
memberRepository.findById(memberId)
.orElseThrow(() -> new NotFoundMemberException("존재하지 않은 유저 입니다"));
}
private void send(SseEmitter emitter, String emitterId, NotificationResponse data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.name(data.notificationType().getValue())
.data(data));
} catch (IOException ex) {
emitterRepository.deleteById(emitterId);
}
}
private void send(SseEmitter emitter, String emitterId, Object data) {
try {
emitter.send(SseEmitter.event()
.id(emitterId)
.data(data));
} catch (IOException e) {
emitterRepository.deleteById(emitterId);
log.error("알림 전송에 실패했습니다", e);
}
}
}
알림 전송 프로세스
[알림 전송 프로세스]
1. sendNotification() 호출
↓
2. 회원 존재 확인
memberRepository.findById()
↓
3. Notification 객체 생성
Notification.builder()
.title(title)
.content(content)
.memberId(memberId)
.notificationType(notificationType)
.build()
↓
4. 해당 회원의 모든 Emitter 조회
emitterRepository.findAllByIdStartWith(memberId)
↓
5. 각 Emitter에 알림 전송
send(emitter, key, NotificationResponse.from(notification))
- 회원별 다중 연결 지원 (여러 기기 동시 접속)
- 알림 타입별 구분된 처리 (NotificationType 활용)
- 누락된 이벤트 재전송 기능
- 자동 연결 해제 및 리소스 정리
- 에러 처리 및 로깅
@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/notifications")
public class NotificationController {
private final NotificationService notificationService;
@GetMapping(value = "/connect", produces = "text/event-stream")
public ResponseEntity<SseEmitter> sseConnection(@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
@LoginUser Long memberId) {
ConnectNotificationCommand connectNotificationCommand = ConnectNotificationCommand.of(memberId, lastEventId);
SseEmitter sseEmitter = notificationService.connectNotification(connectNotificationCommand);
return ResponseEntity.ok(sseEmitter);
}
}
엔드포인트 구성은 아래처럼 됩니다.
URL: /v1/notifications/connect
Method: GET
produces: text/event-stream // SSE 연결임을 명시
Return: SseEmitter
@RequestHeader(
value = "Last-Event-ID", // 마지막으로 수신한 이벤트 ID
required = false, // 필수값 아님
defaultValue = "" // 기본값 빈 문자열
) String lastEventId
Controller 흐름은
1. 클라이언트 SSE 연결 요청
GET /v1/notifications/connect
↓
2. 요청 헤더 확인
- Last-Event-ID 추출
- 현재 로그인 사용자 확인
↓
3. Command 객체 생성
ConnectNotificationCommand.of(memberId, lastEventId)
↓
4. 서비스 호출
notificationService.connectNotification()
↓
5. SseEmitter 반환
ResponseEntity.ok(sseEmitter)
처음 SSE를 사용해보는 중 이지만
- 알림 영속성 저장 추가
- 알림 읽음 상태 관리
- 알림 기간 제한 설정
- 재시도 메커니즘 구현
- 부하 분산 전략 수립
- 알림 타입별 엔드포인트 분리
- 연결 상태 모니터링 추가
- 클라이언트 정보 로깅
- 연결 수 제한 설정
의 확장을 고려할 수 있으며 실시간 알림 시스템은 v2로 해서 업그레이드 해봐야겠습니다.
'Java' 카테고리의 다른 글
JWT 그런데 OAuth2.0 곁들인 (2) (1) | 2024.11.10 |
---|---|
JWT 그런데 OAuth2.0 곁들인 (1) | 2024.11.08 |
정적 팩토리 메서드 네이밍의 차이 (of vs from) (0) | 2024.11.04 |
QueryDSL 아이템 목록 페이징 처리 (0) | 2024.10.31 |
StringUtils 라이브러리 (0) | 2024.10.29 |