KafkaStreams How to instantiate a ConsumerRecordFactory?
I am trying to use the ConsumerRecordFactory
provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
My problem is that when I compile my code I get the following error:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
So I understand that kafka streams defines the generic method create(K,V,long)
and that when I create my factory with non generic types I create a new method which is in conflict with the first one.
My question is how am I supposed to instanciate my ConsumerRecordFactory
?
I tried making my factory more generic with ConsumerRecordFactory<Object, Integer>
but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and this SO answer seems to be using the same code as the documentation.
(I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams
is a good way to reach people used to the ConsumerRecordFactory
)
apache-kafka-streams inferred-type
add a comment |
I am trying to use the ConsumerRecordFactory
provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
My problem is that when I compile my code I get the following error:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
So I understand that kafka streams defines the generic method create(K,V,long)
and that when I create my factory with non generic types I create a new method which is in conflict with the first one.
My question is how am I supposed to instanciate my ConsumerRecordFactory
?
I tried making my factory more generic with ConsumerRecordFactory<Object, Integer>
but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and this SO answer seems to be using the same code as the documentation.
(I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams
is a good way to reach people used to the ConsumerRecordFactory
)
apache-kafka-streams inferred-type
add a comment |
I am trying to use the ConsumerRecordFactory
provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
My problem is that when I compile my code I get the following error:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
So I understand that kafka streams defines the generic method create(K,V,long)
and that when I create my factory with non generic types I create a new method which is in conflict with the first one.
My question is how am I supposed to instanciate my ConsumerRecordFactory
?
I tried making my factory more generic with ConsumerRecordFactory<Object, Integer>
but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and this SO answer seems to be using the same code as the documentation.
(I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams
is a good way to reach people used to the ConsumerRecordFactory
)
apache-kafka-streams inferred-type
I am trying to use the ConsumerRecordFactory
provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:
// Properties of the application
Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");
// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();
// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);
// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
My problem is that when I compile my code I get the following error:
Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match
So I understand that kafka streams defines the generic method create(K,V,long)
and that when I create my factory with non generic types I create a new method which is in conflict with the first one.
My question is how am I supposed to instanciate my ConsumerRecordFactory
?
I tried making my factory more generic with ConsumerRecordFactory<Object, Integer>
but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory
, and this SO answer seems to be using the same code as the documentation.
(I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams
is a good way to reach people used to the ConsumerRecordFactory
)
apache-kafka-streams inferred-type
apache-kafka-streams inferred-type
edited Nov 15 '18 at 14:48
Nishu Tayal
12.8k73483
12.8k73483
asked Nov 15 '18 at 14:14
statoxstatox
1,23611129
1,23611129
add a comment |
add a comment |
1 Answer
1
active
oldest
votes
There are some issues in the below code:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
- You have defined valueType as Integer in the
ConsumerRecordFactory
, but increate()
method you are passing Long type value. factory.create()
returns aConsumerRecord
instead ofConsumerRecordFactory
.
Regarding the ambiguity of method, you are right. So avoid that issue, use following :
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321384%2fkafkastreams-how-to-instantiate-a-consumerrecordfactory%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
1 Answer
1
active
oldest
votes
1 Answer
1
active
oldest
votes
active
oldest
votes
active
oldest
votes
There are some issues in the below code:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
- You have defined valueType as Integer in the
ConsumerRecordFactory
, but increate()
method you are passing Long type value. factory.create()
returns aConsumerRecord
instead ofConsumerRecordFactory
.
Regarding the ambiguity of method, you are right. So avoid that issue, use following :
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
add a comment |
There are some issues in the below code:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
- You have defined valueType as Integer in the
ConsumerRecordFactory
, but increate()
method you are passing Long type value. factory.create()
returns aConsumerRecord
instead ofConsumerRecordFactory
.
Regarding the ambiguity of method, you are right. So avoid that issue, use following :
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
add a comment |
There are some issues in the below code:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
- You have defined valueType as Integer in the
ConsumerRecordFactory
, but increate()
method you are passing Long type value. factory.create()
returns aConsumerRecord
instead ofConsumerRecordFactory
.
Regarding the ambiguity of method, you are right. So avoid that issue, use following :
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);
There are some issues in the below code:
// Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer() );
// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);
- You have defined valueType as Integer in the
ConsumerRecordFactory
, but increate()
method you are passing Long type value. factory.create()
returns aConsumerRecord
instead ofConsumerRecordFactory
.
Regarding the ambiguity of method, you are right. So avoid that issue, use following :
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
new StringSerializer(),
new IntegerSerializer()
);
// Use ConsumerRecord here instead of ConsumerRecordFactory
ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);
answered Nov 15 '18 at 14:32
Nishu TayalNishu Tayal
12.8k73483
12.8k73483
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
add a comment |
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?
– statox
Nov 15 '18 at 14:39
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,
– Nishu Tayal
Nov 15 '18 at 14:47
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
If you think it solves your issue, feel free to accept/upvote the answer :)
– Nishu Tayal
Nov 15 '18 at 14:48
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
Yup I was just reading the doc and testing a bit more before accepting :)
– statox
Nov 15 '18 at 14:51
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321384%2fkafkastreams-how-to-instantiate-a-consumerrecordfactory%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown