SSE(Server-Sent Events)

2024. 11. 6. 17:14Java

SSE 개념

서버 -> 클라이언트 단방향 실시간 통신
WebSocket과 달리 서버에서 클라이언트로만 데이터를 보냄
HTTP 프로토콜 사용으로 별도 프로토콜 필요 없음
자동 재접속 지원

Repository 구조

Key-Value 구조
Key(String): "memberId_timeStamp"  // 예: "123_1698985544"
Value: SseEmitter 객체
public interface EmitterRepository {

    void save(String emitterId, SseEmitter sseEmitter);

    void deleteById(String emitterId);

    Map<String, SseEmitter> findAllByIdStartWith(Long memberId);

}
@Repository
public class EmitterMemoryRepository implements EmitterRepository {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    @Override
    public void save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
    }

    @Override
    public void deleteById(String emitterId) {
        emitters.remove(emitterId);
    }

    @Override
    public Map<String, SseEmitter> findAllByIdStartWith(Long memberId) {
        String emitterIdPrefix = memberId + "_";
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(emitterIdPrefix))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}

주요 메서드 흐름

1.  connectNotification() - SSE 연결 설정  
    ↓
2.  sendNotification() - 실제 알림 전송  
    ↓
3.  send() - 개별 이벤트 전송 처리

\[SSE 연결 프로세스\]

1.  클라이언트 연결 요청  
    ↓
2.  connectNotification() 실행
    - emitterId 생성: "memberId\_timestamp"
    -   SseEmitter 객체 생성 (timeout: 2시간)
    -   Repository에 저장  
        ↓
3.  이벤트 핸들러 등록
    -   onCompletion: 연결 종료시 처리
    -   onTimeout: 타임아웃 발생시 처리
    -   onError: 에러 발생시 처리  
        ↓
4.  초기 연결 메시지 전송
    -   \[connected\] MemberId={memberId}  
        ↓
5.  이전 미수신 이벤트 처리
    -   lastEventId 기준으로 누락된 이벤트 재전송

서비스 클래스 구현

@Slf4j
@Service
@RequiredArgsConstructor
public class NotificationService {

    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 120;

    private final EmitterRepository emitterRepository;
    private final MemberRepository memberRepository;

    public SseEmitter connectNotification(ConnectNotificationCommand connectNotificationCommand) {
        Long memberId = connectNotificationCommand.memberId();
        String lastEventId = connectNotificationCommand.lastEventId();

        String emitterId = format("{0}_{1}", memberId, System.currentTimeMillis());
        SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
        emitterRepository.save(emitterId, emitter);

        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));
        emitter.onError(e -> emitterRepository.deleteById(emitterId));

        send(emitter, emitterId, format("[connected] MemberId={0}", memberId));

        if (!connectNotificationCommand.lastEventId().isEmpty()) {
            Map<String, SseEmitter> events = emitterRepository.findAllByIdStartWith(memberId);
            events.entrySet().stream().filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
                    .forEach(entry -> send(emitter, emitterId, entry.getValue()));
        }

        return emitter;
    }

    // 실제 알림 전송
    public void sendNotification(SendNotificationCommand sendNotificationCommand) {
        Long memberId = sendNotificationCommand.memberId();
        String title = sendNotificationCommand.title();
        String content = sendNotificationCommand.content();
        NotificationType notificationType = sendNotificationCommand.notificationType();

        verifyExistsUser(memberId);
        Notification notification = Notification.builder()
                .title(title)
                .content(content)
                .memberId(memberId)
                .notificationType(notificationType)
                .build();

        Map<String, SseEmitter> emitters = emitterRepository.findAllByIdStartWith(memberId);
        emitters.forEach((key, emitter) -> {
            send(emitter, key, NotificationResponse.from(notification));
        });
    }

    private void verifyExistsUser(Long memberId) {
        memberRepository.findById(memberId)
                .orElseThrow(() -> new NotFoundMemberException("존재하지 않은 유저 입니다"));
    }

    private void send(SseEmitter emitter, String emitterId, NotificationResponse data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(emitterId)
                    .name(data.notificationType().getValue())
                    .data(data));
        } catch (IOException ex) {
            emitterRepository.deleteById(emitterId);
        }
    }

    private void send(SseEmitter emitter, String emitterId, Object data) {
        try {
            emitter.send(SseEmitter.event()
                    .id(emitterId)
                    .data(data));
        } catch (IOException e) {
            emitterRepository.deleteById(emitterId);
            log.error("알림 전송에 실패했습니다", e);
        }
    }
}

알림 전송 프로세스

[알림 전송 프로세스]
1. sendNotification() 호출
   ↓
2. 회원 존재 확인
   memberRepository.findById() 
   ↓
3. Notification 객체 생성
   Notification.builder()
       .title(title)
       .content(content)
       .memberId(memberId)
       .notificationType(notificationType)
       .build()
   ↓
4. 해당 회원의 모든 Emitter 조회
   emitterRepository.findAllByIdStartWith(memberId)
   ↓
5. 각 Emitter에 알림 전송
   send(emitter, key, NotificationResponse.from(notification))
  • 회원별 다중 연결 지원 (여러 기기 동시 접속)
  • 알림 타입별 구분된 처리 (NotificationType 활용)
  • 누락된 이벤트 재전송 기능
  • 자동 연결 해제 및 리소스 정리
  • 에러 처리 및 로깅

@RestController
@RequiredArgsConstructor
@RequestMapping("/v1/notifications")
public class NotificationController {

    private final NotificationService notificationService;

    @GetMapping(value = "/connect", produces = "text/event-stream")
    public ResponseEntity<SseEmitter> sseConnection(@RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId,
                                                    @LoginUser Long memberId) {

        ConnectNotificationCommand connectNotificationCommand = ConnectNotificationCommand.of(memberId, lastEventId);
        SseEmitter sseEmitter = notificationService.connectNotification(connectNotificationCommand);
        return ResponseEntity.ok(sseEmitter);
    }
}

엔드포인트 구성은 아래처럼 됩니다.
URL: /v1/notifications/connect
Method: GET
produces: text/event-stream // SSE 연결임을 명시
Return: SseEmitter

@RequestHeader(
    value = "Last-Event-ID",  // 마지막으로 수신한 이벤트 ID
    required = false,         // 필수값 아님
    defaultValue = ""         // 기본값 빈 문자열
) String lastEventId

Controller 흐름은

1. 클라이언트 SSE 연결 요청
   GET /v1/notifications/connect
   ↓
2. 요청 헤더 확인
   - Last-Event-ID 추출
   - 현재 로그인 사용자 확인
   ↓
3. Command 객체 생성
   ConnectNotificationCommand.of(memberId, lastEventId)
   ↓
4. 서비스 호출
   notificationService.connectNotification()
   ↓
5. SseEmitter 반환
   ResponseEntity.ok(sseEmitter)

처음 SSE를 사용해보는 중 이지만

  • 알림 영속성 저장 추가
  • 알림 읽음 상태 관리
  • 알림 기간 제한 설정
  • 재시도 메커니즘 구현
  • 부하 분산 전략 수립
  • 알림 타입별 엔드포인트 분리
  • 연결 상태 모니터링 추가
  • 클라이언트 정보 로깅
  • 연결 수 제한 설정

의 확장을 고려할 수 있으며 실시간 알림 시스템은 v2로 해서 업그레이드 해봐야겠습니다.