I have been trying to get this working for two weeks now. I am running a Kafka cluster on separate machines from my Connect nodes, and I am unable to get Connect running properly. I can read and write to Kafka without issue, and ZooKeeper seems to be running fine.
I launch Connect with:
$ bin/connect-distributed connect-distributed.properties
Connect keeps looping through this error:
[2018-08-21 15:45:12,161] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,163] INFO [Worker clientId=c1, groupId=connect-cluster] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:509)
[2018-08-21 15:45:12,165] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
[2018-08-21 15:45:12,266] INFO [Worker clientId=c1, groupId=connect-cluster] Discovered group coordinator 172.25.40.219:9092 (id: 2147483645 rack: null) (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:677)
[2018-08-21 15:45:12,267] INFO [Worker clientId=c1, groupId=connect-cluster] Group coordinator 172.25.1.2:9092 (id: 2147483645 rack: null) is unavailable or invalid, will attempt rediscovery (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:729)
Here is what my connect-distributed.properties looks like:
bootstrap.servers=172.25.1.2:9092,172.25.1.3:9092,172.25.1.4:9092
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=3
config.storage.topic=connect-configs
config.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.replication.factor=3
status.storage.partitions=3
offset.flush.interval.ms=10000
rest.host.name=172.25.1.5
rest.port=8083
heartbeat.interval.ms=3000
session.timeout.ms=30000
security.protocol=PLAINTEXT
client.id=c1
plugin.path=/usr/share/java
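For reference, the Connect internal topics named in this config can be described the same way as __consumer_offsets below, for example:
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-offsets
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-configs
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic connect-status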
The __consumer_offsets topic looks like this:
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: 2 Replicas: 1,2,3 Isr: 3,2
Topic: __consumer_offsets Partition: 1 Leader: 2 Replicas: 1,2,3 Isr: 3,2
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 1,2,3 Isr: 3,2
Topic: __consumer_offsets Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 4 Leader: 2 Replicas: 2 Isr: 2.... etc
After writing a connector in Go, I came across the same issue and ended up having to solve it myself.
When a connector connects to Kafka, it automatically writes to its internal topics and to __consumer_offsets. When a connector crashes, it leaves a trace of itself in those topics as the group coordinator. When a new connector starts up, it finds that record and attempts to communicate with the stale coordinator; the coordinator never responds, so the connector never works.
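A quick way to see which broker the cluster currently reports as coordinator for the group is to describe it with the stock tooling; newer Kafka versions support a --state view that prints the coordinator, something like:
/opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server 172.25.1.2:9092 --describe --group connect-cluster --state
If the coordinator it prints is a broker that is down, or an address the Connect host cannot reach, that matches the behaviour above.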
You can fix this in one of two ways. The first is to delete all of those topics (connect-configs, connect-offsets, connect-status, __consumer_offsets) and restart the cluster. The other is to remove the stale coordinator record from the topics, which I am currently unsure how to perform.
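For the first option, the deletes look roughly like this with the stock tooling, assuming the brokers run with delete.topic.enable=true (note that deleting __consumer_offsets also wipes the committed offsets of every other consumer group on the cluster, so be careful with that last one):
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-configs
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-offsets
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic connect-status
/opt/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic __consumer_offsets
Then restart the brokers and the Connect workers.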