벡엔드/MSA

Retry & DLQ

코딩하는이씨 2024. 3. 21. 20:50
728x90
반응형

Retry


Consumer가 메시지를 처리하는 도중 오류가 발생하면 해당 메시지를 다시 Polling하여 처리하는 작업을 Retry라고 한다.

spring:
  profiles: default
  jpa:
    properties:
      hibernate:
        show_sql: true
        format_sql: true
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        event-in:
          group: inventory
          destination: labshoppubsub
          contentType: application/json

					# /* Retry를 위해 추가된 부분
          consumer:
            max-attempts: 3              #<- 메시지 재시도 횟수
            back-off-initial-interval: 1000 #<- 첫 재시도 전에 대기할 시간 (1초)
            back-off-max-interval: 1000  #<- 재시도 사이의 대기 시간 (1초)
            back-off-multiplier: 1.0     #<- 재시도 간격간 적용되는 배수(현재 1배수)
            defaultRetryable: false      #<- 재시도 가능 예외 설정 (현재 예외 X)
					# */
        event-out:
          destination: labshoppubsub
          contentType: application/json

다음과 같이 consumer: 아래에 메시지 재시도 횟수, 대기 시간등을 설정하면 설정된 횟수 시간에 맞춰 메시지 처리를 재시도 하게된다.

만약 메시지가 설정된 횟수까지 재시도해도 계속 실패하면 오류메시지를 출력한다.

 

 

제대로 동작하는지 확인해보기 위해 일부로 RuntimeException()을 발생시켜 재시도여부를 확인해보자.

@Service
@Transactional
public class PolicyHandler {

    @Autowired
    InventoryRepository inventoryRepository;

    @StreamListener(KafkaProcessor.INPUT)
    public void whatever(@Payload String eventString) {}

    @StreamListener(
        value = KafkaProcessor.INPUT,
        condition = "headers['type']=='OrderPlaced'"
    )
    public void wheneverOrderPlaced_DecreaseStock(
        @Payload OrderPlaced orderPlaced
    ) {
        OrderPlaced event = orderPlaced;
        System.out.println(
            "\\n\\n##### listener DecreaseStock : " + orderPlaced + "\\n\\n"
        );

        Inventory.decreaseStock(event);

        // 강제 오류 발생
        throw new RuntimeException();
    }

}

다음과 같이 Counsumer쪽에서 이벤트를 받고 일부로 RuntimeException() 을 발생시켜 재시도 하도록 코드를 처리했다.

 

Consumer 측 로그를 확인해보면 메시지 처리를 위해 계속 재시도하다 지정된 재시도 횟수까지 정상 처리 되지 않자 오류를 표시하며 중지한다.

 

Dead Letter Queue(DLQ)


위에서 Retry를 계속 해도 처리하지 못하는 메시지를 Posion Pill 이라고 한다.

이러한 Posion Pill은 카프카에서 별도의 메시지 장소인 DLQ로 저장되게 된다.

 

 💡 DLQ는 하나의 Topic이며 Consumer에서 정상처리되지 못한 메시지(Posion Pill)들이 쌓여있다.

 

 

DLQ 설정을 위해 아래와 같은 코드를 추가해주어야 한다.

spring:
  profiles: default
  jpa:
    properties:
      hibernate:
        show_sql: true
        format_sql: true
  cloud:
    stream:
      kafka:
				# /* 추가된 부분
        bindings:
          event-in:
            consumer:
              enableDlq: true   # <- DLQ 처리 활성화 여부
              dlqName: dlq-kafkatest  # <- DLQ의 이름 지정
              dlqPartitions: 1   # <- DLQ의 파티션 수 설정
				# */
        binder:
          brokers: localhost:9092
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        event-in:
          group: inventory
          destination: labshoppubsub
          contentType: application/json
          consumer:
            max-attempts: 3
            back-off-initial-interval: 1000
            back-off-max-interval: 1000
            back-off-multiplier: 1.0
            defaultRetryable: false  
        event-out:
          destination: labshoppubsub
          contentType: application/json

 

 

이제 메시지가 재시도후 Posion Pill이 되면 정상적으로 DLQ에 저장되는지 확인해 보자.

로그를 확인해보면

Sent to DLQ  a message with key='null' and payload='{123, 34, 101, 118, 101, 110, 116, 84, 121, 112, 1...' received from 1: dlq-kafkatest-0@2

다음과 같이 DLQ로 전송된 것을 확인할 수 있다.

 

이제 DLQ를 확인해 메시지를 확인 해보자.

kafka 쉘 접속후 /bin 에서 확인해야 한다.

./kafka-console-consumer --bootstrap-server <http://localhost:9092> --topic dlq-kafkatest --from-beginning

위의 코드와 같이 DLQ에 설정한 이름을 이용해 일반적인 Topic을 확인하는 방식과 동일하게 확인할 수 있다.

 

다음과 같이 처리에 실패한 메시지들이 저장되어 있는 모습을 확인할 수 있다.

커밋모드가 자동일때 Dlq에 처리되지 않은 메세지를 보낸 후, 자동으로 Offset을 증가시켜 Lag가 쌓이지 않게 된다.

 

728x90
반응형