2016년 12월 7일 수요일

NodeJS로 SQS 다루기

지난 포스팅 Amazon Simple Queue Service (SQS)에서는 AWS 콘솔에서 SQS의 큐를 생성하고 메시지를 보내고 받는 방법에 대해 다뤘다. 이번 포스팅에서는 NodeJS에서 SQS를 다루는 방법에 대해 알아본다.

SQS의 기본 흐름은 다음과 같다. 이 흐름에 따라 코드를 작성할 예정이다.

프로젝트 저장소

https://github.com/icelancer/sqs-example/

환경 변수

SQS에 메시지를 보낼 때 필요한 환경 변수는 다음과 같다.
'use strict';

module.exports = {
  // Amazon credentials
  aws: {
    region: process.env.AWS_REGION || 'ap-northeast-2',
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_KEY
  },

  sqs: {
    apiVersion: '2012-11-05',
    queueUrl: process.env.SQS_URL
  }
};
실질적으로 필요한 값은 AWS_ACCESS_KEY_ID, AWS_SECRET_KEY, SQS_URL 이다.
SQS_URL은 aws 콘솔에서 확인 가능하다.




Sender

메시지를 보내는 코드는 다음과 같다.
'use strict';

const aws = require('aws-sdk');
const env = require('./environment');

const QUEUE_URL = env.sqs.queueUrl;

// AWS Configuration
aws.config.update(env.aws);

// SQS 객체 생성
const sqs = new aws.SQS(env.sqs.apiVersion);

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MessageBody: 'Hello',
  DelaySeconds: 0,
};

sqs.sendMessage(PARAMS).promise()
  .then(() => { console.log('Message 전송 성공'); })
  .catch(error => { console.error(error); });

sendMessage의 파라메터는 PARAMS에 정의해 놓았는데 QueueUrl과 MessageBody, MessageAttributes를 제외하고는 따로 지정하지 않으면 SQS를 만들 때 입력한 설정값을 따른다.

MessageAttributes는 이 코드에서는 사용하지 않았다. MessageBody는 String만 가능한데 다양한 타입의 데이터를 보내고 싶을 때는 MessageAttributes를 사용하면 된다.
{
  MessageAttributes: {
    "City": {
      DataType: "String",
        StringValue: "Any City"
    },
    "Greeting": {
      BinaryValue: <Binary String>,
        DataType: "Binary"
    },
    "Population": {
      DataType: "Number",
        StringValue: "1250800"
    }
  }
}
개인적으로는 MessageAttributes 보다는 MessageBody를 자주 사용하는데 String이라는 제약이 있다보니 객체를 JSON.stringify 메서드를 이용해 문자열로 변환해서 전달한다.
const message = {
  value: 'Hello',  
  number: 1234
};

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MessageBody: JSON.stringify(message),
  DelaySeconds: 0,
};
그 외 파라메터에 대한 내용은 aws 공식 문서를 보면 자세하게 나와있다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessage-property

Receiver

메시지 수신은 Sender보다는 복잡한데 Receiver는 메시지를 수신한 후 큐에서 해당 메시지를 지우는 역할도 해야한다.
'use strict';

const aws = require('aws-sdk');
const _ = require('lodash');
const env = require('./environment');

const QUEUE_URL = env.sqs.queueUrl;

// AWS Configuration
aws.config.update(env.aws);

// SQS 객체 생성
const sqs = new aws.SQS(env.sqs.apiVersion);

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MaxNumberOfMessages: 10
};

/**
 * SQS에서 받은 메시지를 콘솔에 출력한다.
 **/
function onReceiveMessage(messages) {
  if (_.isNil(messages.Messages) === false) {
    messages.Messages.forEach(message => {
      console.log(message.Body);
    });
  }

  return messages;
}

/**
 * SQS에서 받은 메시지를 삭제한다.
 **/
