2016년 12월 7일 수요일

NodeJS로 SQS 다루기

지난 포스팅 Amazon Simple Queue Service (SQS)에서는 AWS 콘솔에서 SQS의 큐를 생성하고 메시지를 보내고 받는 방법에 대해 다뤘다. 이번 포스팅에서는 NodeJS에서 SQS를 다루는 방법에 대해 알아본다.

SQS의 기본 흐름은 다음과 같다. 이 흐름에 따라 코드를 작성할 예정이다.

프로젝트 저장소

https://github.com/icelancer/sqs-example/

환경 변수

SQS에 메시지를 보낼 때 필요한 환경 변수는 다음과 같다.
'use strict';

module.exports = {
  // Amazon credentials
  aws: {
    region: process.env.AWS_REGION || 'ap-northeast-2',
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_KEY
  },

  sqs: {
    apiVersion: '2012-11-05',
    queueUrl: process.env.SQS_URL
  }
};
실질적으로 필요한 값은 AWS_ACCESS_KEY_ID, AWS_SECRET_KEY, SQS_URL 이다.
SQS_URL은 aws 콘솔에서 확인 가능하다.




Sender

메시지를 보내는 코드는 다음과 같다.
'use strict';

const aws = require('aws-sdk');
const env = require('./environment');

const QUEUE_URL = env.sqs.queueUrl;

// AWS Configuration
aws.config.update(env.aws);

// SQS 객체 생성
const sqs = new aws.SQS(env.sqs.apiVersion);

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MessageBody: 'Hello',
  DelaySeconds: 0,
};

sqs.sendMessage(PARAMS).promise()
  .then(() => { console.log('Message 전송 성공'); })
  .catch(error => { console.error(error); });

sendMessage의 파라메터는 PARAMS에 정의해 놓았는데 QueueUrl과 MessageBody, MessageAttributes를 제외하고는 따로 지정하지 않으면 SQS를 만들 때 입력한 설정값을 따른다.

MessageAttributes는 이 코드에서는 사용하지 않았다. MessageBody는 String만 가능한데 다양한 타입의 데이터를 보내고 싶을 때는 MessageAttributes를 사용하면 된다.
{
  MessageAttributes: {
    "City": {
      DataType: "String",
        StringValue: "Any City"
    },
    "Greeting": {
      BinaryValue: <Binary String>,
        DataType: "Binary"
    },
    "Population": {
      DataType: "Number",
        StringValue: "1250800"
    }
  }
}
개인적으로는 MessageAttributes 보다는 MessageBody를 자주 사용하는데 String이라는 제약이 있다보니 객체를 JSON.stringify 메서드를 이용해 문자열로 변환해서 전달한다.
const message = {
  value: 'Hello',  
  number: 1234
};

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MessageBody: JSON.stringify(message),
  DelaySeconds: 0,
};
그 외 파라메터에 대한 내용은 aws 공식 문서를 보면 자세하게 나와있다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#sendMessage-property

Receiver

메시지 수신은 Sender보다는 복잡한데 Receiver는 메시지를 수신한 후 큐에서 해당 메시지를 지우는 역할도 해야한다.
'use strict';

const aws = require('aws-sdk');
const _ = require('lodash');
const env = require('./environment');

const QUEUE_URL = env.sqs.queueUrl;

// AWS Configuration
aws.config.update(env.aws);

// SQS 객체 생성
const sqs = new aws.SQS(env.sqs.apiVersion);

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MaxNumberOfMessages: 10
};

/**
 * SQS에서 받은 메시지를 콘솔에 출력한다.
 **/
function onReceiveMessage(messages) {
  if (_.isNil(messages.Messages) === false) {
    messages.Messages.forEach(message => {
      console.log(message.Body);
    });
  }

  return messages;
}

/**
 * SQS에서 받은 메시지를 삭제한다.
 **/
function deleteMessages(messages) {
  if (_.isNil(messages.Messages)) {
    return;
  }

  // SQS 삭제에 필요한 형식으로 변환한다.
  const entries = messages.Messages.map((msg) => {
    return {
      Id: msg.MessageId,
      ReceiptHandle: msg.ReceiptHandle,
    };
  });

  // 메시지를 삭제한다.
  return sqs.deleteMessageBatch({
    Entries: entries,
    QueueUrl: QUEUE_URL,
  }).promise();

}

