개발 공부 기록하기/03. AWS & Infra

[AWS SQS 들이파기] SQS에서 메시지 받기 II (Java + Gradle + Spring)

lannstark 2020. 9. 23. 22:35

모든 코드는 github에 올라가 있습니다.

SQS로부터 메시지를 받는 두 번째 방법은~~ 바로바로 Annotation을 이용하는 방법입니다.

아래의 PersonListener 코드를 봐보죠!

@Slf4j
@Component // Bean 등록을 꼭 해줘야 한다!
public class PersonListener {

  @SqsListener(value = "sqs-study", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
  public void listen(@Payload Person person, @Headers Map<String, String> headers, Acknowledgment ack) {
    log.info("{}", person);
    log.info("{}", headers);
    log.info("{}", ack != null);
  }

}

@SqsListener 어노테이션의 value는 만들었던 SQS의 이름이 들어갑니다.

deletionPolicy는 메시지를 받은 이후의 삭제 정책인데, 4가지 경우의 옵션이 있습니다.

public enum SqsMessageDeletionPolicy {
  // 메소드로 Message가 들어오면 무조건 삭제요청을 보낸다
  ALWAYS,

  // 절대 삭제 요청을 보내지 않는다.
  // Acknowledgment 파라미터를 바인딩 받아 ack 메소드를 호출할때 삭제 요청을 보낸다
  NEVER,

  // redrive policy(DLQ)가 정의되지 않았으면 메시지를 삭제한다.
  NO_REDRIVE,

  // SqsListner 어노테이션이 붙은 메소드에서 에러가 나지 않으면 메시지를 삭제한다.
  // 에러가 난 경우에는 메시지를 삭제하지 않는다.
  ON_SUCCESS
}

개인적인 생각으로는 NEVERON_SUCCESS 중에 적절한 것을 사용하면 좋을 것 같은데, NEVER 를 사용하고 Acknowledgment를 쓰는게 조금더 끌립니다 ㅎㅎ
여튼 메소드도 만들었으니 바로 테스트를 해봐서, 메시지가 잘 들어오는지 로그를 확인해봅시다 ㅎㅎㅎ

기존의 Controller와 Service도 약간 수정했습니다. 메시지가 전달되면, @SqsListener로 잘 들어오는지 테스트 할 예정입니다.

// Controller
@PostMapping("/message")
public void sendMessage(@RequestBody Person person) {
  messageService.sendMessage(person);
}

// Service
public void sendMessage(Person person) {
  log.info("SQS에 Person을 전달합니다 : " + person);
  queueMessagingTemplate.convertAndSend("sqs-study", person);
}
# log.info에 timestamp를 찍기 위해 추가
logging:
  pattern:
    console: '%d{yyyy-MM-dd HH:mm:ss} %-5level --- [%thread] %logger{35} : %msg %n'

자 API를 호출해보면 console에 이렇게 찍힙니다!!

21:35:59 INFO  --- [http-nio-8080-exec-2] c.l.a.service.SqsMessageService : SQS에 Person을 전달합니다 : Person(name=lannstark, age=100)
21:36:07 INFO  --- [simpleMessageListenerContainer-2] c.l.a.listener.PersonListener : Person(name=lannstark, age=100)
21:36:07 INFO  --- [simpleMessageListenerContainer-2] c.l.a.listener.PersonListener : {Map<String, String>이 모두 출력됨}
21:36:07 INFO  --- [simpleMessageListenerContainer-2] c.l.a.listener.PersonListener : true
... 그리고 30초마다 계속 반복됩니다.
DELETE를 해주지 않았기 때문에 Queue에 메시지가 남아 있고 DLQ 설정도 되어 있지 않기 때문입니다.

Header로는 다음과 같은 정보들이 넘어왔습니다.

