现在的位置: 首页 > 问答 > 正文

Can't get kafka console producer or consumer to work

2016年10月22日 问答 ⁄ 共 2389字 ⁄ 字号 暂无评论

I was able to get kafka to work fine when I spun it up on my local machine. But when I try to get it to work on an AWS instance nothing seems to work right. I tried spinning up my own server and doing just like I did locally spinning up zookeeper and kafka like so

curl http://apache.spinellicreations.com/kafka/0.10.0.0/kafka_2.11-0.10.0.0.tgz | tar -xzf

cd kafka_2.11-0.10.0.0 

bin/zookeeper-server-start.sh config/zookeeper.properties &

bin/kafka-server-start.sh config/server.properties &

I also tried using the AMI from bitami which seems to be an all in one AMI. Creating the topic seems to work fine. But when I try to run the console producer I get an error

SEASPAULSON-MAC:kafka_2.11-0.10.0.0 spaulson$ bin/kafka-console-producer.sh --broker-list ec2-54-186-31-109.us-west-2.compute.amazonaws.com:9092 --topic test
blah
[2016-10-20 12:13:23,395] ERROR Error when sending message to topic test with key: null, value: 4 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 1 record(s) expired due to timeout while requesting metadata from brokers for test-0

I also get an error when I try to start up a console consumer that repeats over and over.

bin/kafka-console-consumer.sh --zookeeper ec2-54-186-31-109.us-west-2.compute.amazonaws.com:2181 --topic test --from-beginning


[2016-10-19 18:26:47,175] WARN Fetching topic metadata with correlation id 152 for topics [Set(test)] from broker [BrokerEndPoint(0,ip-172-31-52-58.ec2.internal,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)

I feel like these kinds of operations should be trivial but it's proving very challenging. I'm having trouble finding documentation on how to diagnose issues and figure out what's going wrong. The best I found is this command

KAFKA_HOME/bin/kafka-topics.sh --describe --topic test --zookeeper ec2-54-186-31-109.us-west-2.compute.amazonaws.com:2181
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

Does the Leader: 0 indicate something went wrong? But what?

给我留言

您必须 [ 登录 ] 才能发表留言!

×