KafkaConsumer java client is not getting the records from topic but command line is retriving









up vote
0
down vote

favorite












I have customer-avro topic in kafka cluster with broker list kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.



I'm able to produce record to the topic from java and retrieve it from console using below command.



 $ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false


In the above command it is obvious there is a record in the topic. So I written below consumer logic to retrieve records.



 package com.example;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerV1
public static void main(String args)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
props.setProperty("group.id", "my-avro-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
props.setProperty("specific.avro.reader", "true");

try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props))
String topic = "customer-avro";
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true)
ConsumerRecords<String, Customer> records = consumer.poll(500);
for (ConsumerRecord<String, Customer> record : records)
record.key();
Customer customer = record.value();
System.out.println("customer "+ customer);

consumer.commitSync();







But the consumer is not retrieving records and it is always displaying "waiting for data".



What is the mistake in my java code. How can I retrieve data using java consumer.










share|improve this question

















  • 2




    did you try changing consumer group.id
    – Deadpool
    Nov 11 at 6:28










  • Technically, the console consumer is also a Java consumer
    – cricket_007
    Nov 11 at 7:44







  • 1




    @Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
    – Daniel Hinojosa
    Nov 11 at 18:57















up vote
0
down vote

favorite












I have customer-avro topic in kafka cluster with broker list kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.



I'm able to produce record to the topic from java and retrieve it from console using below command.



 $ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false


In the above command it is obvious there is a record in the topic. So I written below consumer logic to retrieve records.



 package com.example;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerV1
public static void main(String args)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
props.setProperty("group.id", "my-avro-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
props.setProperty("specific.avro.reader", "true");

try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props))
String topic = "customer-avro";
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true)
ConsumerRecords<String, Customer> records = consumer.poll(500);
for (ConsumerRecord<String, Customer> record : records)
record.key();
Customer customer = record.value();
System.out.println("customer "+ customer);

consumer.commitSync();







But the consumer is not retrieving records and it is always displaying "waiting for data".



What is the mistake in my java code. How can I retrieve data using java consumer.










share|improve this question

















  • 2




    did you try changing consumer group.id
    – Deadpool
    Nov 11 at 6:28










  • Technically, the console consumer is also a Java consumer
    – cricket_007
    Nov 11 at 7:44







  • 1




    @Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
    – Daniel Hinojosa
    Nov 11 at 18:57













up vote
0
down vote

favorite









up vote
0
down vote

favorite











I have customer-avro topic in kafka cluster with broker list kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.



I'm able to produce record to the topic from java and retrieve it from console using below command.



 $ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false


In the above command it is obvious there is a record in the topic. So I written below consumer logic to retrieve records.



 package com.example;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerV1
public static void main(String args)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
props.setProperty("group.id", "my-avro-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
props.setProperty("specific.avro.reader", "true");

try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props))
String topic = "customer-avro";
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true)
ConsumerRecords<String, Customer> records = consumer.poll(500);
for (ConsumerRecord<String, Customer> record : records)
record.key();
Customer customer = record.value();
System.out.println("customer "+ customer);

consumer.commitSync();







But the consumer is not retrieving records and it is always displaying "waiting for data".



What is the mistake in my java code. How can I retrieve data using java consumer.










share|improve this question













I have customer-avro topic in kafka cluster with broker list kafka1.com:9092,kafka2.com:9092,kafka3.com:9092.



I'm able to produce record to the topic from java and retrieve it from console using below command.



 $ sudo bin/kafka-avro-console-consumer --bootstrap-server kafka1.com:9092,kafka2.com:9092,kafka3.com:9092 --topic customer-avro --property schema.registry.url=http://schemaregistry1.com:8081 --from-beginning
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
"first_name":"Mottu","last_name":"kalidasan","age":24,"height":5.4,"weight":6.4,"automated_email":false


In the above command it is obvious there is a record in the topic. So I written below consumer logic to retrieve records.



 package com.example;

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerV1
public static void main(String args)
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1.com:9092,kafka2.com:9092,kafka3.com:9092");
props.setProperty("group.id", "my-avro-consumer");
props.setProperty("enable.auto.commit", "false");
props.setProperty("auto.offset.reset", "earliest");
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.setProperty("schema.registry.url", "http://schemaregistry1.com:8081");
props.setProperty("specific.avro.reader", "true");

try(KafkaConsumer<String, Customer> consumer = new KafkaConsumer<String, Customer>(props))
String topic = "customer-avro";
consumer.subscribe(Collections.singleton(topic));
System.out.println("Waiting for data...");
while (true)
ConsumerRecords<String, Customer> records = consumer.poll(500);
for (ConsumerRecord<String, Customer> record : records)
record.key();
Customer customer = record.value();
System.out.println("customer "+ customer);

consumer.commitSync();







But the consumer is not retrieving records and it is always displaying "waiting for data".



What is the mistake in my java code. How can I retrieve data using java consumer.







apache-kafka kafka-consumer-api






share|improve this question













share|improve this question











share|improve this question




share|improve this question










asked Nov 11 at 2:02









Rajkumar Natarajan

9821033




9821033







  • 2




    did you try changing consumer group.id
    – Deadpool
    Nov 11 at 6:28










  • Technically, the console consumer is also a Java consumer
    – cricket_007
    Nov 11 at 7:44







  • 1




    @Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
    – Daniel Hinojosa
    Nov 11 at 18:57













  • 2




    did you try changing consumer group.id
    – Deadpool
    Nov 11 at 6:28










  • Technically, the console consumer is also a Java consumer
    – cricket_007
    Nov 11 at 7:44







  • 1




    @Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
    – Daniel Hinojosa
    Nov 11 at 18:57








2




2




did you try changing consumer group.id
– Deadpool
Nov 11 at 6:28




did you try changing consumer group.id
– Deadpool
Nov 11 at 6:28












Technically, the console consumer is also a Java consumer
– cricket_007
Nov 11 at 7:44





Technically, the console consumer is also a Java consumer
– cricket_007
Nov 11 at 7:44





1




1




@Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
– Daniel Hinojosa
Nov 11 at 18:57





@Deadpool I believe is right. The moment you run this __consumer_offsets should contain the next offset to be read in per your group.id. If you change the group.id it will receive the contents again. auto.offset.reset only applies if there is no valid offset. The moment you run it once, you have a valid offset. If you always want to receive from the beginning then you need to call seekToBeginning(consumer.assignment()) after the initial poll.
– Daniel Hinojosa
Nov 11 at 18:57


















active

oldest

votes











Your Answer






StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");

StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);

StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);

else
createEditor();

);

function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);



);













 

draft saved


draft discarded


















StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53245217%2fkafkaconsumer-java-client-is-not-getting-the-records-from-topic-but-command-line%23new-answer', 'question_page');

);

Post as a guest















Required, but never shown






























active

oldest

votes













active

oldest

votes









active

oldest

votes






active

oldest

votes















 

draft saved


draft discarded















































 


draft saved


draft discarded














StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53245217%2fkafkaconsumer-java-client-is-not-getting-the-records-from-topic-but-command-line%23new-answer', 'question_page');

);

Post as a guest















Required, but never shown





















































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown

































Required, but never shown














Required, but never shown












Required, but never shown







Required, but never shown







這個網誌中的熱門文章

Barbados

How to read a connectionString WITH PROVIDER in .NET Core?

Node.js Script on GitHub Pages or Amazon S3