[eggjs/egg]Egg中如何消费RabbitMQ队列

2024-07-22 380 views
4

首先原谅我是一个Egg使用新手,业务需求是需要监听多个RabbitMQ队列,将消息存入数据库 我的使用方式:在app.js中这样使用

class AppBootHook {
  constructor(app) {
    this.app = app;
  }
  async didReady() {
    // Server is listening.
    const ctx = await this.app.createAnonymousContext();
    await ctx.service.amqp.amqplib.consumerService('test1', 'toDb');
    await ctx.service.amqp.amqplib.consumerService('test2', 'toDb');
  }
}
module.exports = AppBootHook;

在service文件夹中的amqp文件中是这样的

const amqplib = require('amqplib');

class ConsumerService extends Service {
async consumerService(queueName, channel) {
    const { ctx } = this;
    var open = amqplib.connect('amqp://127.0.0.1:5672');
    open.then(function(conn) {
      return conn.createChannel();
    }).then(function(ch) {
      return ch.assertQueue(queueName).then(function(ok) {
        return ch.consume(queueName, async function(msg) {
          if (msg !== null) {
            var message = JSON.parse(msg.content.toString());
            if(channel == 'toDb'){
              // console.log(message)
              var type = message.type;
              switch(type){
                case 'email':
                  await ctx.service.amqp.mqDb.addEmailService(message.data);
                  // 上面的方法中有一堆数据库的操作
                  break;
              }
            }
            ch.ack(msg);
          }
        });
      });
    }).catch(console.warn);
  }
}

我的问题是:

  1. 我使用的方法可以实现队列消息的消费,但是不知道是不是完美的,或者您有什么指导意见
  2. 上面的消费队列的方法有一个问题,当队列中有几十条消息或者更多,队列消息不会等上一条消息存到数据库之后,再去消费下一条消息,会多条消息同时操作数据库(同时操作数据库造成数据写入更新混乱)
  3. 使用的是 amqplib 插件,是否是插件使用有问题

希望可以得到解答,谢谢!

回答

4

找到一种解决方法,再说明该方法之前,再说一下项目中的应用场景:

  1. 监听消息队列,消费消息
  2. 拿到消息进行数据库操作(中间有更新、查找、插入等),因为前面的消息跟后面的消息可能会操作数据库中同一条记录,所以必须保证消息的顺序消费

目前使用的是给消费消息的方法加一个锁,我使用的是 async-lock 插件,目前可以实现上面说到的应用场景,但是出现新的问题:消息队列中的消息很快就都会进入到没有ack(但是对消息的消费仍然是顺序的),目前没有找到好的解决方法,如果您有什么建议,欢迎指导! image 下面是代码:(消费队列的代码与上面问题中的代码只是书写形式不同,使用了await和async)

// 1. 创建链接对象
    const connection = await amqplib.connect('amqp://vankle:vankle123@47.254.94.232:5672');
    // 2. 获取通道
    const ch = await connection.createChannel();
    // 3. 声明参数
    // 4. 声明队列,交换机默认为 AMQP default
    await ch.assertQueue(queueName);
    var num = 0;
    await ch.consume(queueName, async (msg) => {
      var message = JSON.parse(msg.content.toString());

      lock.acquire('key', async (done) => {
        console.log('上锁了...')
        await ctx.service.test.echo(message);
        ch.ack(msg);
        console.log('锁打开了...')
        done();
      }, function(err, ret) {
      }, {});
    });

image

6

唉,无语... 刚才跑了一下,消息少的时候还是可以的,消息如果特别多(刚才我这里有待消费的3000多条),程序运行过程中卡住了,也不知道什么原因... 希望哪个大佬看到这个问题解答一下,不胜感激!

7

最近刚好也在学rabbitmq使用,提一下我个人的想法。 1、看了一下,你这里有两个队列操作数据库(test1和test2)?个人觉得要保证顺序执行更新数据库操作的话,这里要将操作相同数据库的操作放在一个队列里面,否则的话分成两个队列很难保证执行的顺序性(将操作同样数据库的放进队列,由于队列是先进先出的特点,所以执行的操作也会是顺序性的)。 2、Unacked问题:我这边模拟了一下,应该是你的队列没有设置prefetch的问题,prefetch属性用于控制每次发给消费者的消息数目最大值,如果没设置的话会将队列中的消息一次性发给消费者(这里不确定是不是全部,肯定不止一条就是了),消费者将所有消息消费之后才会返回ack,我这边设置了ch.prefetch(1)之后打断点调试可以看到每执行一次消费Unacked数就会减一,如果不设置需要等所有消息消费完了Unacked才会归零。 3、程序卡住问题:这个感觉可能要结合场景分析了,我这边曾遇到或思考过的场景是:a、检查sql操作是否会锁表,比如update之类的。b、操作数据库的sql语句有性能问题,频繁调用导致io阻塞了。总之这个的话需要结合系统资源占用情况还有具体的代码才好排查。我自己也只是半桶水(以前只遇到过性能很差的sql查询大量数据,导致io负载高网站卡住的情况),如果找到问题了可以一起分享哈 4、我也是刚学的,上面的只是我的一些看法,并不一定是对的,如果有大神看到有问题的地方欢迎指教,共同学习~

9

@dark-tone 谢谢你的回复

  1. 操作两个数据库是对不同类型的消息进行了区分(不同类型消息之前不会操作统一个数据库)
  2. 这个我需要检查一下吧,队列也不是我维护的?
  3. 操作数据库执行很多更新插入操作,对程序跟踪排错的能力还需要再提高一下? 另外,我总觉得我的队列监控方法写到app.js文件里面不规范,但是又不知道该写到那里,您有没有什么看法?
6

@TheShineJames @dark-tone @thonatos @huacnlee 你可以监听到队列的消息变化,或者说队列中有消息推送到egg客户端嘛?为什么我通过定时任务去获取消息队列中的消息,每次只能获取到一条

1

这个跟egg没关系,直接看amqplib的文档就行http://www.squaremobius.net/amqp.node/,文档很详细。 你可以封装成插件:https://www.npmjs.com/package/@eggplugin/rabbitmq , channel创建之后复用channel。

4

每次取get肯定要是一个,如果想取多个可以多次get。 最好不要主动去取,比如队列为空了,你需要主加定时器循环get查看队列是否有新数据加入,consume是最合适的方式,如果有就通知你,不需要定时器,prefetch设置你每次期望收到的最大数量

9

@TheShineJames 最后解决了吗?怎么使用的

4

没有解决,公司之前用的 Java 写的,想改 Node,最后还是跑的原来的代码,Egg 代码算是白写

1

@Cikayo 白写,我靠

8

可以设置 prefetch 为1 当只有有一条正在被消费的时候,会等待这一条确认之后 才会推送新的消息过来。就能实现LZ的问题了