  • SentTimestamp : 메시지가 전송된 timestamp
  • ReceiptHandle : 메시지에 대해 응답할때 사용될 것 같은 긴 문자열
  • SenderId : 보내는 주체를 특정한 문자열
  • LogicalResourceId : Queue의 이름 (sqs-study)
  • ApproximateReceiveCount : 대략적으로 이 메시지를 몇 번째로 받았는지 (숫자가 계속 증가한다)
  • Acknowledgment : ack를 위한 spring-cloud-aws의 구현체
  • Visibility : 이 메시지의 visibility timeout을 설정하기 위한 spring-cloud-aws의 구현체
  • contentType : application/json
  • lookupDestination : sqs-study
  • ApproximateFirstReceiveTimestamp : 대략적으로 이 메시지를 첫 번재 받았을때의 timestamp
  • MessageId

Visibility timeout 이란, 그 메시지가 특정 소비자에게 전달된 이후로, 다른 소비자가 가져가지 못하는 시간을 의미합니다.

30초마다 반복되는 이유는 폴링 기간이 30으로 설정되어 있기 때문인듯 합니다.

메시지 수신 정보를 보면, 다음과 같은 설명이 되어 있습니다.

긴 폴링(long polling)의 경우, 폴링 기간 값을 0보다 크게 설정합니다. 긴 폴링은 빈 응답수와 잘못된 응답을 제거하여 Amazon SQS 사용 비용을 줄이는데 도움이 됩니다. 긴 폴링은 다음과 같은 이점을 제공합니다.

