Post

Bull 메시지 큐를 활용한 요청/응답 메시지 패턴 구현

Bull 메시지 큐를 활용한 요청/응답 메시지 패턴 구현

Bull 메시지 큐를 활용한 요청/응답 메시지 패턴 구현

상관 식별자(Correlation Identifier) 개념과 Bull 메시지 큐를 활용해 단방향 채널 위에 요청/응답 패턴을 구현합니다.

1. 들어가며

보통 메세징 시스템에서 생산자와 작업자 사이에 요청/응답 통신이 반드시 필요한 것은 아니며,
단방향 비동기 통신 파이프 라인 구조를 통해 병렬 처리 및 확장을 수행합니다.

그러나 쿼리 처리는 처리 후 결과를 응답 받아야 합니다.
각 쿼리 요청을 태스크로 정의하고 ID 를 부여해 Bull 메시지 큐에 적재합니다.
메시지 큐의 태스크는 순서대로 실행된 후 요청 전송자(클라이언트)가 ID 로 두 메시지를 상호 연결하고 응답을 적절한 핸들러에 반환해 처리합니다.

▲ 상관 식별자를 사용한 요청/응답 메시지 교환

2. Bull 메시지 큐

Bull 메시지 큐는 redis 기반의 큐 시스템을 구현한 노드 라이브러리 입니다.
요청 피크를 완화할 때,
마이크로서비스 간에 통신채널을 생성할 때,
어플리케이션에서 무거운 작업을 많은 작업자에 나누어 오프로드 할 때 등
다양한 문제를 분할 정복 접근 방식으로 해결할 때 사용됩니다.

Bull 메시지 큐는 Job 이라 불리는 태스크 단위를 사용합니다.
Job 은 메시지 큐에 추가되고 처리될 때 까지 아래 라이프 사이클을 가집니다.

▲ Job 라이프 사이클

3. 각 요소들

패턴을 구현하는데 사용된 각 요소들을 설명합니다.

3-1. Job

각 쿼리 요청을 메시지 큐에 넣기 위해 Job 으로 추상화 합니다.
Job 의 id 는 클라이언트가 각 요청에 대한 응답을 식별을 위한 구분자로 주로 UUID 를 사용합니다.
그리고 응답을 처리할 핸들러를 callback 에 저장합니다.
getParam() 함수는 Job 을 생성할 때 사용합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// job.js

class Job {
  constructor(id, info) {
    this.id;
    this.callback;

    // Job 을 수행하는데 필요한 정보들을 클라이언트에게 받아옵니다.
    this.dbname = info.dbname;
    this.sql = info.sql;
    this.value = info.value;
    this.timeout = info.timeout;
    ...
  }

  getParam() {
    return {
      // Job 을 수행하는데 필요한 정보들
      dbname = this.dbname;
      sql = this.sql;
      value = this.value;
      timeout = this.timeout;
      ...
    }
  }
}

module.exports = Job;

3-2. JobManager

Job 매니저는 EventEmitter 를 상속 받습니다.
init 함수는 Bull 메시지 큐 인스턴스를 생성합니다. 그리고 큐에 저장된 메시지(Job) 를 처리할 방법을 this.queue.process(…) 에 정의합니다.

메시지 큐에서 메시지를 받아올 때 마다 this.queue.process 콜백 함수가 실행됩니다.
이 때 인자로 전달된 Job 에는 숫자나 문자열 데이터만 저장되어 있습니다.
job 인스턴스에서 결과를 처리할 핸들러를 바로 받아 실행할 수 있지만,
메시지에 숫자나 문자만 저장되는 Bull 메시지 큐 특성으로 인해 이벤트 전달 방식으로 this.emit(…) 를 사용해 결과를 전달합니다.

addJob 함수는 Job 을 메시지 큐에 저장합니다.
remvoeOnComplete 옵션으로 메시지 처리 완료 후 큐에서 Job 을 제거할 것인지 선택할 수 있고
timeout 옵션으로 메시지의 타임아웃을 설정할 수 있습니다.

this.queue.add(…) 로 메세지를 큐에 넣은 후 메시지 실행 결과를 전달받을 이벤트 핸들러를 등록합니다.
이벤트 핸들러는 한 번만 호출되면 되기 때문에 this.once(…) 로 등록합니다.
이 이벤트 핸들러에서 태스크 핸들러를 호출합니다.
요청과 응답 메세지를 연결하기 위해 Job 의 id 를 이벤트 명으로 설정합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// JobManager.js

const EventEmitter = require('events');
const Queue = require('bull');
const { doQuery } = require('./query');

class JobManager extends EventEmitter {
  constructor(queueName) {
    super();
    this.queue = null;
    this.queueName = queueName;
  }