sqs.receiveMessage(PARAMS).promise()
  .then(onReceiveMessage)
  .then(deleteMessages)
  .catch(error => {
    console.error(error);
  });


sqs.receiveMessage()를 호출하면 Queue에 있는 메시지를 가져온다. 파라메터로 전달한 PARAMS는 다음처럼 정의했는데 MaxNumberOfMessages는 한 번에 가져올 최대 메시지의 수이다.

const PARAMS = {
  QueueUrl: QUEUE_URL,
  MaxNumberOfMessages: 10
};

주의할 점은 MaxNumberOfMessages를 10으로 설정했다고해도 정확하게 10건을 가져오는 건 아니다. 실제 테스트해보니 Queue에 쌓인 데이터가 많을 때는 10건을 가져올 확률이 높지만 데이터가 적을 수록 10건을 다 채우지 않고 3~4건 정도만 가져올 때가 많다.
설정 가능한 값은 1~10까지이다.

/**
 * SQS에서 받은 메시지를 콘솔에 출력한다.
 **/
function onReceiveMessage(messages) {
  if (_.isNil(messages.Messages) === false) {
    messages.Messages.forEach(message => {
      console.log(message.Body);
    });
  }

  return messages;
}

onReceiverMessage에서는 Queue에서 받은 메시지를 화면에 출력하는 역할을 한다. Queue에서 받은 메시지 형식은 다음과 같다.
{
  ResponseMetadata: { RequestId: 'ab97e6a0-72af-511a-9ba3-b6609a355741' },
  Messages: [{
    Body: "Hello",
    MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", 
    ReceiptHandle: "AQEBzbVv...fqNzFw=="
  }, {
    Body: "Hello",
    MessageId: "d6790f8d-d575-4f01-bc51-40122EXAMPLE", 
    ReceiptHandle: "AQEBzbVv...fqNzFw=="
  }]
}
Queue에 데이터가 없을 때는 Messages 없이 ResponseMetadata만 반환한다. 따라서 메시지를 처리하기 전에 Messages가 있는지 먼저 확인 후 처리해야 한다.
receiveMessage에 대한 더 상세한 내용은 AWS API 문서를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#receiveMessage-property

다음으로는 메시지를 삭제해야 하는데 삭제 처리를 하지 않으면 해당 메시지는 다시 큐에 들어가게 된다.
/**
 * SQS에서 받은 메시지를 삭제한다.
 **/
function deleteMessages(messages) {
  if (_.isNil(messages.Messages)) {
    return;
  }

  // SQS 삭제에 필요한 형식으로 변환한다.
  const entries = messages.Messages.map((msg) => {
    return {
      Id: msg.MessageId,
      ReceiptHandle: msg.ReceiptHandle,
    };
  });

  // 메시지를 삭제한다.
  return sqs.deleteMessageBatch({
    Entries: entries,
    QueueUrl: QUEUE_URL,
  }).promise();

}

메시지를 단건으로 삭제할 때는 deleteMessage를 사용하지만 여러 건을 한꺼번에 지우고 싶다면 deleteMessageBatch 메서드를 호출한다. 삭제할 때는 MessageId와 ReceiptHandle 값이 필요한데 Queue에서 받은 메시지에서 이 값을 추출해서 deleteMessageBatch를 호출할 때 함께 넘겨주면 된다.
인자의 형식은 다음과 같다
{
  Entries: [{
    Id: "FirstMessage", 
    ReceiptHandle: "AQEB1mgl...Z4GuLw=="
   }, {
    Id: "SecondMessage", 
    ReceiptHandle: "AQEBLsYM...VQubAA=="
   }], 
  QueueUrl: "https://sqs.us-east-1.amazonaws.com/80398EXAMPLE/MyQueue"
 };

deleteMessageBatch에 대한 더 상세한 내용은 AWS API를 참고한다.
http://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/SQS.html#deleteMessageBatch-property

전체 소스 코드는 https://github.com/icelancer/sqs-example/에서 확인할 수 있다.

2016년 12월 4일 일요일

Amazon Simple Queue Service (SQS)

SQS(Simple Queue Service)는 AWS에서 제공하는 메시지 큐(이하 MQ) 서비스이다. SQS는 이름 그대로 간단한 큐를 제공하는 서비스로 다른 MQ 미들웨어에서 지원하는 많은 기능을 지원하지 않는다. 그럼에도 불구하고 내가 SQS를 실제로 사용하는 이유는 다음과 같다.

