KafkaStreams: How to specify Serdes in stream aggregation?
I am working on a Kafka Streams application and I am having trouble figuring out how to make an aggregation work.
I have a KStream bankTransactions where the keys are of type String and the values are of type JsonNode, so I configured my app's Serdes with:
// Definition of the different Serdes used in the streams
final Serde<String> stringSerde = Serdes.String();
final Serde<JsonNode> jsonSerde = new JsonSerde();
final Serde<Long> longSerde = Serdes.Long();
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, jsonSerde.getClass().getName());
I want to aggregate the values into a KTable<String, Long>, where the keys stay the same but the values are Longs extracted from my JSON.
So first I wrote:
KTable<String, Long> totalBalances = bankTransactions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
        Materialized.as("bank-total-balance")
    );
And I get the following error at runtime:
Caused by: org.apache.kafka.streams.errors.StreamsException:
A serializer (value: org.apache.kafka.connect.json.JsonSerializer) is not compatible to
the actual value type (value type: java.lang.Long).
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.
I understand that Kafka is complaining because I'm trying to use the default JSON Serde to serialize a Long. So, after reading Confluent's docs, I tried this:
KTable<String, Long> totalBalances = bankTransactions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
        Materialized.as("bank-total-balance").withValueSerde(Serdes.Long())
    );
But then I get an error at compilation:
Error:(121, 89) java: incompatible types:
org.apache.kafka.common.serialization.Serde<java.lang.Long> cannot be converted
to org.apache.kafka.common.serialization.Serde<java.lang.Object>
I tried different ways to write this code (e.g. using Serdes.Long() instead of my longSerde, trying to parametrize the types of Materialized, and even writing my initializer and aggregator as functions, Java 7 style), but I can't figure out what I am doing wrong.
So my question is simple: how do I properly specify the Serdes that aggregate should use when they are not the default Serdes?
apache-kafka apache-kafka-streams
asked Nov 13 '18 at 14:28
statox
1 Answer
It seems like the correct syntax is the following:
KTable<String, Long> totalBalances = bankTransactions
    .groupByKey()
    .aggregate(
        () -> 0L,
        (key, transaction, balance) -> (Long)((Long)balance + transaction.get("amount").asLong()),
        // The store's value type is byte[] (raw bytes), not byte
        Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("bank-total-balances")
            .withKeySerde(stringSerde)
            .withValueSerde(longSerde)
    );
The three type arguments after Materialized. are the type of the key, the type of the value, and the type of the store used to materialize the KTable; the store type shouldn't change. We can then define the Serdes used to write to this key-value store.
Note: I got this syntax from a random repo found on GitHub. I would still gladly accept a more precise answer backed by some documentation.
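To illustrate why the explicit type arguments seem to matter, here is a minimal sketch in plain Java with no Kafka dependency. Codec and Store are hypothetical stand-ins for Serde and Materialized, invented for this illustration and not part of the Kafka Streams API. The sketch assumes the compilation error comes from ordinary generic type inference: the receiver of a chained call has no target type, so the value type falls back to Object.

```java
public class TypeWitnessDemo {

    /** Hypothetical stand-in for a Serde: it only knows how to encode one type. */
    interface Codec<T> {
        String encode(T value);
    }

    /** Hypothetical stand-in for Materialized, reduced to key and value types. */
    static final class Store<K, V> {
        private final String name;
        private Codec<V> valueCodec;

        private Store(String name) {
            this.name = name;
        }

        // Generic static factory: K and V appear only in the return type,
        // so the compiler has to infer them from the call site.
        static <K, V> Store<K, V> as(String name) {
            return new Store<>(name);
        }

        Store<K, V> withValueCodec(Codec<V> codec) {
            this.valueCodec = codec;
            return this;
        }

        String write(V value) {
            return name + "=" + valueCodec.encode(value);
        }
    }

    static final Codec<Long> LONG_CODEC = value -> Long.toString(value);

    // Option 1: an explicit type witness fixes K and V before the chained call.
    static Store<String, Long> withWitness() {
        return Store.<String, Long>as("bank-total-balance").withValueCodec(LONG_CODEC);
    }

    // Option 2: a typed local variable gives the factory call a target type.
    static Store<String, Long> withTypedLocal() {
        Store<String, Long> store = Store.as("bank-total-balance");
        return store.withValueCodec(LONG_CODEC);
    }

    // Does not compile: with no target type, V is inferred as Object,
    // so a Codec<Long> cannot be converted to Codec<Object>.
    // Store.as("bank-total-balance").withValueCodec(LONG_CODEC);

    public static void main(String[] args) {
        System.out.println(withWitness().write(42L));
        System.out.println(withTypedLocal().write(42L));
    }
}
```

Both options produce a Store<String, Long>; the commented-out line reproduces the same kind of "cannot be converted" error as in the question. The type witness keeps everything in one expression, which is why it fits naturally inside the aggregate call.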
Using Java 8, you would not need to specify the generic types. In Java 7, the type is unknown, so Object is assumed for the value and thus the cast fails. It's not specific to Kafka Streams but some Java magic. Check out the example repo, in particular the Java 7 examples: github.com/confluentinc/kafka-streams-examples
– Matthias J. Sax
Nov 13 '18 at 23:37
@MatthiasJ.Sax Thanks for pointing that out, I'll take a look at the examples
– statox
Nov 14 '18 at 8:32
answered Nov 13 '18 at 14:57
statox