KafkaStreams How to specify Serdes in stream aggregation?










I am working on a Kafka Streams application and I am having trouble getting an aggregation to work.



I have a KStream bankTransactions where the keys are of type String and the values are of type JsonNode, so I configured my app's default Serdes with:



// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();

config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());


I want to aggregate the values into a KTable<String, Long> where the keys stay the same but the values are Longs extracted from my JSON.



So first I wrote:



KTable<String, Long> totalBalances = bankTransactions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
        Materialized.as("bank-total-balance")
    );


And I get the following error at runtime:



Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.


I understand that Kafka is complaining because I'm trying to use the default JSON Serde to serialize a Long. So, following Confluent's documentation, I tried this:



KTable<String, Long> totalBalances = bankTransactions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
        Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
    );


But then I get an error at compilation:



Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>


I tried different ways to write this code (e.g. using Serdes.Long() instead of my longSerde, trying to parametrize the types of Materialized, and even writing my initializer and aggregator as functions, Java 7 style) but I can't figure out what I am doing wrong.



So my question is simple: how do I properly specify the Serdes that aggregate should use when they are not the default Serdes?










      apache-kafka apache-kafka-streams






      asked Nov 13 '18 at 14:28









      statox






















          1 Answer
          It seems like the correct syntax is the following:



          KTable<String, Long> totalBalances = bankTransactions
              .groupByKey()
              .aggregate(
                  () -> 0L,
                  (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
                  Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
                      .withKeySerde(stringSerde)
                      .withValueSerde(longSerde)
              );


          The three types after Materialized. are those of the key, of the value, and of the store used to materialize the KTable; the store type shouldn't change. We can then define the Serdes used to write to this key-value store.



          Note: I got this syntax from a random repo found on GitHub. I would still gladly accept a more precise answer backed by some documentation.
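To see why the explicit type arguments make a difference, here is a minimal sketch that needs no Kafka dependency. `Store` and `Codec` are hypothetical stand-ins I made up to mimic the shape of `Materialized` and `Serde` (a static generic factory followed by fluent setters); they are not Kafka classes. As far as I can tell, the compile error comes from plain Java type inference: a call used as the receiver of another method call has no target type, so its type parameters fall back to Object.

```java
public class TypeWitnessDemo {

    // Hypothetical stand-in for Serde<T>: it only turns a value into text.
    interface Codec<T> {
        String encode(T value);
    }

    // Hypothetical stand-in for Materialized: static generic factory plus fluent setter.
    static final class Store<K, V> {
        final String name;
        Codec<V> valueCodec;

        private Store(String name) {
            this.name = name;
        }

        static <K, V> Store<K, V> as(String name) {
            return new Store<>(name);
        }

        Store<K, V> withValueCodec(Codec<V> codec) {
            this.valueCodec = codec;
            return this;
        }
    }

    static final Codec<Long> LONG_CODEC = value -> "long:" + value;

    // Plays the role of aggregate(...): it fixes K and V for its argument.
    static String describe(Store<String, Long> store, Long sample) {
        return store.name + "=" + store.valueCodec.encode(sample);
    }

    static String viaWitness() {
        // The explicit type witness tells the compiler K and V up front.
        return describe(
            Store.<String, Long>as("balances").withValueCodec(LONG_CODEC), 42L);
    }

    static String viaVariable() {
        // The assignment gives the factory call a target type, so K and V are inferred.
        Store<String, Long> store = Store.as("balances");
        return describe(store.withValueCodec(LONG_CODEC), 42L);
    }

    public static void main(String[] args) {
        // Does not compile: Store.as("balances") is a method-call receiver, so K and V
        // become Object and Codec<Long> cannot be converted to Codec<Object>.
        // describe(Store.as("balances").withValueCodec(LONG_CODEC), 42L);

        System.out.println(viaWitness());   // prints "balances=long:42"
        System.out.println(viaVariable());  // prints "balances=long:42"
    }
}
```

In Kafka Streams itself, the static factory `Materialized.with(keySerde, valueSerde)` should avoid the type witness as well, because the types are inferred from the Serde arguments. It does not take a store name though, so it is probably not a drop-in replacement if you need to query the store by name.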






          answered Nov 13 '18 at 14:57









          statox












          • Using Java 8 you would not need to specify the generic types. In Java 7, the type is unknown, so Object is assumed for the value and thus the cast fails. It's not specific to KafkaStreams but some Java magic. Check out the example repo and its Java 7 examples: github.com/confluentinc/kafka-streams-examples

            – Matthias J. Sax
            Nov 13 '18 at 23:37











          • @MatthiasJ.Sax Thanks for pointing that out, I'll take a look at the examples

            – statox
            Nov 14 '18 at 8:32
















