Kafka Streams DSL: add an option parameter to disable repartition when using `map`, `selectKey`, `groupBy`










According to the documentation, a stream is marked for repartitioning once map, selectKey, or groupBy has been applied, even when the data is already partitioned appropriately for the new key. Is it possible to add an optional parameter to disable repartitioning?

Here is my use case: there is a topic that is partitioned by user_id.



# topic 'user', format '%key,%value'
partition-1:
user1,'user_id':'user1', 'device_id':'device1'
user1,'user_id':'user1', 'device_id':'device1'
user1,'user_id':'user1', 'device_id':'device2'
partition-2:
user2,'user_id':'user2', 'device_id':'device3'
user2,'user_id':'user2', 'device_id':'device4'


I want to count user_id-device_id pairs using the DSL as follows:



stream
    .groupBy((userId, value) -> {
        // Build the composite grouping key "<user_id>&<device_id>" from the JSON value
        JSONObject event = new JSONObject(value);
        String user = event.getString("user_id");
        String device = event.getString("device_id");
        return String.format("%s&%s", user, device);
    })
    .count();


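In effect, the data is already partitioned correctly for the new key: all records with the same user_id are in the same partition, so all records with the same user_id&device_id pair are too. There is no need to repartition again.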
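The reasoning above can be checked with a small plain-Java sketch. It does not use Kafka at all: `CoPartitionCheck`, `partitionFor`, and `partitionsPerCompositeKey` are hypothetical names, and the hash-modulo routing is only a simplified stand-in for a real partitioner (Kafka's default partitioner hashes the serialized key with murmur2). It routes each event by user_id only and records which partitions each composite key shows up in.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CoPartitionCheck {

    // Simplified, hypothetical partitioner: key hash modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // For each composite key "user&device", collect the partitions it appears in
    // when every event is routed by user_id alone. Each event is {userId, deviceId}.
    static Map<String, Set<Integer>> partitionsPerCompositeKey(
            List<String[]> events, int numPartitions) {
        Map<String, Set<Integer>> seen = new HashMap<>();
        for (String[] event : events) {
            String userId = event[0];
            String deviceId = event[1];
            int partition = partitionFor(userId, numPartitions); // routed by user_id only
            String composite = userId + "&" + deviceId;
            seen.computeIfAbsent(composite, k -> new HashSet<>()).add(partition);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Same sample data as in the question.
        List<String[]> events = Arrays.asList(
                new String[] {"user1", "device1"},
                new String[] {"user1", "device1"},
                new String[] {"user1", "device2"},
                new String[] {"user2", "device3"},
                new String[] {"user2", "device4"});
        partitionsPerCompositeKey(events, 2)
                .forEach((key, partitions) -> System.out.println(key + " -> " + partitions));
    }
}
```

Every composite key maps to exactly one partition, because the partition is a function of user_id and user_id is part of the composite key. This only holds as long as the composite key is derived from a record that keeps its original user_id routing.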










      apache-kafka-streams






      asked Nov 13 '18 at 4:47 by Lifei Chen (edited Nov 15 '18 at 2:17)






















          1 Answer
          If you use .groupBy(), it always causes data re-partitioning. If possible, use groupByKey() instead, which re-partitions data only if required, that is, only if an earlier operation may have changed the key.



          In your case, you are changing the keys anyway, so that will create a re-partition topic.







          answered Nov 14 '18 at 18:51 by Nishu Tayal












          • In fact, the data is already partitioned indirectly: it is partitioned by user_id, so records with the same user_id_device_id also end up in the same partition, and no repartitioning is needed. Besides, I do not want to change the key of the Kafka record; I just want the key used for aggregation in Kafka Streams to be user_id_device_id rather than user_id.

            – Lifei Chen
            Nov 15 '18 at 2:14

          • Grouping and aggregation won't change the key of the source topic. Instead, a repartition topic is created as an internal topic, which belongs to the application.id.

            – Nishu Tayal
            Nov 15 '18 at 8:36

          • Yes. My suggestion is to ask whether the developers of Kafka Streams could add an optional parameter to disable repartitioning (because it generates repartition topics), so that the aggregation key does not have to be the record key itself but can also be a key derived from the record value.

            – Lifei Chen
            Nov 15 '18 at 9:33

          • No, it is not possible at the moment in the Streams DSL directly. You can use the low-level Processor API to implement custom logic that avoids the repartitioning.

            – Nishu Tayal
            Nov 15 '18 at 10:00

          • Thanks, I will accept the answer. If related features are added later, please mention them in the comments. :)

            – Lifei Chen
            Nov 15 '18 at 10:06
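The custom-logic idea from the last comments can be illustrated with a minimal plain-Java sketch. This is not Kafka Streams code and not the Processor API itself: `LocalPairCounter`, `PartitionTask`, and `countWithoutRepartition` are hypothetical names, the `HashMap` stands in for a state store, and hash-modulo routing stands in for the topic's partitioner. The point it shows is that each task can count composite keys locally while the record key (user_id) stays untouched, so nothing has to be written to a repartition topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LocalPairCounter {

    // Stand-in for one stream task: it sees a single partition and owns a local store.
    static final class PartitionTask {
        private final Map<String, Long> store = new HashMap<>(); // stands in for a state store

        // The record key (userId) is not changed; only the store key is composite.
        void process(String userId, String deviceId) {
            store.merge(userId + "&" + deviceId, 1L, Long::sum);
        }

        Map<String, Long> counts() {
            return store;
        }
    }

    // Routes each {userId, deviceId} event by userId only, then counts per task.
    static Map<String, Long> countWithoutRepartition(List<String[]> events, int numPartitions) {
        List<PartitionTask> tasks = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            tasks.add(new PartitionTask());
        }
        for (String[] event : events) {
            int partition = Math.floorMod(event[0].hashCode(), numPartitions);
            tasks.get(partition).process(event[0], event[1]);
        }
        // The tasks' key sets are disjoint (a user_id lives in exactly one partition),
        // so combining the local stores never overwrites a count.
        Map<String, Long> merged = new HashMap<>();
        for (PartitionTask task : tasks) {
            merged.putAll(task.counts());
        }
        return merged;
    }

    public static void main(String[] args) {
        List<String[]> events = Arrays.asList(
                new String[] {"user1", "device1"},
                new String[] {"user1", "device1"},
                new String[] {"user1", "device2"},
                new String[] {"user2", "device3"},
                new String[] {"user2", "device4"});
        countWithoutRepartition(events, 2)
                .forEach((pair, count) -> System.out.println(pair + " = " + count));
    }
}
```

With the sample data from the question, the pair user1&device1 is counted twice and every other pair once. In a real application the same shape would have to be built on the Processor API with a proper state store, and the result is only correct while the input topic really is partitioned by user_id.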
