편리함
다른 MQ처럼 설치할 필요가 없고 AWS에서 서비스로 제공하고 있어 사용량에 따른 Scaling을 염려할 필요가 없다. MQ를 사용하는 만큼만 돈을 지불하면 되는 방식이라 초기 사용에도 큰 부담이 없다.

안정성
서버 장애에 대한 걱정이 없다. MQ를 사용할 때 High Availability를 위해 클러스터링과 같이 고려해야 할 요소가 몇 가지 있는데 이에 대한 사전 지식 없이도 사용할 수 있다.

배치 처리
다른 MQ처럼 단건으로 메시지를 생산하고 소비하는 방식이 아닌 배치로 생산하고 소비할 수 있다. 단, 배치의 크기는 최대 10건으로 제한적이다.

SQS에서 Queue 만들기


SQS 메인 화면에서 Create New Queue 버튼을 누르면 아래의 화면이 뜬다.


다양한 옵션이 있는데 이 옵션에 대한 설명은 다음과 같다.
  • Queue Name: 사용할 큐의 이름
  • Default Visibility Timeout: receiver가 메시지를 가져가면 다른 receiver가 읽을 수 없는 상태로 유지하는 시간. (최소 0초 ~ 최대 12시간)
  • Message Retention Period: 삭제되지 않은 메시지를 큐에 보관하는 기간. (최소 1분 ~ 14일)
  • Maximum Message Size: 메시지 최대 크기 (최소 1KB ~ 최대 256KB)
  • Delivery Delay: 메시지를 받은 후 Receiver에게 전달하기 전까지의 지연 시간 (최소 0초 ~ 최대 15분)
  • Receive Message Wait Time: Queue가 비어있을 때 Receiver가 queue에 메시지가 들어오기를 기다릴 시간. (최소 0초 ~ 최대 20초)
Dead Letter Queue Settings에 대해 알아보기 전에 Dead Letter Queue에 대해 먼저 알아보자.
Dead Letter Queue는 처리하지 못한 메시지가 저장되는 곳이다. 그럼 처리하지 못했다는 의미는 무슨 말일까? Receiver가 처리하겠다고 가져간 후에 Default Visibility Timeout 내에 Queue에서 삭제되지 않은 메시지를 의미한다. 이런 메시지는 Dead Letter Queue Settings의 설정 값에 따라 지정한 횟수에 다다르면 Queue에서 제거되고 Dead Letter Queue에 저장된다.
  • Use Redrive Policy: Maximum Receivers에 입력한 횟수를 넘어서면 메시지를 Dead Letter Queue로 이동시키겠다는 설정.
  • Dead Letter Queue: 큐 이름
  • Maximum Receivers: 메시지가 재시도 횟수 (최소 1회 ~ 최대 1000회)
이번 포스팅에서는 전체 흐름을 쉽게 파악하기 위해 SQS 콘솔에서 메시지를 보내고 받고 또 지워본다. NodeJS를 이용한 SQS 연동은 다음 포스팅에서 다룰 예정이다.

전체 흐름


SQS에 메시지 보내기

생성한 blog-queue를 선택하고 Queue Actions -> Send a Message를 클릭하면 다음의 팝업이 뜬다.

전송할 메시지를 입력한 후에 Send Message를 클릭하면 메시지가 전송된다.
팝업에서 "Delay delivery of this message by" 를 체크하고 메시지를 보내면 입력한 시간만큼의 딜레이 후에 Receiver가 메시지를 가져갈 수 있다.

메시지를 보내면 다음의 그림처럼 메시지 1개가 있다고 표시된다. 


큐 목록에 Messages AvailableMessages in Flight 이 있는데 각각의 의미는 다음과 같다.
  • Messages Available: Receiver가 가져갈 수 있는 메시지의 수
  • Messages in Flight: 이미 다른 Receiver가 가져간 메시지의 수
Messages in Flight에 표시되는 메시지는 Queue를 만들 때 설정했던 Default Visibility Timeout 시간이 지나면 Messages Available에 카운팅된다.

SQS에서 메시지 조회


View/Delete Messages를 클릭하면 다이얼로그가 뜨는데 "Start Polling For Messages" 버튼을 누르면 다음처럼 전송한 메시지가 나타난다.


이렇게 메시지를 가져오면 Messages in Flight에는 카운팅이 올라간다.


SQS에서 메시지 삭제하기

