I am trying to create a RabbitMQ publisher and subscriber. They work as expected until I restart my RabbitMQ server.
I use the `rabbitmq:3-management` Docker image, `amqplib` 0.5.3, and Node.js 11.10.0 to make this simple program:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
/**
 * Opens a brand-new AMQP connection and resolves with a channel created on it.
 * Every call connects again; the connection object itself is not returned.
 *
 * NOTE(review): the credentials/host part of the URL looks mangled by an
 * e-mail obfuscator ("[email protected]") - confirm the real value.
 *
 * @returns {Promise<Object>} promise for the channel created on the new connection.
 */
function createChannel() {
  const connecting = amqplib.connect("amqp://root:[email protected]:5672/");
  return connecting.then(function openChannel(connection) {
    return connection.createChannel();
  });
}
// Open one channel for publishing and one for consuming, send a single task
// to the queue, then print and acknowledge whatever the consumer receives.
// Any failure along the way is reported through console.warn.
Promise.all([createChannel(), createChannel()])
  .then(async ([publisherChannel, consumerChannel]) => {
    // Publisher: make sure the queue exists before sending to it.
    await publisherChannel.assertQueue(q);
    publisherChannel.sendToQueue(q, Buffer.from("something to do"));

    // Consumer: deliveries that are not null are printed and acked.
    const printAndAck = (msg) => {
      if (msg === null) {
        return;
      }
      console.log(msg.content.toString());
      consumerChannel.ack(msg);
    };
    await consumerChannel.assertQueue(q);
    await consumerChannel.consume(q, printAndAck);
  })
  .catch(console.warn);
First, I create two channels: one acts as the publisher and the other as the consumer.
The publisher sends a `something to do` message to the `tasks` queue.
The consumer then receives the message and prints it to the screen using `console.log`.
This works as expected.
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
/**
 * Connects to the broker and resolves with a channel opened on that new
 * connection. Each call creates its own connection.
 *
 * @returns {Promise<Object>} promise for the freshly created channel.
 */
function createChannel() {
  const connecting = amqplib.connect("amqp://root:[email protected]:5672/");
  return connecting.then(function openChannel(connection) {
    return connection.createChannel();
  });
}
// Open both channels, bounce the broker, and then try to keep using the
// channels that were opened BEFORE the restart.
Promise.all([createChannel(), createChannel()])
  .then(async ([publisherChannel, consumerChannel]) => {
    // Let's say rabbitmq is down, and then up again.
    // execSync blocks until each docker command has returned.
    execSync("docker stop rabbitmq");
    execSync("docker start rabbitmq");

    // Publisher: declare the queue, then send one message.
    await publisherChannel.assertQueue(q);
    publisherChannel.sendToQueue(q, Buffer.from("something to do"));

    // Consumer: print and acknowledge every non-null delivery.
    const printAndAck = (msg) => {
      if (msg === null) {
        return;
      }
      console.log(msg.content.toString());
      consumerChannel.ack(msg);
    };
    await consumerChannel.assertQueue(q);
    await consumerChannel.consume(q, printAndAck);
  })
  .catch(console.warn);
This is similar to my previous attempt, but this time I stop and start the RabbitMQ container (i.e. restart the server) before proceeding.
It doesn't work; I get this error instead:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
[guldan@draenor labs]$ node --version
v11.10.0
[guldan@draenor labs]$ docker start rabbitmq && node test.js
rabbitmq
{ Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Channel ended, no reply will be forthcoming
at rej (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:195:7)
at Channel.C._rejectPending (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:197:28)
at Channel.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/channel.js:165:8)
at Connection.C._closeChannels (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:394:18)
at Connection.C.toClosed (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:401:8)
at Object.accept (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:96:18)
at Connection.mainAccept [as accept] (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (events.js:197:13)
at emitReadable_ (_stream_readable.js:539:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
My first attempt didn't work, so I tried creating new channels after restarting the server:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
/**
 * Establishes a new connection to the broker and resolves with a channel
 * created on it. Nothing is cached: every call connects again.
 *
 * @returns {Promise<Object>} promise for the freshly created channel.
 */
function createChannel() {
  const connecting = amqplib.connect("amqp://root:[email protected]:5672/");
  return connecting.then(function openChannel(connection) {
    return connection.createChannel();
  });
}
// Open two channels, bounce the broker, then discard the old channels and
// open two new ones immediately after `docker start` returns.
Promise.all([createChannel(), createChannel()])
  .then(async () => {
    // Let's say rabbitmq is down, and then up again.
    execSync("docker stop rabbitmq");
    execSync("docker start rabbitmq");

    // The channels from before the restart are not reused here.
    const [publisherChannel, consumerChannel] = await Promise.all([
      createChannel(),
      createChannel(),
    ]);

    // Publisher: declare the queue, then send one message.
    await publisherChannel.assertQueue(q);
    publisherChannel.sendToQueue(q, Buffer.from("something to do"));

    // Consumer: print and acknowledge every non-null delivery.
    const printAndAck = (msg) => {
      if (msg === null) {
        return;
      }
      console.log(msg.content.toString());
      consumerChannel.ack(msg);
    };
    await consumerChannel.assertQueue(q);
    await consumerChannel.consume(q, printAndAck);
  })
  .catch(console.warn);
And this time, I got this error instead:
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
I'm not really sure, but I think the error might be related to https://github.com/squaremo/amqp.node/issues/101.
I am looking for a workaround or solution to reconnect to RabbitMQ after the server has restarted. Any explanation or suggestion is also welcome.
I tried to dig deeper and modified my code a bit:
const q = 'tasks';
const { execSync } = require("child_process");
const amqplib = require("amqplib");
/**
 * Starts a connection attempt to the broker and logs progress markers.
 *
 * Note: the promise returned by `amqplib.connect` is NOT awaited here, so
 * "connected" is printed as soon as the attempt has been started, not when
 * the connection is actually established. The caller's `await` is what waits
 * for (or observes the failure of) the connection.
 *
 * @returns {Promise<Object>} promise that settles with the outcome of `amqplib.connect`.
 */
async function createConnection() {
  console.log("connect");
  const pendingConnection = amqplib.connect("amqp://root:[email protected]:5672/");
  console.log("connected");
  return pendingConnection;
}
/**
 * Asks `conn` for a new channel and logs progress markers.
 *
 * Note: the result of `conn.createChannel` is not awaited before
 * "channel created" is printed, so that line only means the request has been
 * issued.
 *
 * @param {Object} conn - connection object exposing `createChannel`.
 * @returns {Promise<Object>} promise that settles with whatever `conn.createChannel` yields.
 */
async function createChannel(conn) {
  console.log("create channel");
  const channelOptions = { durable: false };
  const pendingChannel = conn.createChannel(channelOptions);
  console.log("channel created");
  return pendingChannel;
}
/**
 * Creates a connection and then a channel on that connection.
 *
 * @returns {Promise<Object>} promise for the channel; rejects if either step fails.
 */
async function createConnectionAndChannel() {
  return createConnection().then((connection) => createChannel(connection));
}
// Same experiment as before, instrumented with console.log markers so the
// step that fails after the broker restart can be located in the output.
Promise.all([createConnectionAndChannel(), createConnectionAndChannel()])
  .then(async () => {
    // Let's say rabbitmq is down, and then up again.
    console.log("restart server");
    execSync("docker stop rabbitmq");
    execSync("docker start rabbitmq");
    console.log("server restarted");

    // Open a fresh connection + channel pair for each role.
    const [publisherChannel, consumerChannel] = await Promise.all([
      createConnectionAndChannel(),
      createConnectionAndChannel(),
    ]);
    console.log("channels created");

    // Publisher: "published" is printed once the queue assertion resolves,
    // right before the message is handed to sendToQueue.
    console.log("publish");
    await publisherChannel.assertQueue(q);
    console.log("published");
    publisherChannel.sendToQueue(q, Buffer.from("something to do"));

    // Consumer: "consumed" is printed once the queue assertion resolves,
    // right before the consumer is registered.
    console.log("consume");
    await consumerChannel.assertQueue(q);
    console.log("consumed");
    const printAndAck = (msg) => {
      if (msg === null) {
        return;
      }
      console.log(msg.content.toString());
      consumerChannel.ack(msg);
    };
    await consumerChannel.consume(q, printAndAck);
  })
  .catch(console.warn);
And I get this output:
connect
connected
connect
connected
create channel
channel created
create channel
channel created
restart server
server restarted
connect
connected
connect
connected
{ Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17)
cause:
Error: Socket closed abruptly during opening handshake
at Socket.endWhileOpening (/home/guldan/Projects/kata/merapi-plugin-service-rabbit/node_modules/amqplib/lib/connection.js:260:17)
at Socket.emit (events.js:202:15)
at endReadableNT (_stream_readable.js:1129:12)
at processTicksAndRejections (internal/process/next_tick.js:76:17),
isOperational: true }
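So I guess amqplib is able to reconnect but fails to create the channel. (Note, however, that `connected` is logged before the promise returned by `amqplib.connect` has settled, so this output does not actually prove that the reconnection succeeded; the error itself is raised during the connection's opening handshake.)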
Finally, I managed to find the answer:
const { execSync } = require("child_process");
const amqp = require("amqplib");
/**
 * Pauses the calling async function.
 *
 * @param {number} delay - time to wait, in milliseconds (handed to setTimeout).
 * @returns {Promise<undefined>} promise that fulfils once the timer fires.
 */
async function sleep(delay) {
  await new Promise((wake) => {
    setTimeout(wake, delay);
  });
}
/**
 * Connects to RabbitMQ, opens a confirm channel and sets up every listener
 * and publisher described by `config`. Failed attempts are logged and retried
 * once per second until one succeeds, so the returned promise only fulfils
 * with a usable channel.
 *
 * When an established connection later closes, the whole setup is rebuilt
 * from `config` on a new connection. The channel returned earlier is NOT
 * replaced in the caller's hands; `config` is the source of truth.
 *
 * @param {Object} [config]
 * @param {string} [config.url=""] - AMQP URL handed to `amqp.connect`.
 * @param {Object} [config.publishers={}] - queue name -> message (Buffer) that
 *   is sent to that queue after every (re)connect.
 * @param {Object} [config.listeners={}] - queue name -> consume callback.
 * @returns {Promise<Object>} the confirm channel of the first successful attempt.
 */
async function createChannel(config) {
  // A loop (rather than an un-returned recursive call) guarantees the caller
  // receives the channel of the attempt that finally succeeded.
  for (;;) {
    try {
      return await openChannelOnce(config);
    } catch (error) {
      console.error("Create connection error : ", error);
      await sleep(1000);
    }
  }
}

// Performs one connect + channel + queue setup attempt; rejects on any failure.
async function openChannelOnce(config) {
  const { url, publishers, listeners } = Object.assign(
    { url: "", publishers: {}, listeners: {} },
    config
  );

  // create connection
  const connection = await amqp.connect(url);
  let channel = null;
  // Set when this attempt failed during setup: createChannel's retry loop is
  // then responsible for reconnecting, so the close handler must not do it too.
  let abandoned = false;

  connection.on("error", (error) => {
    console.error("Connection error : ", config, error);
  });
  connection.on("close", async (error) => {
    if (channel) {
      // Deliberate best effort: the channel has usually been closed together
      // with the connection already, in which case close() fails.
      Promise.resolve()
        .then(() => channel.close())
        .catch(() => {});
    }
    console.error("Connection close : ", config, error);
    await sleep(1000);
    if (abandoned) {
      return;
    }
    // createChannel retries internally and logs its own failures.
    createChannel(config);
  });

  try {
    // create channel
    channel = await connection.createConfirmChannel();
    channel.on("error", (error) => {
      console.error("Channel error : ", config, error);
    });
    channel.on("close", (error) => {
      console.error("Channel close : ", config, error);
    });

    // register listeners
    for (const queue of Object.keys(listeners || {})) {
      await channel.assertQueue(queue, { durable: false });
      await channel.consume(queue, listeners[queue]);
    }

    // publish
    const publishQueues = Object.keys(publishers || {});
    for (const queue of publishQueues) {
      await channel.assertQueue(queue, { durable: false });
      channel.sendToQueue(queue, publishers[queue]);
    }
    if (publishQueues.length > 0) {
      // On a confirm channel, wait until the broker has acknowledged the
      // messages before reporting success.
      await channel.waitForConfirms();
    }
    return channel;
  } catch (error) {
    // Do not leak the half-configured connection while the caller retries.
    abandoned = true;
    Promise.resolve()
      .then(() => connection.close())
      .catch(() => {});
    throw error;
  }
}
/**
 * Demo scenario: publish "hello", restart the broker container, then start a
 * consumer on the same queue name.
 *
 * NOTE(review): the credentials/host part of the URL looks mangled by an
 * e-mail obfuscator ("[email protected]") - confirm the real value.
 *
 * @returns {Promise<boolean>} resolves to `true` once both setups have returned.
 */
async function main() {
  const brokerUrl = "amqp://root:[email protected]:5672";

  // publish "hello" message to queue
  await createChannel({
    url: brokerUrl,
    publishers: { queue: Buffer.from("hello") },
  });

  // restart rabbitmq (each execSync call blocks until the command returns)
  execSync("docker stop rabbitmq");
  execSync("docker start rabbitmq");

  // consume message from queue
  const printMessage = (message) => {
    console.log("Receive message ", message.content.toString());
  };
  await createChannel({
    url: brokerUrl,
    listeners: { queue: printMessage },
  });

  return true;
}

main().catch((error) => {
  console.error(error);
});
Basically, I attach the channel to the connection. Whenever the connection fails (e.g. because the RabbitMQ server was turned off), the program waits for a second and then tries to create a new connection.
The drawback is that I lose the reference to the old connection and its channel.
To overcome this, I have to store the queue, publisher, and consumer information somewhere else (in this case I pass it as `createChannel`'s parameter).
Finally, every time I reconnect, I also re-create the channel and set up every publisher and consumer again.
It is not very convenient, but at least it works as intended.