  • 응답을 전송하기 전에 대기열에서 메시지를 사용할 수 있을 때까지 Amazon SQS가 대기하도록 하여 빈 응답을 제거합니다.
  • 하위 집합이 아닌 모든 Amazon SQS 서버를 쿼리하여 잘못된 빈 응답을 제거합니다.

 

이 두 번째 문장의 의미는 SQS의 원리를 알아야 이해가 되는데, SQS는 내부적으로 메시지를 분산해서 저장한다고 합니다. 이때 메시지를 요청하는 짧은 폴링(short polling)의 경우는 분산된 모든 서버에서 메시지를 받아오는게 아니라, 특정 몇 개의 분산서버에서만 메시지를 받아 올 수 있습니다. 만약 메시지가 5개가 있고 2군데의 분산 서버에 몰빵(?) 되어 있는데, 다른 분산 서버를 short polling으로 찌르면 '잘못된 빈 응답'이 나올 수 있는 것이죠.

긴 폴링은 이런 상황을 막아준다고 합니다 ㅎㅎ

이제 ack를 해보겠습니다!

@SqsListener(value = "sqs-study", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(@Payload Person person, @Headers Map<String, String> headers, Acknowledgment ack) {
  log.info("{}", person);
  ack.acknowledge();
}

이렇게 하고 Application을 다시 껐다 키니

21:46:51 INFO  --- [simpleMessageListenerContainer-2] c.l.a.listener.PersonListener : Person(name=lannstark, age=100)

한 번의 수신 이후 다시 로그가 나오지 않았습니다 ㅎㅎㅎ AWS SQS 받는 법 끝!!

받는 방법 자체는 끝났지만, 내부 동작 원리에 대해서는 생각해볼거리가 한 가지 있습니다.

Thread

로그를 통해 알 수 있는 한 가지 흥미로운 점은 thread가 Spring Boot Tomcat thread가 아닌 SimpleMessageListenerContainer의 스레드란 점입니다. 이 말인 즉슨 메시지가 N개가 들어올때 여러 스레드가 생성될 수 있다는 건데... 과연 ack를 안주고 sleep을 걸면 어떻게 될까요?!!

메시지 2개(100살 Person 하나 200살 Person 하나)를 Queue에 미리 넣어놓고 테스트를 해보겠습니다! 무려 50초를 sleep 하도록 설정 ㅎㅎ

@SqsListener(value = "sqs-study", deletionPolicy = SqsMessageDeletionPolicy.NEVER)
public void listen(@Payload Person person) throws Exception{
  log.info("{}", person);
  Thread.sleep(50 * 1000);
}

자 결과는?!! 로그는 아래와 같이 나왔습니다 (보기 편하게 편집함)

22:03:48 [simpleMessageListenerContainer-2] : Person(name=lannstark, age=200)
22:03:48 [simpleMessageListenerContainer-3] : Person(name=lannstark, age=100)

22:04:42 [simpleMessageListenerContainer-2] : Person(name=lannstark, age=200)
22:04:42 [simpleMessageListenerContainer-3] : Person(name=lannstark, age=100)

22:05:36 [simpleMessageListenerContainer-2] : Person(name=lannstark, age=200)
22:05:36 [simpleMessageListenerContainer-3] : Person(name=lannstark, age=100)

...

오호 메시지를 받아 잠들었고, PersonListener의 listen은 동시에 호출되었습니다.

메시지를 3개 이상 넣어 놓고 해봐도 로그는 동일했습니다. 설정된 최대 max thread 개수만큼만 동시에 처리할 수 있도록 되어 있네요. (아래 서술했지만 메시지를 처리하는 기본 thread 수는 2개입니다)

AbstractMessageListenerContainer

@SqsListener 어노테이션 역시 내부적으로는 폴링을 하고 있는데 (SQS에 계속 "야 메시지 있냐? 있으면 줘~" 요청을 보내는 방식) 관련 코드는 AbstractMessageListenerContainer와 그 구현체인 SimpleMessageListenerContainer 에서 살짝 볼 수 있습니다. 몇 가지 알고 있으면 좋을 사항들을 나열해보자면,

  • 메시지를 처리하는 기본 thread 개수는 2개입니다. 때문에 위의 테스트에서 최대 2개의 thread만 활용 되었습니다.
  • Long polling 방식을 기본적으로 사용하고 있으며, 폴링 요청 이후 메시지를 대기하는 시간은 최대값인 20초를 쓰고 있습니다. 공식 문서에는 최대값이 20초라고 되어 있는데 정작 메시지 수신 정보에서는 0 ~ 999로 되어 있습니다.. (혹시 차이를 아시는 분은 댓글 부탁드립니다!!)

(999초로 폴링 기간을 설정하고 @SqsListener를 동작시켜도 timeout이 나지 않았습니다)

  • AbstractMessageListenerContainer에서 Spring Bean의 LifeCycle을 온전하게 구현해두었고 Phase 우선순위를 가장 높이 설정해 두었기 때문에 별도의 graceful shutdown 설정을 하지 않아도 괜찮습니다만, spring-cloud-aws 버전에 따라 구현이 바뀌는 것을 고려한다면 직접 graceful shutdown을 설정하는 것도 좋아 보입니다.
@Slf4j
@RequiredArgsConstructor
@Component
public class CollectorGracefulShutdownHandler implements SmartLifecycle, BeanFactoryAware {
  private final Map<String, SimpleMessageListenerContainer> messageListenerContainers;

  private boolean isRunning;
  private DefaultSingletonBeanRegistry beanFactory;

  @Override
  public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
    this.beanFactory = (DefaultSingletonBeanRegistry) beanFactory;
  }

  @Override
  public boolean isAutoStartup() {
    return true;
  }

  @Override
  public void stop(Runnable callback) {
    stop();
    callback.run();
  }

  @Override
  public void start() {
    this.isRunning = true;
  }

  @Override
  public void stop() {
    stopSqsListeners();
    this.isRunning = false;
    log.info("[GracefulShutdown 완료]");
  }

  private void stopSqsListeners() {
    this.messageListenerContainers.keySet().forEach(beanName -> beanFactory.destroySingleton(beanName));
  }

  @Override
  public boolean isRunning() {
    return this.isRunning;
  }

  @Override
  public int getPhase() {
    return Integer.MAX_VALUE; // 명시적으로 PHASE 설정
  }
}

 

전체 시리즈 보러가기 : lannstark.tistory.com/88

 

AWS SQS 들이파기

안녕하세요~~ 여러분~! 공부하는개발자 입니다 ㅎㅎㅎ 이번 시간에는 바로바로~ SQS에 대해서 알아보려고 합니다. SQS란 무엇인지, Java + Spring으로 어떻게 사용할 수 있는지, 사용시 주의할 점 등은

lannstark.tistory.com