KafkaConsumer Java client is not getting the records from the topic, but the command line is retrieving them
I have a customer-avro topic in a Kafka cluster with the broker list kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.
I'm able to produce a record to the topic from Java and retrieve it from the console using the command below.
$ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
{"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false}
The output above shows that there is a record in the topic, so I wrote the consumer logic below to retrieve records.
package com.example;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerV1 {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
        props.setProperty("group.id", "my-avro-consumer");
        // Auto commit is off, so offsets are committed manually with commitSync() below
        props.setProperty("enable.auto.commit", "false");
        // Only takes effect when the group has no valid committed offset
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
        props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
        // Deserialize values into the generated Customer class rather than a GenericRecord
        props.setProperty("specific.avro.reader", "true");

        try (KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props)) {
            String topic = "customer-avro";
            consumer.subscribe(Collections.singleton(topic));
            System.out.println("Waiting for data...");
            while (true) {
                ConsumerRecords<String, Customer> records = consumer.poll(500);
                for (ConsumerRecord<String, Customer> record : records) {
                    record.key();
                    Customer customer = record.value();
                    System.out.println("customer " + customer);
                }
                // Commit the offsets of the records returned by the last poll
                consumer.commitSync();
            }
        }
    }
}
But the consumer does not retrieve any records and only ever displays "Waiting for data...".
What is the mistake in my Java code? How can I retrieve the data using a Java consumer?
apache-kafka kafka-consumer-api
Did you try changing the consumer group.id? – Deadpool Nov 11 at 6:28
Technically, the console consumer is also a Java consumer. – cricket_007 Nov 11 at 7:44
I believe @Deadpool is right. The moment you run this, __consumer_offsets should contain the next offset to be read for your group.id. If you change the group.id, it will receive the contents again. auto.offset.reset only applies if there is no valid offset. Once you have run it, you have a valid offset. If you always want to receive from the beginning, then you need to call seekToBeginning(consumer.assignment()) after the initial poll. – Daniel Hinojosa Nov 11 at 18:57
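To make the last comment concrete, here is a minimal sketch of the rule it describes. It is a toy model in plain Java, not Kafka client code: the class OffsetResetModel and its methods are hypothetical names made up for illustration, a HashMap stands in for __consumer_offsets, and the topic is assumed to have a single partition. It only handles the "earliest" and "latest" policies.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model (NOT the Kafka API) of how a consumer group picks its starting offset. */
class OffsetResetModel {
    // Hypothetical stand-in for __consumer_offsets: next offset to read, keyed by group.id
    private final Map<String, Long> committed = new HashMap<>();
    private final long logStart; // offset of the first record in the partition
    private final long logEnd;   // offset the next produced record would get

    OffsetResetModel(long logStart, long logEnd) {
        this.logStart = logStart;
        this.logEnd = logEnd;
    }

    /** Records the next offset to read for a group, as a commit would. */
    void commit(String groupId, long nextOffset) {
        committed.put(groupId, nextOffset);
    }

    /** Returns the offset a consumer in the given group would start reading from. */
    long startingPosition(String groupId, String autoOffsetReset) {
        Long offset = committed.get(groupId);
        // A valid committed offset always wins; auto.offset.reset is not consulted
        if (offset != null && offset >= logStart && offset <= logEnd) {
            return offset;
        }
        // No valid offset: fall back to the reset policy (anything but "earliest" is treated as "latest")
        return "earliest".equals(autoOffsetReset) ? logStart : logEnd;
    }

    public static void main(String[] args) {
        OffsetResetModel topic = new OffsetResetModel(0L, 1L); // one record, at offset 0
        System.out.println(topic.startingPosition("my-avro-consumer", "earliest"));
        topic.commit("my-avro-consumer", 1L); // first run consumed the record and committed
        System.out.println(topic.startingPosition("my-avro-consumer", "earliest"));
        System.out.println(topic.startingPosition("my-avro-consumer-v2", "earliest"));
    }
}
```

In this model, the second call for my-avro-consumer starts past the only record, which would match a consumer that prints "Waiting for data..." and nothing else, while the group name my-avro-consumer-v2 (an arbitrary example) starts from the beginning again.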
asked Nov 11 at 2:02
Rajkumar Natarajan