Message queue congestion when using RabbitMQ from Node.js

problem description

I use distributed crawlers that crawl asynchronously through proxy IPs. With Node's amqplib library, each crawler process acts as a consumer: it receives a message, crawls the corresponding page, and stores the result in the database. PM2 runs multiple crawler processes on one machine, so several consumers on the same machine receive and process messages. A large number of the page requests fail midway; after a request fails, the message is nack'ed. The problem is that as running time increases, the rate at which messages are received gets slower and slower; after restarting all the crawlers, the speed recovers. I don't know much about RabbitMQ's internal mechanism — could someone explain why this phenomenon occurs?

The specific code is as follows:
// bind the message-queue consumer
async function run () {

//MQ
ch = await amqpcfg.createChannel(configs)
//
let consumeParam = configs.ampq.Queue.Param
//
let receiveQueueName = configs.ampq.Queue.receive.QueueName
//
ch.consume(receiveQueueName, async(msg) => {
    try {
        doWork(msg)
    } catch (e) {
        console.log("\033[47;31m Work,: \033[40;31m " +e.stack  + "\033[0m")
    }
}, consumeParam)
    

}
// Message-queue callback, invoked after a message is received. HotelsSpider performs the actual asynchronous network crawl.
// Process one queued crawl task: parse the message, run the spider,
// persist the result to MongoDB, report the outcome, and ack/nack the
// delivery. NOTE(review): ch.ack() is called BEFORE ResultTask/Mongo
// writes complete on the success path below — if the process dies in
// between, the broker considers the message handled; confirm this
// at-most-once trade-off is intended.
async function doWork (msg) {

let messageId = msg.properties.messageId
let Task = JSON.parse(transToString(msg.content))
// Tasks missing the mandatory spider parameters are dropped (acked)
// and recorded as failed with state 4.
if (!Task.SpiderParams.PlatHotelNum || !Task.SpiderParams.TargerDate) {
    ch.ack(msg)
    await ResultTask(messageId, Task, "", 4)
    return
}

// Build the spider parameters; day 0 = the target date itself.
let param = {
    hotelId: Task.SpiderParams.PlatHotelNum,
    day: getAfterDay(Task.SpiderParams.TargerDate, 0),
    GuestNum: Task.SpiderParams.GuestNum
}

let details
try {
    details = await HotelsSpider(messageId, param) // await taskQueue(messageId, param)
            Task.CompleteTime = moment().format()
            // State 2 = got data, 3 = crawl succeeded but returned nothing.
            Task.State = (details && details.length ? 2 : 3)
            
                    // Persist raw result to MongoDB and link its id onto the task.
                    let mId = await MongoDbInsertOne({
                            "Task": Task,
                            "details": details
                    })
                    Task.DateID = mId
            // Acknowledge first, then report the final state upstream.
            ch.ack(msg)
            await ResultTask(messageId, Task, "", Task.State)
} catch (e) {
    let error = `${param.hotelId}-${param.day}-${param.GuestNum},message:${e.message.substr(0,100)}`
    console.log("\033[47;31m ,: \033[40;31m " + error + "\033[0m")
    // NOTE(review): nack() defaults to requeue=true, so a failing message
    // goes straight back to the queue head and is redelivered immediately.
    // Persistent failures therefore loop forever and crowd out fresh
    // messages — a likely contributor to the congestion described above.
    // Consider ch.nack(msg, false, false) plus a dead-letter exchange.
    ch.nack(msg);
    // NOTE(review): bare expression — if errorCountPP is a numeric counter
    // this statement is a no-op (missing ++?); if it is a function it is
    // never invoked (missing ()). Confirm against its declaration.
    errorCountPP
    return
}
// Success bookkeeping: bump the shared counter and log progress.
resultNum += 1
console.log(`--${getProcessRate(startTime, resultNum)}/,${moment().format("LT")},:{${details.length}},:${param.hotelId}-${param.day}-${param.GuestNum}`)
details = null

}
// The specific network request:
// Fetch and parse the hotel-information page for one hotel/stay/occupancy.
// Builds the query URL, requests it through the current global proxy, and
// extracts structured data from the returned HTML. On failure the error is
// logged, known transient error codes trigger a cooldown via
// errorTimeCount(30), and the error is rethrown to the caller.
async function getHttpResp (hotelId, ins, out, adults) {

try {
    const query = qs.stringify({
        "chkin": ins,
        "chkout": out,
        "adults": adults,
        "mctc": 2
    })
    const pageUrl = `${configs.URL.base}/h${hotelId}.Hotel-Information` + "?" + query
    const proxyAddr = `http://${global.proxy.ip}:${global.proxy.port}`

    const resp = await request_Hotels(pageUrl, proxyAddr)
    return getExpediaHotelInfoByHtml(resp.toString())
} catch (e) {
    const error = `${hotelId},${ins},${out},${adults}`
    const message = e.message
    console.log("\033[47;31m ,: \033[40;31m " + error + "\033[0m")

    // Start a cooldown when the failure matches a known error code.
    const hit = errorCodeArray.find((code) => message.indexOf(code) !== -1)
    if (hit) {
        errorTimeCount(30)
    }
    // Propagate so the message handler can nack the delivery.
    throw e
}

}

The network request time is not very long — the timeout in the request-promise library is set to about 10 seconds — but the rate of receiving messages slows down noticeably after running for dozens of minutes. I have also tried running only one crawler on a machine and increasing its message intake; it is very fast at first, but the congestion problem still appears after running for a while.
May.16,2022

The consumer probably keeps receiving messages continuously, regardless of whether earlier ones have finished processing. Try adding `ch.prefetch(10)` to limit the number of unacknowledged messages delivered at once.

Menu