I tried to connect to kafka? with eggjs

I use eggjs to try to connect to kafka, so that topic corresponds to the file name under app/kafka, and finally executes all the methods under the file name. If I want to deal with the business in a separate directory under the app directory, at first I imagine that I can"t find the corresponding controller method when the onmessage is triggered by loadController,kafka, and finally solve it in a rough way. I don"t know if there"s any problem, if there"s anything I can give you. The code is as follows:

load kafka:

//kafka-load
import * as path from "path"
export default app => {
    let dirs = app.loader
        .getLoadUnits()
        .map(unit => path.join(unit.path, "app", "kafka"))

    app.kafka = app.kafka || {}
    new app.loader.FileLoader({
        directory: dirs,
        target: app.kafka,
        initializer: (kafka, opts) => {
            const fileName = path.basename(opts.path, path.extname(opts.path))
            Object.keys(kafka).map(action => {
                if (!app.kafka[fileName]) {
                    app.kafka[fileName] = {}
                }
                app.kafka[fileName][action] = kafka[action]
            })
            return null
        }
    }).load()
}


kafka connect

//kafka connect
import * as Kafka from "kafka-node"
import { KafkaConfig } from "../config/config.d"
import load from "./kafkaLoad"

export default app => {
    load(app)
    const config: KafkaConfig = app.config.kafka
    const zookeepers = config.host.join(",")
    const client = new Kafka.Client(zookeepers, config.clientId)
    const consumer = new Kafka.Consumer(client, config.topics, config.options)
    const topics = config.topics.map(item => item.topic)

    consumer.on("message", message => {
        const topicConsumers = app.kafka[message.topic]
        if (topicConsumers) {
            Object.keys(topicConsumers).map(name =>
                topicConsumers[name].call(app, message.value)
            )
        }
        app.logger.info(
            `[egg-kafka] Receive producer message`,
            JSON.stringify(message)
        )
    })

    consumer.on("error", error => {
        app.coreLogger.error(`[egg-kafka] init instance error`, error)
    })

    app.beforeStart(() => {
        app.coreLogger.info(
            `[egg-kafka] init instance success ,host@${zookeepers} -----> topic@${topics}`
        )
    })
}

kafka controller

export interface TopicNodejsMethods {
    test1(message: { [key: string]: any }): Promise<any>
    test2(message: { [key: string]: any }): Promise<any>
}

export type TopicNodejs = Application & TopicNodejsMethods

export default {
    async test1(message) {

        this.io.of("/").emit("passAlarm",message)
    },
    async test2(message) {
        console.log(message)
    }
} as TopicNodejs

Mar.20,2021
Menu