KafkaStreams How to instantiate a ConsumerRecordFactory?










1















I am trying to use the ConsumerRecordFactory provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:



// Properties of the application
Properties streamsConfiguration = new Properties();

// Give the Streams application a unique name. The name must be unique in the Kafka cluster
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

// Create the topology builder
StreamsBuilder builder = new StreamsBuilder();

// Run it on the test driver
TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

// Feed input data
ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
"input-topic",
new StringSerializer(),
new IntegerSerializer()
);

// Create a test record
ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


My problem is that when I compile my code I get the following error:



Error:(70, 52) java: reference to create is ambiguous
both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match


So I understand that kafka streams defines the generic method create(K,V,long) and that when I create my factory with non generic types I create a new method which is in conflict with the first one.



My question is how am I supposed to instanciate my ConsumerRecordFactory?



I tried making my factory more generic with ConsumerRecordFactory<Object, Integer> but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory, and this SO answer seems to be using the same code as the documentation.



(I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams is a good way to reach people used to the ConsumerRecordFactory)










share|improve this question




























    1















    I am trying to use the ConsumerRecordFactory provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:



    // Properties of the application
    Properties streamsConfiguration = new Properties();

    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

    // Create the topology builder
    StreamsBuilder builder = new StreamsBuilder();

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

    // Feed input data
    ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
    "input-topic",
    new StringSerializer(),
    new IntegerSerializer()
    );

    // Create a test record
    ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


    My problem is that when I compile my code I get the following error:



    Error:(70, 52) java: reference to create is ambiguous
    both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
    and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match


    So I understand that kafka streams defines the generic method create(K,V,long) and that when I create my factory with non generic types I create a new method which is in conflict with the first one.



    My question is how am I supposed to instanciate my ConsumerRecordFactory?



    I tried making my factory more generic with ConsumerRecordFactory<Object, Integer> but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory, and this SO answer seems to be using the same code as the documentation.



    (I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams is a good way to reach people used to the ConsumerRecordFactory)










    share|improve this question


























      1












      1








      1








      I am trying to use the ConsumerRecordFactory provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:



      // Properties of the application
      Properties streamsConfiguration = new Properties();

      // Give the Streams application a unique name. The name must be unique in the Kafka cluster
      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

      // Create the topology builder
      StreamsBuilder builder = new StreamsBuilder();

      // Run it on the test driver
      TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

      // Feed input data
      ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
      "input-topic",
      new StringSerializer(),
      new IntegerSerializer()
      );

      // Create a test record
      ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


      My problem is that when I compile my code I get the following error:



      Error:(70, 52) java: reference to create is ambiguous
      both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
      and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match


      So I understand that kafka streams defines the generic method create(K,V,long) and that when I create my factory with non generic types I create a new method which is in conflict with the first one.



      My question is how am I supposed to instanciate my ConsumerRecordFactory?



      I tried making my factory more generic with ConsumerRecordFactory<Object, Integer> but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory, and this SO answer seems to be using the same code as the documentation.



      (I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams is a good way to reach people used to the ConsumerRecordFactory)










      share|improve this question
















      I am trying to use the ConsumerRecordFactory provided by Kafka Streams following mainly the confluent doc to test a streaming application, here is the code I have so far:



      // Properties of the application
      Properties streamsConfiguration = new Properties();

      // Give the Streams application a unique name. The name must be unique in the Kafka cluster
      streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "testing_application");
      streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyserver:2181");

      // Create the topology builder
      StreamsBuilder builder = new StreamsBuilder();

      // Run it on the test driver
      TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), streamsConfiguration);

      // Feed input data
      ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
      "input-topic",
      new StringSerializer(),
      new IntegerSerializer()
      );

      // Create a test record
      ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


      My problem is that when I compile my code I get the following error:



      Error:(70, 52) java: reference to create is ambiguous
      both method create(K,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory
      and method create(java.lang.String,V,long) in org.apache.kafka.streams.test.ConsumerRecordFactory match


      So I understand that kafka streams defines the generic method create(K,V,long) and that when I create my factory with non generic types I create a new method which is in conflict with the first one.



      My question is how am I supposed to instanciate my ConsumerRecordFactory?



      I tried making my factory more generic with ConsumerRecordFactory<Object, Integer> but then the inferred type doesn't match. And I can't find other example the confluent github repo kafka-streams-examples doesn't seem to use a ConsumerRecordFactory, and this SO answer seems to be using the same code as the documentation.



      (I am aware that the problem is more about java than about kafka streams but I thought that tagging it with apache-kafka-streams is a good way to reach people used to the ConsumerRecordFactory)







      apache-kafka-streams inferred-type






      share|improve this question















      share|improve this question













      share|improve this question




      share|improve this question








      edited Nov 15 '18 at 14:48









      Nishu Tayal

      12.8k73483




      12.8k73483










      asked Nov 15 '18 at 14:14









      statoxstatox

      1,23611129




      1,23611129






















          1 Answer
          1






          active

          oldest

          votes


















          1














          There are some issues in the below code:



          // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
          "input-topic",
          new StringSerializer(),
          new IntegerSerializer() );

          // Create a test record
          ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


          1. You have defined valueType as Integer in the ConsumerRecordFactory, but in create() method you are passing Long type value.


          2. factory.create() returns a ConsumerRecord instead of ConsumerRecordFactory.

          Regarding the ambiguity of method, you are right. So avoid that issue, use following :



          ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
          new StringSerializer(),
          new IntegerSerializer()
          );
          // Use ConsumerRecord here instead of ConsumerRecordFactory
          ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);





          share|improve this answer























          • You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

            – statox
            Nov 15 '18 at 14:39











          • Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

            – Nishu Tayal
            Nov 15 '18 at 14:47











          • If you think it solves your issue, feel free to accept/upvote the answer :)

            – Nishu Tayal
            Nov 15 '18 at 14:48











          • Yup I was just reading the doc and testing a bit more before accepting :)

            – statox
            Nov 15 '18 at 14:51










          Your Answer






          StackExchange.ifUsing("editor", function ()
          StackExchange.using("externalEditor", function ()
          StackExchange.using("snippets", function ()
          StackExchange.snippets.init();
          );
          );
          , "code-snippets");

          StackExchange.ready(function()
          var channelOptions =
          tags: "".split(" "),
          id: "1"
          ;
          initTagRenderer("".split(" "), "".split(" "), channelOptions);

          StackExchange.using("externalEditor", function()
          // Have to fire editor after snippets, if snippets enabled
          if (StackExchange.settings.snippets.snippetsEnabled)
          StackExchange.using("snippets", function()
          createEditor();
          );

          else
          createEditor();

          );

          function createEditor()
          StackExchange.prepareEditor(
          heartbeatType: 'answer',
          autoActivateHeartbeat: false,
          convertImagesToLinks: true,
          noModals: true,
          showLowRepImageUploadWarning: true,
          reputationToPostImages: 10,
          bindNavPrevention: true,
          postfix: "",
          imageUploader:
          brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
          contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
          allowUrls: true
          ,
          onDemand: true,
          discardSelector: ".discard-answer"
          ,immediatelyShowMarkdownHelp:true
          );



          );













          draft saved

          draft discarded


















          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321384%2fkafkastreams-how-to-instantiate-a-consumerrecordfactory%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown

























          1 Answer
          1






          active

          oldest

          votes








          1 Answer
          1






          active

          oldest

          votes









          active

          oldest

          votes






          active

          oldest

          votes









          1














          There are some issues in the below code:



          // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
          "input-topic",
          new StringSerializer(),
          new IntegerSerializer() );

          // Create a test record
          ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


          1. You have defined valueType as Integer in the ConsumerRecordFactory, but in create() method you are passing Long type value.


          2. factory.create() returns a ConsumerRecord instead of ConsumerRecordFactory.

          Regarding the ambiguity of method, you are right. So avoid that issue, use following :



          ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
          new StringSerializer(),
          new IntegerSerializer()
          );
          // Use ConsumerRecord here instead of ConsumerRecordFactory
          ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);





          share|improve this answer























          • You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

            – statox
            Nov 15 '18 at 14:39











          • Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

            – Nishu Tayal
            Nov 15 '18 at 14:47











          • If you think it solves your issue, feel free to accept/upvote the answer :)

            – Nishu Tayal
            Nov 15 '18 at 14:48











          • Yup I was just reading the doc and testing a bit more before accepting :)

            – statox
            Nov 15 '18 at 14:51















          1














          There are some issues in the below code:



          // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
          "input-topic",
          new StringSerializer(),
          new IntegerSerializer() );

          // Create a test record
          ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


          1. You have defined valueType as Integer in the ConsumerRecordFactory, but in create() method you are passing Long type value.


          2. factory.create() returns a ConsumerRecord instead of ConsumerRecordFactory.

          Regarding the ambiguity of method, you are right. So avoid that issue, use following :



          ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
          new StringSerializer(),
          new IntegerSerializer()
          );
          // Use ConsumerRecord here instead of ConsumerRecordFactory
          ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);





          share|improve this answer























          • You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

            – statox
            Nov 15 '18 at 14:39











          • Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

            – Nishu Tayal
            Nov 15 '18 at 14:47











          • If you think it solves your issue, feel free to accept/upvote the answer :)

            – Nishu Tayal
            Nov 15 '18 at 14:48











          • Yup I was just reading the doc and testing a bit more before accepting :)

            – statox
            Nov 15 '18 at 14:51













          1












          1








          1







          There are some issues in the below code:



          // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
          "input-topic",
          new StringSerializer(),
          new IntegerSerializer() );

          // Create a test record
          ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


          1. You have defined valueType as Integer in the ConsumerRecordFactory, but in create() method you are passing Long type value.


          2. factory.create() returns a ConsumerRecord instead of ConsumerRecordFactory.

          Regarding the ambiguity of method, you are right. So avoid that issue, use following :



          ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
          new StringSerializer(),
          new IntegerSerializer()
          );
          // Use ConsumerRecord here instead of ConsumerRecordFactory
          ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);





          share|improve this answer













          There are some issues in the below code:



          // Feed input data ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>(
          "input-topic",
          new StringSerializer(),
          new IntegerSerializer() );

          // Create a test record
          ConsumerRecordFactory<byte, byte> record = factory.create("key", 42L);


          1. You have defined valueType as Integer in the ConsumerRecordFactory, but in create() method you are passing Long type value.


          2. factory.create() returns a ConsumerRecord instead of ConsumerRecordFactory.

          Regarding the ambiguity of method, you are right. So avoid that issue, use following :



          ConsumerRecordFactory<String, Integer> factory = new ConsumerRecordFactory<>( 
          new StringSerializer(),
          new IntegerSerializer()
          );
          // Use ConsumerRecord here instead of ConsumerRecordFactory
          ConsumerRecord<byte, byte> record = factory.create("input-topic","key", 42);






          share|improve this answer












          share|improve this answer



          share|improve this answer










          answered Nov 15 '18 at 14:32









          Nishu TayalNishu Tayal

          12.8k73483




          12.8k73483












          • You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

            – statox
            Nov 15 '18 at 14:39











          • Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

            – Nishu Tayal
            Nov 15 '18 at 14:47











          • If you think it solves your issue, feel free to accept/upvote the answer :)

            – Nishu Tayal
            Nov 15 '18 at 14:48











          • Yup I was just reading the doc and testing a bit more before accepting :)

            – statox
            Nov 15 '18 at 14:51

















          • You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

            – statox
            Nov 15 '18 at 14:39











          • Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

            – Nishu Tayal
            Nov 15 '18 at 14:47











          • If you think it solves your issue, feel free to accept/upvote the answer :)

            – Nishu Tayal
            Nov 15 '18 at 14:48











          • Yup I was just reading the doc and testing a bit more before accepting :)

            – statox
            Nov 15 '18 at 14:51
















          You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

          – statox
          Nov 15 '18 at 14:39





          You're right about the types error, it was a bad copy past but is not my actual code. About your solution the the factory it seems to be working, but I don't understand why not defining the topic name in the factory but rather in the record solves the problem?

          – statox
          Nov 15 '18 at 14:39













          Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

          – Nishu Tayal
          Nov 15 '18 at 14:47





          Great. This ambiguity will occur when your <K> is String. But if you define some other key type, it will work as the method signature will be unique. You can find more details here : kafka.apache.org/11/javadoc/org/apache/kafka/streams/test/…,

          – Nishu Tayal
          Nov 15 '18 at 14:47













          If you think it solves your issue, feel free to accept/upvote the answer :)

          – Nishu Tayal
          Nov 15 '18 at 14:48





          If you think it solves your issue, feel free to accept/upvote the answer :)

          – Nishu Tayal
          Nov 15 '18 at 14:48













          Yup I was just reading the doc and testing a bit more before accepting :)

          – statox
          Nov 15 '18 at 14:51





          Yup I was just reading the doc and testing a bit more before accepting :)

          – statox
          Nov 15 '18 at 14:51



















          draft saved

          draft discarded
















































          Thanks for contributing an answer to Stack Overflow!


          • Please be sure to answer the question. Provide details and share your research!

          But avoid


          • Asking for help, clarification, or responding to other answers.

          • Making statements based on opinion; back them up with references or personal experience.

          To learn more, see our tips on writing great answers.




          draft saved


          draft discarded














          StackExchange.ready(
          function ()
          StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f53321384%2fkafkastreams-how-to-instantiate-a-consumerrecordfactory%23new-answer', 'question_page');

          );

          Post as a guest















          Required, but never shown





















































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown

































          Required, but never shown














          Required, but never shown












          Required, but never shown







          Required, but never shown







          這個網誌中的熱門文章

          How to read a connectionString WITH PROVIDER in .NET Core?

          Node.js Script on GitHub Pages or Amazon S3

          Museum of Modern and Contemporary Art of Trento and Rovereto