본문 바로가기

Proj/구구모

댓글 알림 구현(feat. sse)

1. 개요

 댓글 알림을 구현해야 했다. 누군가가 댓글을 작성하면 해당 댓글의 게시글 작성자에게 push 알림을 보내는 기능을 구현하고자 한다. 이 프로젝트는 댓글과 부모 댓글에 대한 대댓글로만 구현했으므로 회의를 통해 게시글 작성자에게만 댓글 알림을 보내도록 요구사항을 잡았다.

 

2. SSE 구현 코드

2-1. Entity

@Entity
@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class Notification {

    @Id
    @GeneratedValue
    @Column(name = "notification_id")
    private Long id;

    private String content;

    private String message;

    private NotificationType notificationType;

    @Builder.Default
    private boolean isRead = false;

    @Builder.Default
    private LocalDateTime createDate = LocalDateTime.now();

    @ManyToOne(fetch = FetchType.LAZY)
    @JoinColumn(name = "member_id")
    private Member member;      //알림 수신자

    //postNoti
    private Long postId;
    private String senderNick;


    public void read() {
        this.isRead = true;
    }
}

 우선 저장할 알림 엔티티이다. 사이트 내에서도 받은 알림 내역을 확인해야 하므로 이와 같이 저장하였다.

2-2 DTO

@Getter
@SuperBuilder
@AllArgsConstructor
@NoArgsConstructor(access = AccessLevel.PROTECTED)
public class NotificationDto {

    private Long id;
    private String name;
    private String content;
    private NotificationType notificationType;
    private String message;
    private LocalDateTime createDate;
    private boolean isRead;



}
@Getter
@AllArgsConstructor
@SuperBuilder
public class PostNotificationDto extends NotificationDto{

    private Long postId;
    private String senderNick;
}

 이후 알림의 종류를 확장하기 용이하도록 필수 정보와 부가 정보를 상속으로 분리하였다.

2-3 Controller

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1")
public class NotificationController {

    private final NotificationService notificationService;

    /**
     * 로그인 한 유저 sse 연결
     */
    @GetMapping(value = "/subscribe", produces = "text/event-stream")
    public SseEmitter subscribe(@AuthenticationPrincipal CustomUserDetails principal,
                                @RequestHeader(value = "Last-Event-ID", required = false, defaultValue = "") String lastEventId) {
        return notificationService.subscribe(principal, lastEventId);
    }

    @GetMapping("/notification")
    public <T extends NotificationDto> ApiResponse<List<T>> findNoti(@AuthenticationPrincipal CustomUserDetails principal) {
        return ApiResponse.createSuccess(notificationService.findNotification(principal));
    }

    @PatchMapping("/notification/{noti_id}")
    public ApiResponse<String> read(@AuthenticationPrincipal CustomUserDetails principal,
                                    @PathVariable("noti_id") Long id) {

        notificationService.read(principal, id);
        return ApiResponse.createSuccess("알림 읽음처리 완료");
    }

    @PatchMapping("/notification")
    public ApiResponse<String> readAll(@AuthenticationPrincipal CustomUserDetails principal) {

        notificationService.readAll(principal);
        return ApiResponse.createSuccess("알림 모두 읽음처리 완료");
    }

    @DeleteMapping("/notification/{noti_id}")
    public ApiResponse<String> deleteNoti(@AuthenticationPrincipal CustomUserDetails principal,
                                          @PathVariable("noti_id") Long id) {
        notificationService.deleteNotification(principal, id);

        return ApiResponse.createSuccess("알림 삭제 완료");
    }
}

 필요한 기능들을 controller로 구현하였다. sse 연결이 가장 복잡하므로 이후 기본적인 crud들은 이 글에서 생략하도록 한다.

2-4 Service

@Service
@RequiredArgsConstructor
@Slf4j
public class NotificationService {

    private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;

    private final MemberRepository memberRepository;
    private final NotificationRepository notificationRepository;
    private final EmitterRepository emitterRepository;



    public SseEmitter subscribe(CustomUserDetails principal, String lastEventId) {



        String emitterId = principal.getUsername() + "_" + System.currentTimeMillis();

        SseEmitter emitter = emitterRepository.save(emitterId, new SseEmitter(DEFAULT_TIMEOUT));

        emitter.onCompletion(() -> emitterRepository.deleteById(emitterId));
        emitter.onTimeout(() -> emitterRepository.deleteById(emitterId));

        // 503 에러를 방지하기 위한 더미 이벤트 전송
        String eventId = principal.getUsername() + "_" + System.currentTimeMillis();
        sendNotification(emitter, eventId, emitterId,
                "EventStream Created. [userEmail=" + principal.getUsername() + "]");

        // 클라이언트가 미수신한 Event 목록이 존재할 경우 전송하여 Event 유실을 예방
        if (hasLostData(lastEventId)) {
            sendLostData(lastEventId, principal.getUsername(), emitterId, emitter);
        }

        return emitter;

    }


}

 우선 로그인 시 해당 계정과 서버의 sse 연결이 필요하다. 그래야 이후에 알림을 보낼 수 있다. 우선 emitterId를 생성하고, emitterRepository에 이 ID로 저장한다. emitterId, eventId는 접속 시각을 구분하기 위해 currentTimeMillis를 붙여 만든다.

