NodeJS kafka 测试读工具

const kafka = require('kafka-node');

let conn = {'kafkaHost':'10.10.0.21:9092'}; //ip和端口
let consumers = [
    {
        'type': 'consumer',
        'options': {'autoCommit': true},
        'name':'common',
        'topic':[
            //修改topic
            {'topic': 'data_output', 'partition': 0}
        ]
    }
];

let MQ = function(){

}

MQ.prototype.AddConsumer = function (conn, topics, options, handler){
    let client = new kafka.KafkaClient(conn);
    let consumer = new kafka.Consumer(client, topics, options);

    if(!!handler){
        consumer.on('message', handler);
    }

    consumer.on('error', function(err){
        console.error('consumer error ',err.stack);
    });
}

var mq = new MQ();


mq.AddConsumer(conn, consumers[0].topic, consumers[0].options, function (message){
    console.log(message.value);
});