모든 코드는 github에 올라가 있습니다.
이제 메시지 보내는 것은 성공했으니 보낸 메시지를 받아보려고 합니다.
그 전에 MessageConverter
에 대해서 간단히 알아 봅시다 ㅎㅎ
QueueMessagingTemplate
은 단순한 String을 주고받는 메소드 외에, Java Object를 받는 다양한 전송 메소드들이 존재합니다. 그 중에 convertAndSend()
와 receiveAndConvert()
는 MessageConveter
인터페이스에게 직렬화/역직렬화 책임을 위임하고 있습니다.
실제로 코드를 살펴보면 아래와 같습니다.
AbstractMessageSendingTemplate (spring-messaging)
@Override
public <T> void convertAndSend(String destination, T payload) throws MessagingException {
D channel = resolveMessageChannelByLogicalName(destinationName);
convertAndSend(channel, payload); // 최종적으로 convertAndSend(channel, payload, null, null) 호출
}
@Override
public void convertAndSend(D destination, Object payload, @Nullable Map<String, Object> headers,
@Nullable MessagePostProcessor postProcessor) throws MessagingException {
Message<?> message = doConvert(payload, headers, postProcessor);
send(destination, message);
}
protected Message<?> doConvert(Object payload, @Nullable Map<String, Object> headers,
@Nullable MessagePostProcessor postProcessor) {
// 여차저차 전처리 ...
MessageConverter converter = getMessageConverter();
/** 바로 여기서 MessageConverter가 사용된다. **/
Message<?> message = (converter instanceof SmartMessageConverter ?
((SmartMessageConverter) converter).toMessage(payload, messageHeaders, conversionHint) :
converter.toMessage(payload, messageHeaders));
// 여차저차 후처리 ...
return message;
}
QueueMessagingTemplate (spring-cloud-aws-messaging)
// 최종적으로 호출되는 receiveAndConvert
@Override
public <T> T receiveAndConvert(QueueMessageChannel destination, Class<T> targetClass)
throws MessagingException {
Message<?> message = destination.receive();
if (message != null) {
/** 바로 여기서 MessageConverter가 사용된다 **/
return (T) getMessageConverter().fromMessage(message, targetClass);
}
else {
return null;
}
}
(아래는 참고용 계층구조)
MessageConveter
의 기본 구현체는 SimpleMessageConverter
라고 공식 document에 적혀있었는데 실제로 들어가 있는 MessageConverter는 CompositeMessageConverter 였습니다. (CompositeMessageConverter 내부에는 StringMessageConverter와 MappingJackson2MessageConverter가 들어가있습니다, 이를 알게 된 이유는 아래서 테스트한 Person 객체에 @Getter
를 주지 않고 돌려봤더니 CompositeMessageConverter에서 convert를 못했어~ 라는 에러가 떴기 때문입니다 ㅋㅋㅋㅋㅋ)
MessageConverter 인터페이스는 아래와 같이 생겼습니다.
public interface MessageConverter {
/**
* Message의 payload를 주어진 targetClass로 역직렬화 한다.
* 지원하지 않는 media type이거나 역직렬화를 수행할 수 없으면 null을 반환한다.
*/
@Nullable
Object fromMessage(Message<?> message, Class<?> targetClass);
/**
* 주어진 payload와 headers를 가지고 있는 Message를 만든다.
*/
@Nullable
Message<?> toMessage(Object payload, @Nullable MessageHeaders headers);
}
바로 테스트를 해보죠~
// MainController 메소드
@PostMapping("/message")
public void sendMessage(@RequestBody String name) {
messageSender.sendMessage(name);
}
// SqsMessageSender 메소드
public void sendMessage(String name) {
queueMessagingTemplate.convertAndSend("sqs-study", new Person(name, 20));
}
// Person 클래스
@Getter // Getter가 없으면 직렬화할때 MappingJackson2MessageConverter가 처리할 수 없게 된다
@ToString
@NoArgsConstructor // 기본생성자가 없으면 역직렬화할때 MappingJackson2MessageConverter가 처리할 수 없게 된다
@AllArgsConstructor
public class Person {
private String name;
private int age;
}
body에 lannstark를 넣고 POST /message
를 호출하니 SQS에 다음과 같은 JSON 형태로 message가 잘 존재하는 것을 확인할 수 있습니다..!! 와 누가 MessageConverter에 CompositeMessageConverter를 넣어준건지?!! 대박... (확인해보니 QueueMessagingTemplate가 생성될때 Jackson 모듈이 있는지 확인해서 넣어줍니다 ㅎㅎ)
이제 진짜로 메시지를 받아보겠습니다.
메시지를 받는 방법은 2가지가 존재하는데, 우선은 (덜 쓰일법한) 첫 번째 방법부터 알아보겠습니다. 두 번째 방법은 다른 posting에서 다루고 있습니다.
// MainController 메소드 추가
@GetMapping("/message")
public void getMessage() {
messageService.getMessage();
}
// SqsMessageSender에서 SqsMessageService로 이름 변경후 메소드 추가
public void getMessage() {
Person person = queueMessagingTemplate.receiveAndConvert("sqs-study", Person.class);
System.out.println("SQS 로부터 받은 메시지 : " + person);
}
출력결과 :
이때 주의할점!! receiveAndConvert
는 Message 처리의 성공/실패 여부와 무관하게 자동으로 Queue에 delete 요청을 날리게 됩니다. 또한 timeout 값에는 항상 0이 고정되어 있어 long polling 대신 short polling을 사용하게 됩니다. short polling을 사용하면, SQS 호출수가 늘어나 요금이 더 많이 청구될 수 있습니다.
아래는 receiveAndConvert
를 호출할 경우에 최종 사용되는 코드입니다.
@Override
public Message<String> receive(long timeout) {
ReceiveMessageResult receiveMessageResult = this.amazonSqs.receiveMessage(new ReceiveMessageRequest(this.queueUrl)
.withMaxNumberOfMessages(1)
.withWaitTimeSeconds(Long.valueOf(timeout).intValue())
.withAttributeNames(ATTRIBUTE_NAMES)
.withMessageAttributeNames(MESSAGE_ATTRIBUTE_NAMES));
if (receiveMessageResult.getMessages().isEmpty()) {
return null;
}
com.amazonaws.services.sqs.model.Message amazonMessage = receiveMessageResult.getMessages().get(0);
Message<String> message = createMessage(amazonMessage);
// 바로 여기서, 가져온 메시지를 바로 지워버린다. 때문에 Message를 받아와 처리가 되지 않았더라도 Queue에서는 찾을 수 없다.
this.amazonSqs.deleteMessage(new DeleteMessageRequest(this.queueUrl, amazonMessage.getReceiptHandle()));
return message;
}
음.. 위에서 덜 쓰일법한 이라고 했는데요, 가능한 사용하지 않는 편이 좋을 것 같습니다 ㅎㅎㅎ
메시지를 받아오는 두 번째 방법은 다음 포스팅에서 다루고 있습니다!
전체 시리즈 보러가기 : lannstark.tistory.com/88
'개발 공부 기록하기 > 03. AWS & Infra' 카테고리의 다른 글
[AWS SQS 들이파기] DLQ(Dead-Letter Queue) 설정하기 (0) | 2020.09.23 |
---|---|
[AWS SQS 들이파기] SQS에서 메시지 받기 II (Java + Gradle + Spring) (1) | 2020.09.23 |
[AWS SQS 들이파기] AWS SQS로 메시지 보내기 (Java + Gradle + Spring) (2) | 2020.09.23 |
[AWS SQS 들이파기] SQS 만들어보기 (0) | 2020.09.23 |
AWS Access Key 발급받고 사용하기 (1) | 2020.09.17 |