docker - Kafka on Kubernetes cannot produce/consume topics (ClosedChannelException, ErrorLoggingCallback)


I run 1 Kafka broker and 3 ZooKeeper servers in Docker on Kubernetes, following the instructions. I cannot produce/consume topics from outside the pod (Docker container).

    bin/kafka-console-producer.sh --broker-list 1.2.3.4:9092 --topic test

    [2016-06-11 15:14:46,889] ERROR Error when sending message to topic test with key: null, value: 3 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Batch containing 3 record(s) expired due to timeout while requesting metadata from brokers for test-0

    bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning

    [2016-06-11 15:15:58,985] WARN Fetching topic metadata with correlation id 0 for topics [Set(test)] from broker [BrokerEndPoint(1001,kafka-service,9092)] failed (kafka.client.ClientUtils$)
    java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    [2016-06-11 15:15:58,992] WARN [console-consumer-66869_tattoo-nv49c-1465629357799-ce1529da-leader-finder-thread], Failed to find leader for Set([test,0]) (kafka.consumer.ConsumerFetcherManager$LeaderFinderThread)
    kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] from broker [ArrayBuffer(BrokerEndPoint(1001,kafka-service,9092))] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:73)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:94)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)
    Caused by: java.nio.channels.ClosedChannelException
        at kafka.network.BlockingChannel.send(BlockingChannel.scala:110)
        at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:80)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:79)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:124)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:59)
        ... 3 more

Kafka log:

    [2016-06-11 07:47:58,269] INFO [Kafka Server 1001], started (kafka.server.KafkaServer)
    [2016-06-11 07:53:50,404] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions [test,0] (kafka.server.ReplicaFetcherManager)
    [2016-06-11 07:53:50,443] INFO Completed load of log test-0 with log end offset 0 (kafka.log.Log)
    [2016-06-11 07:53:50,458] INFO Created log for partition [test,0] in /kafka/kafka-logs-kafka-controller-3rsv3 with properties {compression.type -> producer, message.format.version -> 0.10.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, message.timestamp.type -> CreateTime, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> true, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> delete, flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
    [2016-06-11 07:53:50,459] INFO Partition [test,0] on broker 1001: No checkpointed highwatermark is found for partition [test,0] (kafka.cluster.Partition)
    [2016-06-11 07:57:57,955] INFO [Group Metadata Manager on Broker 1001]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)

And config/server.properties:

    broker.id=-1
    log.dirs=/kafka/kafka-logs-kafka-controller-3rsv3
    num.partitions=1
    zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181
    zookeeper.connection.timeout.ms=6000
    service.port.9092.tcp.addr=10.254.68.65
    service.port.9092.tcp.proto=tcp
    service.service.port.kafka.port=9092
    service.service.port=9092
    service.port=tcp://10.254.68.65:9092
    service.port.9092.tcp.port=9092
    version=0.10.0.0
    service.service.host=10.254.68.65
    port=9092
    advertised.host.name=kafka-service
    service.port.9092.tcp=tcp://10.254.68.65:9092
    advertised.port=9092
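
To see what address the broker actually registered, you can query ZooKeeper directly; a minimal check, using broker id 1001 from the logs above (the commented output is only what I would expect, abbreviated):

    bin/zookeeper-shell.sh 5.6.7.8:2181 get /brokers/ids/1001
    # expected to print the broker registration, roughly:
    # {"endpoints":["PLAINTEXT://kafka-service:9092"],"host":"kafka-service","port":9092,...}
    # "kafka-service" only resolves inside the cluster, which is why external
    # clients hit ClosedChannelException after fetching metadata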

But I can run bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test and bin/kafka-console-consumer.sh --zookeeper 5.6.7.8:2181 --topic test --from-beginning if inside the pod (Docker container).
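
For completeness, a sketch of running the same producer from inside the pod with kubectl exec; the pod name kafka-controller-3rsv3 is inferred from the log directory above, and /opt/kafka is assumed from the usual layout of the wurstmeister/kafka image:

    # run the console producer inside the Kafka pod, where localhost:9092
    # works and the advertised in-cluster DNS name also resolves
    kubectl exec -it kafka-controller-3rsv3 -- \
      /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test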

And I can create and list topics when connecting to ZooKeeper's service:

    bin/kafka-topics.sh --describe --zookeeper 5.6.7.8:2181 --topic test
    Topic:test  PartitionCount:1    ReplicationFactor:1 Configs:
        Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001

And the YAML file creating the Kafka replication controller and service:

    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-service2
      labels:
        app: kafka2
    spec:
      clusterIP: None
      ports:
      - port: 9092
        name: kafka-port
        targetPort: 9092
        protocol: TCP
      selector:
        app: kafka2
    ---
    apiVersion: v1
    kind: ReplicationController
    metadata:
      name: kafka-controller2
    spec:
      replicas: 1
      selector:
        app: kafka2
      template:
        metadata:
          labels:
            app: kafka2
        spec:
          containers:
          - name: kafka2
            image: wurstmeister/kafka
            ports:
            - containerPort: 9092
            env:
            - name: KAFKA_ADVERTISED_PORT
              value: "9092"
            - name: KAFKA_ADVERTISED_HOST_NAME
              value: kafka-service2
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zoo1:2181,zoo2:2181,zoo3:2181

Kafka registers itself in ZooKeeper under the service's name, and consuming/producing messages needs access to those service names (here the DNS records for zookeeper-1, zookeeper-2 and zookeeper-3), which are only resolvable through Kubernetes' DNS. So only an application running on Kubernetes can access this Kafka. Therefore you cannot use the external IP of kafka-service, or port-forward the Kafka pod to localhost, and then access it.
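
You can observe this from outside the cluster with kafkacat, if it happens to be installed (1.2.3.4 is the placeholder broker address used above):

    # fetch cluster metadata through the reachable bootstrap address
    kafkacat -L -b 1.2.3.4:9092
    # the bootstrap TCP connection succeeds, but the metadata response
    # advertises the broker as kafka-service:9092; an external client cannot
    # resolve that name, so the follow-up produce/fetch connections fail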

But why can I create, list and describe topics from outside the Kubernetes cluster? I guess it is because the ZooKeepers can do those operations by themselves, whereas consuming/producing messages needs access to the ADVERTISED_HOST_NAME provided by Kafka.
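
One common workaround (a sketch, not part of the original setup) is to advertise an address that external clients can actually reach, e.g. a node IP, and expose the broker port with a NodePort service; the service name, IP and node port below are placeholders:

    # hypothetical NodePort service so external clients can reach the broker
    ---
    apiVersion: v1
    kind: Service
    metadata:
      name: kafka-external        # hypothetical name
    spec:
      type: NodePort
      ports:
      - port: 9092
        targetPort: 9092
        nodePort: 30092           # external clients connect to <node-ip>:30092
      selector:
        app: kafka2
    # and in the ReplicationController's env, advertise that reachable address:
    #   - name: KAFKA_ADVERTISED_HOST_NAME
    #     value: "1.2.3.4"        # placeholder: an address external clients resolve
    #   - name: KAFKA_ADVERTISED_PORT
    #     value: "30092"

Note that in-cluster clients will then also be pointed at the node address, since this Kafka version advertises a single endpoint per protocol.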

