首先原谅我是一个Egg使用新手,业务需求是需要监听多个RabbitMQ队列,将消息存入数据库 我的使用方式:在app.js中这样使用
class AppBootHook {
constructor(app) {
this.app = app;
}
async didReady() {
// Server is listening.
const ctx = await this.app.createAnonymousContext();
await ctx.service.amqp.amqplib.consumerService('test1', 'toDb');
await ctx.service.amqp.amqplib.consumerService('test2', 'toDb');
}
}
module.exports = AppBootHook;
在service文件夹中的amqp文件中是这样的
const amqplib = require('amqplib');
class ConsumerService extends Service {
async consumerService(queueName, channel) {
const { ctx } = this;
var open = amqplib.connect('amqp://127.0.0.1:5672');
open.then(function(conn) {
return conn.createChannel();
}).then(function(ch) {
return ch.assertQueue(queueName).then(function(ok) {
return ch.consume(queueName, async function(msg) {
if (msg !== null) {
var message = JSON.parse(msg.content.toString());
if(channel == 'toDb'){
// console.log(message)
var type = message.type;
switch(type){
case 'email':
await ctx.service.amqp.mqDb.addEmailService(message.data);
// 上面的方法中有一堆数据库的操作
break;
}
}
ch.ack(msg);
}
});
});
}).catch(console.warn);
}
}
我的问题是:
- 我使用的方法可以实现队列消息的消费,但是不知道是不是完美的,或者您有什么指导意见
- 上面的消费队列的方法有一个问题,当队列中有几十条消息或者更多,队列消息不会等上一条消息存到数据库之后,再去消费下一条消息,会多条消息同时操作数据库(同时操作数据库造成数据写入更新混乱)
- 使用的是 amqplib 插件,是否是插件使用有问题
希望可以得到解答,谢谢!