Kafka on Kubernetes cannot produce/consume topics (ClosedChannelException, ErrorLoggingCallback)
I run 1 Kafka broker and 3 ZooKeeper servers in Docker on Kubernetes, following the instructions. I cannot produce to or consume from topics from outside the pod (Docker container).
bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test

[2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0

bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning

[2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
[2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-nv49c-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
Caused by: java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
    ... 3 more

Kafka log:

[2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
[2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
[2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
[2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
[2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
And config/server.properties:

broker.id=-1
log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3
num.partitions=1
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
zookeeper.connection.timeout.ms=6000
service.port.9092.tcp.addr=10.254.68.65
service.port.9092.tcp.proto=tcp
service.service.port.kafka.port=9092
service.service.port=9092
service.port=tcp://10.254.68.65:9092
service.port.9092.tcp.port=9092
version=0.10.0.0
service.service.host=10.254.68.65
port=9092
advertised.host.name=kafka-service
service.port.9092.tcp=tcp://10.254.68.65:9092
advertised.port=9092
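For clients, the two settings that matter in this file are advertised.host.name and advertised.port, because that is the address the broker hands back to producers and consumers in its metadata responses. As a rough illustration, here is a minimal Python sketch that pulls that endpoint out of a properties file. The function name advertised_endpoint and the sample text are made up for this example and are not part of Kafka; the fallback from advertised.port to port mirrors the broker's documented default.

```python
def advertised_endpoint(properties_text):
    """Return the (host, port) a broker advertises, given server.properties text."""
    props = {}
    for line in properties_text.splitlines():
        line = line.strip()
        # Skip blank lines and comments.
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    host = props.get("advertised.host.name")
    # If advertised.port is not set, fall back to the port the broker binds to.
    port = props.get("advertised.port", props.get("port"))
    return host, (int(port) if port is not None else None)


# Hypothetical excerpt of the file above, for illustration only.
sample = """
port=9092
advertised.host.name=kafka-service
advertised.port=9092
"""
print(advertised_endpoint(sample))
```

Whatever host this returns is the name a client must be able to resolve and reach, regardless of which address was passed to --broker-list.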
However, both commands work when I run them inside the pod (Docker container):

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning
I can also create and list topics when connecting to the ZooKeeper service:

bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test

Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
    Topic: test Partition: 0 Leader: 1001 Replicas: 1001 Isr: 1001
And the YAML file that creates the Kafka replication controller and service:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service2
  labels:
    app: kafka2
spec:
  clusterIP: None
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka2
---
apiVersion: v1
kind: ReplicationController
metadata:
  name: kafka-controller2
spec:
  replicas: 1
  selector:
    app: kafka2
  template:
    metadata:
      labels:
        app: kafka2
    spec:
      containers:
      - name: kafka2
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-service2
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181,zoo2:2181,zoo3:2181
Kafka registers itself in ZooKeeper under its service name. Consuming and producing messages requires access to the service names (here, the DNS records for zookeeper-1, zookeeper-2 and zookeeper-3), which are resolvable only through Kubernetes' DNS. So only applications running on Kubernetes can access my Kafka. Therefore I cannot use the external IP of kafka-service, or port-forward the Kafka pod to localhost, and then access it.
But why can I create, list and describe topics from outside the Kubernetes cluster? I guess it is because ZooKeeper can perform those operations by itself, whereas consuming and producing messages requires access to the ADVERTISED_HOST_NAME provided by Kafka.
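One way to confirm this from a machine outside the cluster is to test whether the advertised name resolves and accepts connections at all. The following is only a sketch under that assumption, using Python's standard socket module; check_endpoint is a hypothetical helper name, not a Kafka tool.

```python
import socket


def check_endpoint(host, port, timeout=3.0):
    """Classify host:port as 'unresolvable', 'unreachable' or 'ok' from this machine."""
    try:
        # Name resolution is the step that fails outside Kubernetes' DNS.
        infos = socket.getaddrinfo(host, port, type=socket.SOCK_STREAM)
    except socket.gaierror:
        return "unresolvable"
    for family, socktype, proto, _canonname, sockaddr in infos:
        try:
            with socket.socket(family, socktype, proto) as sock:
                sock.settimeout(timeout)
                sock.connect(sockaddr)
                return "ok"
        except OSError:
            # Refused or timed out; try the next resolved address, if any.
            continue
    return "unreachable"
```

Calling check_endpoint("kafka-service", 9092) from the external client would be expected to report "unresolvable" if the explanation above holds, while the same call from a pod inside the cluster should succeed. A result of "ok" only shows that a TCP connection can be opened; it does not prove that the Kafka protocol exchange works.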