//게시글 관련 정보 전송
    @Transactional
    public void send(Member receiver, String content, String message, Long postId, String senderNick) {

        Notification notification = Notification.builder()
                .member(receiver)
                .content(content)
                .message(message)
                .notificationType(NotificationType.COMMENT)
                .postId(postId)
                .senderNick(senderNick)
                .build();

        notificationRepository.save(notification);

        String receiverEmail = receiver.getUsername();
        String eventId = receiverEmail + "_" + System.currentTimeMillis();
        Map<String, SseEmitter> emitters = emitterRepository.findAllEmitterStartWithByMemberId(receiverEmail);
        emitters.forEach(
                (key, emitter) -> {
                    emitterRepository.saveEventCache(key, notification);
                    Notification resp = Notification.builder()
                            .id(notification.getId())
                            .content(notification.getContent())
                            .message(notification.getMessage())
                            .notificationType(NotificationType.COMMENT)
                            .createDate(notification.getCreateDate())
                            .postId(notification.getPostId())
                            .senderNick(notification.getSenderNick())
                            .build();
                    sendNotification(emitter, eventId, key, resp);
                }
        );
    }

 다음은 send이다. 수신자와 알림 내용을 인자로 받아 알림을 저장, eventId를 생성하고, emitter(수신자)를 찾아 해당 알림을 전송한다.

2-5 EmitterRepository

@Repository
@Slf4j
public class EmitterRepositoryImpl implements EmitterRepository {

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

    @Override
    public SseEmitter save(String emitterId, SseEmitter sseEmitter) {
        emitters.put(emitterId, sseEmitter);
        return sseEmitter;
    }

    @Override
    public void saveEventCache(String eventCacheId, Object event) {
        log.info("add eventcache={}", event);
        eventCache.put(eventCacheId, event);
        log.info("cachesize={}", eventCache.size());
    }


    @Override
    public Map<String, SseEmitter> findAllEmitterStartWithByMemberId(String memberId) {
        return emitters.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public Map<String, Object> findAllEventCacheStartWithByMemberId(String memberId) {
        log.info("cachesize={}", eventCache.size());
        return eventCache.entrySet().stream()
                .filter(entry -> entry.getKey().startsWith(memberId))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    @Override
    public void deleteById(String id) {
        emitters.remove(id);
    }

    @Override
    public void deleteAllEmitterStartWithId(String memberId) {
        emitters.forEach(
                (key, emitter) -> {
                    if (key.startsWith(memberId)) {
                        emitters.remove(key);
                    }
                }
        );
    }

    @Override
    public void deleteAllEventCacheStartWithId(String memberId) {
        eventCache.forEach(
                (key, emitter) -> {
                    if (key.startsWith(memberId)) {
                        eventCache.remove(key);
                    }
                }
        );
    }
}

 Repository이지만 동시성 이슈가 발생할 수 있으므로 ConcurrentHashmap을 사용하여 저장한다.

    private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
    private final Map<String, Object> eventCache = new ConcurrentHashMap<>();

 emitterId와 emitter 객체를 Map으로 담는 emitters, eventId와 전송해야 할 Notification을 담는 eventCache를 선언했다. 나머지 함수는 단순 crud이다.

3. 적용

    public void save(CreateCmntReq req, CustomUserDetails principal) {

		//댓글 저장 로직
        ....

        //댓글 작성자가 게시글 작성자 본인이 아닐 경우 게시글 작성자에게 알림 전송
        if (!targetPost.getMember().equals(author)) {
            notificationService.send(targetPost.getMember(), cmnt.getContent(),
                    "게시글에 새 댓글이 작성되었습니다", targetPost.getId(), author.getNickname());
        }

    }

 댓글을 작성했는데 댓글 작성자와 게시글 작성자가 다를 경우 해당 정보들을 담아 알림을 보내도록 하였다.

4. 그런데...

이렇게 적용을 하려 보니 문제가 있었다. sse를 연결하면 db connection을 물고 있는데, 이렇게 되면 starvation이 발생할 수 있다. 이를 해결하기 위해선 osiv를 꺼야 하는데, 무턱대고 이를 켜게 되면 영속성 컨텍스트가 트랜잭션 안에서만 살아있으므로 Lazy Loading이 동작하지 않게 된다.

 또, 위 코드는 eventCache에 데이터를 저장하고, 전송하는 로직이 제대로 동작하지 않았다. 

이런 문제로 알림은 FCM으로 구현하기로 했다. 이는 다음 글에 작성하기로 하자.