  init() {
    if (!this.queue) {
      this.queue = new Queue(this.queueName, `redis://127.0.0.1:6479`, { prefix: `job_` });

      this.queue.process(async (job, done) => {
        done(await doQuery(job.data));
      });

      this.queue.on('completed', (job, result) => {
        this.emit(job.data.id, result);
      });

      this.queue.on('failed', job => {
        this.emit(job.data.id, new Error('Query failed'));
      });
    }
  }

  addJob(job) {
    const message = {
      id: job.id,
      ...job.getParam()
    }

    this.queue.add(message, {
      removeOnComplete: true,
      timeout: 5 * 1000
    }).then(() => {
      this.once(job.id, data => job.callback(data));
    });

    close() {
      return new Promise(async resolve => {
        if (this.queue) {
          this.queue.clean(10)
            .then(() => this.queue.close(true)
              .then(() => resolve(true)));
        }
      })
    }
  }
}

module.exports = JobManager;

3-3. 클라이언트

Job 매니저 인스턴스를 생성하고 메세지 큐에 사용자가 요청한 Job 을 추가합니다.
각 Job 은 고유한 ID 로 UUID 값을 가집니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// client.js
function genUUID() {
  const gen = () => ((1 + Math.random()) * 0x10000 | 0).toString(16).subString(1);
  return `${gen() + gen()}-${gen()}-${gen()}-${gen()}-${gen() + gen() + gen()}`;
}

const JobManager = new JobManager('queryManager');
JobManager.init();

for (const d of list) {
  jobManager.addJob(
    new job(genUUID()), 
    {
      dbname: 'MySql',
      sql: 'INSERT INTO board VALUES(?,?,...)'
      value: `[[${d.line}, ${d.date}...], [${d.line}, ${d.date}...]...]`
      timeout: 5000
      ...
    });
}

4. 동작 과정

1
2
3
4
5
6
7
8
9
10
11
12
13
// client.js
...
for (const d of list) {
  jobManager.addJob(                               // (1)
    new job(genUUID()), 
    {
      dbname: 'MySql',
      sql: 'INSERT INTO board VALUES(?,?,...)'
      value: `[[${d.line}, ${d.date}...], [${d.line}, ${d.date}...]...]`
      timeout: 5000
      ...
    });
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// JobManager.js

const EventEmitter = require('events');
const Queue = require('bull');
const { doQuery } = require('./query');

class JobManager extends EventEmitter {
  constructor(queueName) {
    super();
    this.queue = null;
    this.queueName = queueName;
  }

  init() {
    if (!this.queue) {
      this.queue = new Queue(this.queueName, `redis://127.0.0.1:6479`, { prefix: `job_` });

      this.queue.process(async (job, done) => {           // (4)
        done(await doQuery(job.data));    
      });

      this.queue.on('completed', (job, result) => {       // (5-1)
        this.emit(job.data.id, result);
      });

      this.queue.on('failed', job => {                    // (5-2)
        this.emit(job.data.id, new Error('Query failed'));
      });
    }
  }

  addJob(job) {
    const message = {
      id: job.id,
      ...job.getParam()
    }

    this.queue.add(message, {                                 // (2)
      removeOnComplete: true,
      timeout: 5 * 1000
    }).then(() => {
      this.once(job.id, data => job.callback(data));        // (3)
    });

    ...
  }
}

(1) client.js 에서 Job 을 추가합니다.
(2) Job 을 큐에 추가합니다.
(3) 큐에 추가한 후 Job ID 를 이벤트 명으로 하는 이벤트 핸들러를 등록합니다. 이벤트 핸들러는 Job 실행 결과를 처리할 핸들러를 호출합니다.
(4) Job 이 자신의 순서가 되었을 때 처리되어 쿼리를 수행합니다. 수행 결과를 done() 함수의 인자로 전달해 ‘completed’ 이벤트에서 받을 수 있도록 합니다.
(5-1) Job 의 process 가 완료되면 ‘completed’ 이벤트 핸들러가 호출 됩니다. 핸들러에서 Job 이 처리된 결과를 Job ID 이벤트로 쿼리 처리 결과를 전달합니다. 이 때 (3) 에 등록한 이벤트 핸들러가 호출되고 Job 실행 결과를 콜백 함수로 처리합니다.
(5-2) Job 의 process 가 실패하면 ‘failed’ 이벤트 핸들러가 호출 됩니다. 핸들러에서 에러를 만들어 Job ID 이벤트로 전달합니다. 이 때 (3) 에 등록한 이벤트 핸들러가 호출되어 콜백 함수에서 에러를 핸들링 합니다.

This post is licensed under CC BY 4.0 by the author.