SQS에서는 처리가 완료되었으면 해당 메시지를 직접 삭제해야 한다.
그림처럼 메시지를 조회 후 체크를 한 후 오른쪽 하단의 "Delete 1 Message"를 누르면 된다.

Message Queue는 왜 사용해야 하는가?

일반적인 서버-클라이언트 구조에서는 사용자가 요청을 하면 서버는 그에 대한 처리를 한 후 사용자에게 응답을 한다. 간단한 서버 구조에서는 굳이 Message Queue(이하 MQ)를 사용할 필요가 없다. 우선 MQ를 적용하려면 RabbitMQ, Kafka, ActiveMQ등 다양한 MQ 중에서 시스템 목적에 맞는 MQ를 선정해야 하고 또 서버에 MQ를 설치해야 한다. 설치로 끝이 아니라 그 사용 방법 및 라이브러리 사용법도 익혀야 하며 MQ가 지원하는 다양한 옵션 중에 시스템 목적에 맞는 옵션을 찾아 설정해야 하고 주고 받을 메시지 구조도 정의해야 할 뿐만 아니라 다양한 전달 방식 중 시스템 목적에 맞는 방식을 선정해야 한다. 단순히 서버에서 처리하면 이런 불편함 없이 간단하게 해결할 수 있는데 왜 MQ를 사용해야 할까?

애플리케이션/시스템 간의 통신

서버 간에 데이터를 주고 받거나 어떤 작업을 요청을 할 때는 항상 시스템 장애를 염두에 두어야 한다. 서버가 갑자기 죽거나 서버 점검 등으로 다운타임이 발생하는 동안에는 요청을 보낼 수가 없다. 요청하는 서버에서 failover 처리를 해놓고 연계 시스템이 다시 살아났을 때 요청을 보내는 방법도 있지만 MQ를 이용하면 더욱 간편하게 처리할 수 있다.
출처: rabbitmq.com
P는 C에 직접 요청하는 것이 아닌 MQ에 전달한다. 그럼 C는 MQ로 부터 요청 데이터를 수신해서 처리한다. 만약 C가 요청을 받을 수 없을 수 없는 상황이라면 해당 요청은 C가 받을 때까지 MQ에 머무르게 된다.
물론 이런 상황에서 MQ에 다운타임이 발생하면 무용지물이 되어버리겠지만, 많은 MQ가 고가용성을 위해 클러스터링 등을 지원한다.

서버 부하가 많은 작업

이미지 처리, 비디오 인코딩, 대용량 데이터 처리와 같은 작업은 메모리와 CPU를 많이 사용한다. 이러한 작업은 동시에 처리할 수 있는 양이 상당히 한정적이어서 필요하다고 무작정 요청을 처리할 수는 없다. 이 때에도 MQ를 사용하면 편리한데 처리해야할 작업을 MQ에 넣어두고 서버는 자신이 동시에 처리할 수 있는 양에 따라 하나의 작업이 끝나면 다음에 처리할 작업을 MQ에서 가져와 처리하면 된다.

부하분산

MQ를 통해 부하분산 처리도 가능하다. 지금까지 설명은 하나의 서버에 대해서만 설명했다.
출처: rabbitmq.com
그림처럼 여러 대의 서버가 하나의 큐를 바라보도록 구성하면 처리할 데이터가 많아져도 각 서버는 자신의 처리량에 맞게 태스크를 가져와 처리할 수 있다. 이러한 구조는  horizontal scaling에 유리하다.

데이터 손실 방지

MQ를 사용하지 않는다면 외부에서 받은 요청을 메모리에 저장했다가 들어온 순서대로 처리하게 할 수도 있다. 하지만 어떠한 이유로 서버가 다운되어 버리면 메모리에 쌓아둔 요청은 모두 없어지고 만다. MQ를 사용하면 이를 방지할 수 있는데 MQ로부터 가져온 태스크를 일정 시간이 지나도록 처리했다고 다시 MQ에 알려주지 않으면 MQ는 이 태스크를 다시 큐에 넣어 다시 처리할 수 있도록 한다.


MQ를 사용할 때 얻을 수 있는 잇점은 많지만 적재적소에 사용해야 한다. 요청 결과를 즉시 응답해야할 때에는 MQ는 어울리지 않는다. 주로 요청과는 별개로 처리할 수 있는 비동기 처리에 어울린다. 또한 서버에서 간단하게 처리할 수 있는 일을 MQ를 통해 전달하면 필요없는 오버헤드될 수도 있다.