Sending input from a single spout to multiple bolts with fields grouping in Apache Storm










    builder.setSpout("spout", new TweetSpout());
    builder.setBolt("bolt", new TweetCounter(), 2)
           .fieldsGrouping("spout", new Fields("field1"));


I have an input field "field1" that I use for the fields grouping. By the definition of fields grouping, all tweets with the same "field1" value should go to a single task of TweetCounter. The parallelism hint (number of executors) set for the TweetCounter bolt is 2.



However, if "field1" is the same in every tuple of the incoming stream, does this mean that, even though I specified 2 executors for TweetCounter, the whole stream would be sent to only one of them while the other stays idle?
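To make the concern concrete, here is a minimal sketch of how I understand the fields grouping rule, assuming it boils down to hashing the grouping field values and taking the result modulo the number of target tasks. The class and method names are hypothetical and Storm's real implementation may differ in detail; the sketch only models the behaviour described above.

```java
import java.util.Arrays;
import java.util.List;

class FieldsGroupingSketch {
    // Hypothetical stand-in for the fields grouping rule (an assumption, not
    // Storm's code): hash the grouping values, reduce modulo the task count.
    static int chooseTaskIndex(List<Object> groupingValues, int numTasks) {
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 2; // matches the parallelism hint in the question
        int[] tuplesPerTask = new int[numTasks];
        // Every tuple carries the same "field1" value.
        for (int i = 0; i < 1000; i++) {
            List<Object> groupingValues = Arrays.<Object>asList("same-value");
            tuplesPerTask[chooseTaskIndex(groupingValues, numTasks)]++;
        }
        // One slot ends up with all 1000 tuples, the other with none.
        System.out.println(Arrays.toString(tuplesPerTask));
    }
}
```

Under that assumption, a constant "field1" always produces the same index, so only one of the two tasks would ever receive tuples.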



To go further with my particular use case, how can I use a single spout and send data to different bolts based on a particular value of an input field (field1)?










      apache-storm














edited Nov 15 '18 at 19:32

asked Nov 15 '18 at 18:23
Sahil






















          2 Answers
It seems one way to solve this problem is to use direct grouping, where the source decides which task of the consumer will receive the tuple. The Storm documentation describes it as follows:




This is a special kind of grouping. A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple. Direct groupings can only be declared on streams that have been declared as direct streams. Tuples emitted to a direct stream must be emitted using one of the emitDirect methods. A bolt can get the task ids of its consumers by either using the provided TopologyContext or by keeping track of the output of the emit method in OutputCollector (which returns the task ids that the tuple was sent to).




You can see an example of its use here:

    collector.emitDirect(getWordCountIndex(word), new Values(word));

where getWordCountIndex returns the id of the consumer task that will process this tuple.
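As a rough illustration of the routing decision behind emitDirect, the sketch below maps a "field1" value to a consumer task id using an explicit routing table. Everything here is hypothetical: the class name, the routing table and the task ids are made up. In a real spout the task id list would come from TopologyContext.getComponentTasks("bolt"), the stream would be declared as a direct stream, and the bolt would subscribe with directGrouping.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DirectRoutingSketch {
    // Hypothetical routing table: field1 value -> position in the consumer task list.
    private static final Map<String, Integer> ROUTES = new HashMap<>();
    static {
        ROUTES.put("en", 0);
        ROUTES.put("fr", 1);
    }

    // Picks the consumer task id for a field1 value; unknown values fall back
    // to the first consumer task.
    static int chooseTaskId(String field1, List<Integer> consumerTaskIds) {
        int position = ROUTES.getOrDefault(field1, 0);
        return consumerTaskIds.get(position % consumerTaskIds.size());
    }

    public static void main(String[] args) {
        List<Integer> consumerTaskIds = Arrays.asList(4, 5); // made-up task ids
        for (String field1 : Arrays.asList("en", "fr", "de")) {
            int taskId = chooseTaskId(field1, consumerTaskIds);
            // In a spout this is where the tuple would be sent, for example:
            // collector.emitDirect(taskId, new Values(field1));
            System.out.println(field1 + " -> task " + taskId);
        }
    }
}
```

Keeping the decision in one small method makes it easy to swap the table for a hash or any other rule without touching the emit code.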







answered Nov 17 '18 at 10:01
Saurabh























An alternative to using emitDirect, as described in the other answer, is to implement your own stream grouping. The complexity is about the same, but it lets you reuse the grouping logic across multiple bolts.

For example, the shuffle grouping in Storm is implemented as a CustomStreamGrouping as follows:



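    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.storm.generated.GlobalStreamId;
    import org.apache.storm.grouping.CustomStreamGrouping;
    import org.apache.storm.task.WorkerTopologyContext;

    public class ShuffleGrouping implements CustomStreamGrouping, Serializable {
        private ArrayList<List<Integer>> choices;
        private AtomicInteger current;

        @Override
        public void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks) {
            // Wrap each target task id in its own list and shuffle the order once.
            choices = new ArrayList<List<Integer>>(targetTasks.size());
            for (Integer i : targetTasks) {
                choices.add(Arrays.asList(i));
            }
            current = new AtomicInteger(0);
            Collections.shuffle(choices, new Random());
        }

        @Override
        public List<Integer> chooseTasks(int taskId, List<Object> values) {
            int rightNow;
            int size = choices.size();
            while (true) {
                rightNow = current.incrementAndGet();
                if (rightNow < size) {
                    return choices.get(rightNow);
                } else if (rightNow == size) {
                    current.set(0);
                    return choices.get(0);
                }
                // race condition with another thread, and we lost. try again
            }
        }
    }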




Storm will call prepare to tell you the task ids your grouping is responsible for, as well as some context on the topology. When Storm emits a tuple from a bolt or spout that uses this grouping, it calls chooseTasks, which lets you define which tasks the tuple should go to. You would then use the grouping when building your topology as shown:

    TopologyBuilder tp = new TopologyBuilder();
    tp.setSpout("spout", new MySpout(), 1);
    tp.setBolt("bolt", new MyBolt())
      .customGrouping("spout", new ShuffleGrouping());

Be aware that groupings need to be Serializable and thread safe.
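For the use case in the question, the same prepare/chooseTasks shape could route on the value of a field instead of shuffling. The following is only a sketch under stated assumptions: the class name, the "dedicated value" rule and the task ids are hypothetical, and the class deliberately does not depend on Storm so that it runs on its own. In a topology it would implement CustomStreamGrouping and Serializable, and prepare would take the Storm context arguments shown above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

class FieldValueGroupingSketch {
    private final int fieldIndex;        // position of field1 in the tuple
    private final Object dedicatedValue; // value that gets a task of its own
    private List<Integer> targetTasks;

    FieldValueGroupingSketch(int fieldIndex, Object dedicatedValue) {
        this.fieldIndex = fieldIndex;
        this.dedicatedValue = dedicatedValue;
    }

    // Plays the role of CustomStreamGrouping.prepare, minus the Storm context arguments.
    void prepare(List<Integer> targetTasks) {
        this.targetTasks = new ArrayList<>(targetTasks);
    }

    // Same shape as CustomStreamGrouping.chooseTasks(int taskId, List<Object> values).
    List<Integer> chooseTasks(int taskId, List<Object> values) {
        Object key = values.get(fieldIndex);
        if (targetTasks.size() == 1 || Objects.equals(key, dedicatedValue)) {
            // The dedicated value always goes to the first target task.
            return Collections.singletonList(targetTasks.get(0));
        }
        // All other values are hashed over the remaining tasks.
        int rest = targetTasks.size() - 1;
        int index = 1 + Math.floorMod(Objects.hashCode(key), rest);
        return Collections.singletonList(targetTasks.get(index));
    }

    public static void main(String[] args) {
        FieldValueGroupingSketch grouping = new FieldValueGroupingSketch(0, "urgent");
        grouping.prepare(Arrays.asList(7, 8, 9)); // made-up task ids
        System.out.println(grouping.chooseTasks(1, Arrays.<Object>asList("urgent", "tweet text")));
        System.out.println(grouping.chooseTasks(1, Arrays.<Object>asList("normal", "tweet text")));
    }
}
```

Because the state is only written in prepare and read afterwards, this sketch needs no locking in chooseTasks, which keeps it in line with the thread-safety requirement mentioned above.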







answered Nov 26 '18 at 18:59
Stig Rohde Døssing


