function deleteMessages(messages) {
  if (_.isNil(messages.Messages)) {
    return;
  }

  // SQS 삭제에 필요한 형식으로 변환한다.
  const entries = messages.Messages.map((msg) => {
    return {
      Id: msg.MessageId,
      ReceiptHandle: msg.ReceiptHandle,
    };
  });

  // 메시지를 삭제한다.
  return sqs.deleteMessageBatch({
    Entries: entries,
    QueueUrl: QUEUE_URL,
  }).promise();

}

sqs.receiveMessage(PARAMS).promise()
  .then(onReceiveMessage)
  .then(deleteMessages)
  .catch(error => {
    console.error(error);
  });


sqs.receiveMessage()를 호출하면 Queue에 있는 메시지를 가져온다. 파라메터로 전달한 PARAMS는 다음처럼 정의했는데 MaxNumberOfMessages는 한 번에 가져올 최대 메시지의 수이다.

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MaxNumberOfMessages: 10
};

주의할 점은 MaxNumberOfMessages를 10으로 설정했다고해도 정확하게 10건을 가져오는 건 아니다. 실제 테스트해보니 Queue에 쌓인 데이터가 많을 때는 10건을 가져올 확률이 높지만 데이터가 적을 수록 10건을 다 채우지 않고 3~4건 정도만 가져올 때가 많다.
설정 가능한 값은 1~10까지이다.

/**
 * SQS에서 받은 메시지를 콘솔에 출력한다.
 **/
function onReceiveMessage(messages) {
  if (_.isNil(messages.Messages) === false) {
    messages.Messages.forEach(message => {
      console.log(message.Body);
    });
  }

  return messages;
}

onReceiverMessage에서는 Queue에서 받은 메시지를 화면에 출력하는 역할을 한다. Queue에서 받은 메시지 형식은 다음과 같다.
{
  ResponseMetadata: { RequestId: 'ab97e6a0-72af-511a-9ba3-b6609a355741' },
  Messages: [{
    Body: "Hello",
    MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", 
    ReceiptHandle: "AQEBzbVv...fqNzFw=="
  }, {
    Body: "Hello",
    MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", 
    ReceiptHandle: "AQEBzbVv...fqNzFw=="
  }]
}
Queue에 데이터가 없을 때는 Messages 없이 ResponseMetadata만 반환한다. 따라서 메시지를 처리하기 전에 Messages가 있는지 먼저 확인 후 처리해야 한다.
receiveMessage에 대한 더 상세한 내용은 AWS API 문서를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#receiveMessage-property

다음으로는 메시지를 삭제해야 하는데 삭제 처리를 하지 않으면 해당 메시지는 다시 큐에 들어가게 된다.
/**
 * SQS에서 받은 메시지를 삭제한다.
 **/
function deleteMessages(messages) {
  if (_.isNil(messages.Messages)) {
    return;
  }

  // SQS 삭제에 필요한 형식으로 변환한다.
  const entries = messages.Messages.map((msg) => {
    return {
      Id: msg.MessageId,
      ReceiptHandle: msg.ReceiptHandle,
    };
  });

  // 메시지를 삭제한다.
  return sqs.deleteMessageBatch({
    Entries: entries,
    QueueUrl: QUEUE_URL,
  }).promise();

}

메시지를 단건으로 삭제할 때는 deleteMessage를 사용하지만 여러 건을 한꺼번에 지우고 싶다면 deleteMessageBatch 메서드를 호출한다. 삭제할 때는 MessageId와 ReceiptHandle 값이 필요한데 Queue에서 받은 메시지에서 이 값을 추출해서 deleteMessageBatch를 호출할 때 함께 넘겨주면 된다.
인자의 형식은 다음과 같다
{
  Entries: [{
    Id: "FirstMessage", 
    ReceiptHandle: "AQEB1mgl...Z4GuLw=="
   }, {
    Id: "SecondMessage", 
    ReceiptHandle: "AQEBLsYM...VQubAA=="
   }], 
  QueueUrl: "https://sqs.us-east-1.amazonaws.com/80398EXAMPLE/MyQueue"
 };

deleteMessageBatch에 대한 더 상세한 내용은 AWS API를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#deleteMessageBatch-property

전체 소스 코드는 https://github.com/icelancer/sqs-example/에서 확인할 수 있다.

댓글 없음: