SQS의 기본 흐름은 다음과 같다. 이 흐름에 따라 코드를 작성할 예정이다.
프로젝트 저장소
https://github.com/icelancer/sqs-example/환경 변수
SQS에 메시지를 보낼 때 필요한 환경 변수는 다음과 같다.'use strict'; module.exports = { // Amazon credentials aws: { region: process.env.AWS_REGION || 'ap-northeast-2', accessKeyId: process.env.AWS_ACCESS_KEY_ID, secretAccessKey: process.env.AWS_SECRET_KEY }, sqs: { apiVersion: '2012-11-05', queueUrl: process.env.SQS_URL } };실질적으로 필요한 값은 AWS_ACCESS_KEY_ID, AWS_SECRET_KEY, SQS_URL 이다.
SQS_URL은 aws 콘솔에서 확인 가능하다.
Sender
메시지를 보내는 코드는 다음과 같다.'use strict'; const aws = require('aws-sdk'); const env = require('./environment'); const QUEUE_URL = env.sqs.queueUrl; // AWS Configuration aws.config.update(env.aws); // SQS 객체 생성 const sqs = new aws.SQS(env.sqs.apiVersion); const PARAMS = { QueueUrl: QUEUE_URL, MessageBody: 'Hello', DelaySeconds: 0, }; sqs.sendMessage(PARAMS).promise() .then(() => { console.log('Message 전송 성공'); }) .catch(error => { console.error(error); });sendMessage의 파라메터는 PARAMS에 정의해 놓았는데 QueueUrl과 MessageBody, MessageAttributes를 제외하고는 따로 지정하지 않으면 SQS를 만들 때 입력한 설정값을 따른다.
MessageAttributes는 이 코드에서는 사용하지 않았다. MessageBody는 String만 가능한데 다양한 타입의 데이터를 보내고 싶을 때는 MessageAttributes를 사용하면 된다.
{ MessageAttributes: { "City": { DataType: "String", StringValue: "Any City" }, "Greeting": { BinaryValue: <Binary String>, DataType: "Binary" }, "Population": { DataType: "Number", StringValue: "1250800" } } }개인적으로는 MessageAttributes 보다는 MessageBody를 자주 사용하는데 String이라는 제약이 있다보니 객체를 JSON.stringify 메서드를 이용해 문자열로 변환해서 전달한다.
const message = { value: 'Hello', number: 1234 }; const PARAMS = { QueueUrl: QUEUE_URL, MessageBody: JSON.stringify(message), DelaySeconds: 0, };그 외 파라메터에 대한 내용은 aws 공식 문서를 보면 자세하게 나와있다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessage-property
Receiver
메시지 수신은 Sender보다는 복잡한데 Receiver는 메시지를 수신한 후 큐에서 해당 메시지를 지우는 역할도 해야한다.'use strict'; const aws = require('aws-sdk'); const _ = require('lodash'); const env = require('./environment'); const QUEUE_URL = env.sqs.queueUrl; // AWS Configuration aws.config.update(env.aws); // SQS 객체 생성 const sqs = new aws.SQS(env.sqs.apiVersion); const PARAMS = { QueueUrl: QUEUE_URL, MaxNumberOfMessages: 10 }; /** * SQS에서 받은 메시지를 콘솔에 출력한다. **/ function onReceiveMessage(messages) { if (_.isNil(messages.Messages) === false) { messages.Messages.forEach(message => { console.log(message.Body); }); } return messages; } /** * SQS에서 받은 메시지를 삭제한다. **/ function deleteMessages(messages) { if (_.isNil(messages.Messages)) { return; } // SQS 삭제에 필요한 형식으로 변환한다. const entries = messages.Messages.map((msg) => { return { Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle, }; }); // 메시지를 삭제한다. return sqs.deleteMessageBatch({ Entries: entries, QueueUrl: QUEUE_URL, }).promise(); } sqs.receiveMessage(PARAMS).promise() .then(onReceiveMessage) .then(deleteMessages) .catch(error => { console.error(error); });
sqs.receiveMessage()를 호출하면 Queue에 있는 메시지를 가져온다. 파라메터로 전달한 PARAMS는 다음처럼 정의했는데 MaxNumberOfMessages는 한 번에 가져올 최대 메시지의 수이다.
const PARAMS = { QueueUrl: QUEUE_URL, MaxNumberOfMessages: 10 };
주의할 점은 MaxNumberOfMessages를 10으로 설정했다고해도 정확하게 10건을 가져오는 건 아니다. 실제 테스트해보니 Queue에 쌓인 데이터가 많을 때는 10건을 가져올 확률이 높지만 데이터가 적을 수록 10건을 다 채우지 않고 3~4건 정도만 가져올 때가 많다.
설정 가능한 값은 1~10까지이다.
/** * SQS에서 받은 메시지를 콘솔에 출력한다. **/ function onReceiveMessage(messages) { if (_.isNil(messages.Messages) === false) { messages.Messages.forEach(message => { console.log(message.Body); }); } return messages; }
onReceiverMessage에서는 Queue에서 받은 메시지를 화면에 출력하는 역할을 한다. Queue에서 받은 메시지 형식은 다음과 같다.
{ ResponseMetadata: { RequestId: 'ab97e6a0-72af-511a-9ba3-b6609a355741' }, Messages: [{ Body: "Hello", MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", ReceiptHandle: "AQEBzbVv...fqNzFw==" }, { Body: "Hello", MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", ReceiptHandle: "AQEBzbVv...fqNzFw==" }] }Queue에 데이터가 없을 때는 Messages 없이 ResponseMetadata만 반환한다. 따라서 메시지를 처리하기 전에 Messages가 있는지 먼저 확인 후 처리해야 한다.
receiveMessage에 대한 더 상세한 내용은 AWS API 문서를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#receiveMessage-property
다음으로는 메시지를 삭제해야 하는데 삭제 처리를 하지 않으면 해당 메시지는 다시 큐에 들어가게 된다.
/** * SQS에서 받은 메시지를 삭제한다. **/ function deleteMessages(messages) { if (_.isNil(messages.Messages)) { return; } // SQS 삭제에 필요한 형식으로 변환한다. const entries = messages.Messages.map((msg) => { return { Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle, }; }); // 메시지를 삭제한다. return sqs.deleteMessageBatch({ Entries: entries, QueueUrl: QUEUE_URL, }).promise(); }
메시지를 단건으로 삭제할 때는 deleteMessage를 사용하지만 여러 건을 한꺼번에 지우고 싶다면 deleteMessageBatch 메서드를 호출한다. 삭제할 때는 MessageId와 ReceiptHandle 값이 필요한데 Queue에서 받은 메시지에서 이 값을 추출해서 deleteMessageBatch를 호출할 때 함께 넘겨주면 된다.
인자의 형식은 다음과 같다
{ Entries: [{ Id: "FirstMessage", ReceiptHandle: "AQEB1mgl...Z4GuLw==" }, { Id: "SecondMessage", ReceiptHandle: "AQEBLsYM...VQubAA==" }], QueueUrl: "https://sqs.us-east-1.amazonaws.com/80398EXAMPLE/MyQueue" };
deleteMessageBatch에 대한 더 상세한 내용은 AWS API를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#deleteMessageBatch-property
전체 소스 코드는 https://github.com/icelancer/sqs-example/에서 확인할 수 있다.