개발 공부 기록하기/03. AWS & Infra

[AWS SQS 들이파기] SQS에서 메시지 받기 I (Java + Gradle + Spring)

lannstark 2020. 9. 23. 20:26

모든 코드는 github에 올라가 있습니다.

이제 메시지 보내는 것은 성공했으니 보낸 메시지를 받아보려고 합니다.
그 전에 MessageConverter 에 대해서 간단히 알아 봅시다 ㅎㅎ

QueueMessagingTemplate은 단순한 String을 주고받는 메소드 외에, Java Object를 받는 다양한 전송 메소드들이 존재합니다. 그 중에 convertAndSend()receiveAndConvert()MessageConveter 인터페이스에게 직렬화/역직렬화 책임을 위임하고 있습니다.

실제로 코드를 살펴보면 아래와 같습니다.

AbstractMessageSendingTemplate (spring-messaging)

@Override
public <T> void convertAndSend(String destination, T payload) throws MessagingException {
  D channel = resolveMessageChannelByLogicalName(destinationName);
  convertAndSend(channel, payload); // 최종적으로 convertAndSend(channel, payload, null, null) 호출 
}

@Override
public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
  @Nullable MessagePostProcessor postProcessor) throws MessagingException {
  Message<?> message = doConvert(payload, headers, postProcessor);
  send(destination, message);
}

protected Message<?> doConvert(Object payload, @Nullable Map<String, Object> headers,
  @Nullable MessagePostProcessor postProcessor) {
  // 여차저차 전처리 ...
  MessageConverter converter = getMessageConverter();

  /** 바로 여기서 MessageConverter가 사용된다. **/
    Message<?> message = (converter instanceof SmartMessageConverter ?
                    ((SmartMessageConverter) converter).toMessage(payload, messageHeaders, conversionHint) :
                    converter.toMessage(payload, messageHeaders));

  // 여차저차 후처리 ...
  return message;
}

QueueMessagingTemplate (spring-cloud-aws-messaging)

// 최종적으로 호출되는 receiveAndConvert 
@Override
public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass)
        throws MessagingException {
  Message<?> message = destination.receive();
  if (message != null) {
    /** 바로 여기서 MessageConverter가 사용된다 **/
    return (T) getMessageConverter().fromMessage(message, targetClass);
  }
  else {
    return null;
  }
}

(아래는 참고용 계층구조)

MessageConveter의 기본 구현체는 SimpleMessageConverter라고 공식 document에 적혀있었는데 실제로 들어가 있는 MessageConverter는 CompositeMessageConverter 였습니다. (CompositeMessageConverter 내부에는 StringMessageConverter와 MappingJackson2MessageConverter가 들어가있습니다, 이를 알게 된 이유는 아래서 테스트한 Person 객체에 @Getter를 주지 않고 돌려봤더니 CompositeMessageConverter에서 convert를 못했어~ 라는 에러가 떴기 때문입니다 ㅋㅋㅋㅋㅋ)

MessageConverter 인터페이스는 아래와 같이 생겼습니다.

public interface MessageConverter {

  /**
   * Message의 payload를 주어진 targetClass로 역직렬화 한다.
   * 지원하지 않는 media type이거나 역직렬화를 수행할 수 없으면 null을 반환한다.
   */
  @Nullable
  Object fromMessage(Message<?> message, Class<?> targetClass);

  /**
   * 주어진 payload와 headers를 가지고 있는 Message를 만든다.
   */
  @Nullable
  Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);

}

바로 테스트를 해보죠~

// MainController 메소드
@PostMapping("/message")
public void sendMessage(@RequestBody String name) {
  messageSender.sendMessage(name);
}

// SqsMessageSender 메소드
public void sendMessage(String name) {
  queueMessagingTemplate.convertAndSend("sqs-study", new Person(name, 20));
}

// Person 클래스
@Getter // Getter가 없으면 직렬화할때 MappingJackson2MessageConverter가 처리할 수 없게 된다
@ToString
@NoArgsConstructor // 기본생성자가 없으면 역직렬화할때 MappingJackson2MessageConverter가 처리할 수 없게 된다
@AllArgsConstructor
public class Person {

  private String name;
  private int age;

}

body에 lannstark를 넣고 POST /message 를 호출하니 SQS에 다음과 같은 JSON 형태로 message가 잘 존재하는 것을 확인할 수 있습니다..!! 와 누가 MessageConverter에 CompositeMessageConverter를 넣어준건지?!! 대박... (확인해보니 QueueMessagingTemplate가 생성될때 Jackson 모듈이 있는지 확인해서 넣어줍니다 ㅎㅎ)

이제 진짜로 메시지를 받아보겠습니다.

메시지를 받는 방법은 2가지가 존재하는데, 우선은 (덜 쓰일법한) 첫 번째 방법부터 알아보겠습니다. 두 번째 방법은 다른 posting에서 다루고 있습니다.

// MainController 메소드 추가
@GetMapping("/message")
public void getMessage() {
  messageService.getMessage();
}

// SqsMessageSender에서 SqsMessageService로 이름 변경후 메소드 추가
public void getMessage() {
  Person person = queueMessagingTemplate.receiveAndConvert("sqs-study", Person.class);
  System.out.println("SQS 로부터 받은 메시지 : " + person);
}

출력결과 :

이때 주의할점!! receiveAndConvert는 Message 처리의 성공/실패 여부와 무관하게 자동으로 Queue에 delete 요청을 날리게 됩니다. 또한 timeout 값에는 항상 0이 고정되어 있어 long polling 대신 short polling을 사용하게 됩니다. short polling을 사용하면, SQS 호출수가 늘어나 요금이 더 많이 청구될 수 있습니다.

아래는 receiveAndConvert를 호출할 경우에 최종 사용되는 코드입니다.

@Override
public Message<String> receive(long timeout) {
  ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(new ReceiveMessageRequest(this.queueUrl)
      .withMaxNumberOfMessages(1)
      .withWaitTimeSeconds(Long.valueOf(timeout).intValue())
      .withAttributeNames(ATTRIBUTE_NAMES)
      .withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
  if (receiveMessageResult.getMessages().isEmpty()) {
    return null;
  }
  com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
  Message<String> message = createMessage(amazonMessage);

  // 바로 여기서, 가져온 메시지를 바로 지워버린다. 때문에 Message를 받아와 처리가 되지 않았더라도 Queue에서는 찾을 수 없다.
  this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
  return message;
}

음.. 위에서 덜 쓰일법한 이라고 했는데요, 가능한 사용하지 않는 편이 좋을 것 같습니다 ㅎㅎㅎ

메시지를 받아오는 두 번째 방법은 다음 포스팅에서 다루고 있습니다!

 

전체 시리즈 보러가기 : lannstark.tistory.com/88

 

AWS SQS 들이파기

안녕하세요~~ 여러분~! 공부하는개발자 입니다 ㅎㅎㅎ 이번 시간에는 바로바로~ SQS에 대해서 알아보려고 합니다. SQS란 무엇인지, Java + Spring으로 어떻게 사용할 수 있는지, 사용시 주의할 점 등은

lannstark.tistory.com