How to expire state of dropDuplicates in structured streaming to avoid OOM?

I want to count the unique accesses for each day using Spark Structured Streaming, so I use the following code:



.dropDuplicates("uuid")


On the next day, the state maintained for the current day should be dropped, so that I get the correct count of unique accesses for the new day and avoid an OOM. The Spark documentation suggests using dropDuplicates with a watermark, for example:



.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")


However, the watermark column must then also be included in dropDuplicates. In that case uuid and timestamp are used as a combined key, so only rows with the same uuid and the same timestamp are deduplicated, which is not what I want.
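To make the problem concrete, here is a small plain-Java illustration (no Spark involved; the `CompositeKeyDemo` class, its `Event` type and its method names are hypothetical) of the difference between deduplicating on the composite key (uuid, timestamp) and on uuid alone. A user who appears at two different timestamps survives the composite-key deduplication twice, so a per-day unique count built on it would be inflated.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration (no Spark): composite-key vs single-key deduplication.
// Event is a hypothetical stand-in for one streaming row.
final class CompositeKeyDemo {
    static final class Event {
        final String uuid;
        final long timestampMs;

        Event(String uuid, long timestampMs) {
            this.uuid = uuid;
            this.timestampMs = timestampMs;
        }
    }

    // Keeps the first row per distinct (uuid, timestamp) pair,
    // analogous to dropDuplicates("uuid", "timestamp").
    static List<Event> dedupByUuidAndTimestamp(List<Event> events) {
        Set<List<Object>> seen = new HashSet<>();
        List<Event> out = new ArrayList<>();
        for (Event e : events) {
            if (seen.add(Arrays.<Object>asList(e.uuid, e.timestampMs))) {
                out.add(e);
            }
        }
        return out;
    }

    // Keeps the first row per distinct uuid, analogous to dropDuplicates("uuid").
    static List<Event> dedupByUuid(List<Event> events) {
        Set<String> seen = new HashSet<>();
        List<Event> out = new ArrayList<>();
        for (Event e : events) {
            if (seen.add(e.uuid)) {
                out.add(e);
            }
        }
        return out;
    }
}
```

For the input u1 at 1000 ms, u1 twice at 2000 ms and u2 at 3000 ms, the composite key keeps three rows while uuid alone keeps two.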



So is there a perfect solution?

apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming

edited Oct 27 '18 at 13:13 by user6910411
asked Aug 3 '17 at 3:27 by Kevin






















3 Answers

After a few days of effort, I finally found a way myself.



While studying the source code of withWatermark and dropDuplicates, I discovered that besides an event-time column, a watermark can also be defined on a window column, so we can use the following code:



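import org.apache.spark.sql.functions.window
import spark.implicits._ // `spark` is the active SparkSession; enables the $"col" syntax

// Applied to the streaming DataFrame that has "timestamp" and "uuid" columns:
.select(
  window($"timestamp", "1 day"), // 1-day tumbling window; the output column is named "window"
  $"timestamp",
  $"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window") // one row per uuid per day window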


Since all events in the same day fall into the same window, this produces the same result as deduplicating on uuid alone within each day, while still allowing the state for past days to be expired. Hope this helps someone.
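The idea can be illustrated with a plain-Java simulation. This is not Spark's actual implementation, and the `DailyDedupSimulation` class and its methods are hypothetical. It deduplicates on the pair (uuid, day-window start), tracks a watermark as the maximum event time seen minus a delay, drops rows whose window has already closed, and evicts a window's state once the window end is at or before the watermark. Unlike Spark, which advances the watermark between micro-batches, this sketch updates it after every event, and it assumes epoch-millisecond times with day windows aligned to UTC midnight.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation (NOT Spark's implementation) of dedup on (uuid, day window)
// with watermark-based state eviction. Times are epoch milliseconds; day windows are
// aligned to the epoch (UTC midnight). The watermark is updated after every event,
// whereas Spark advances it between micro-batches.
final class DailyDedupSimulation {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    private final long delayMs;                                      // watermark delay
    private long maxEventTimeMs = Long.MIN_VALUE;                    // largest event time seen
    private final Map<Long, Set<String>> state = new HashMap<>();    // window start -> uuids seen
    private final Map<Long, Integer> uniqueCounts = new HashMap<>(); // window start -> rows emitted

    DailyDedupSimulation(long delayMs) {
        this.delayMs = delayMs;
    }

    static long windowStart(long eventTimeMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs, DAY_MS);
    }

    long watermarkMs() {
        return maxEventTimeMs == Long.MIN_VALUE ? Long.MIN_VALUE : maxEventTimeMs - delayMs;
    }

    // Returns true if the row is emitted, i.e. its uuid is new within its day window.
    boolean process(String uuid, long eventTimeMs) {
        long start = windowStart(eventTimeMs);
        if (start + DAY_MS <= watermarkMs()) {
            return false; // late row: its window is already closed
        }
        boolean isNew = state.computeIfAbsent(start, k -> new HashSet<>()).add(uuid);
        if (isNew) {
            uniqueCounts.merge(start, 1, Integer::sum);
        }
        maxEventTimeMs = Math.max(maxEventTimeMs, eventTimeMs);
        final long watermark = watermarkMs();
        state.keySet().removeIf(s -> s + DAY_MS <= watermark); // evict closed windows
        return isNew;
    }

    int uniqueCount(long dayStartMs) {
        return uniqueCounts.getOrDefault(dayStartMs, 0);
    }

    // Total number of (window, uuid) entries currently held as state.
    int stateSize() {
        int n = 0;
        for (Set<String> uuids : state.values()) {
            n += uuids.size();
        }
        return n;
    }
}
```

Because a day's uuids are removed once the watermark passes the end of that day, the state only holds uuids for windows that are still open, instead of growing forever as with a plain `dropDuplicates("uuid")`.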

answered Aug 7 '17 at 9:31 by Kevin

Below is a modification of the procedure proposed in the Spark documentation. The trick is to manipulate the event time, i.e. put the event time into buckets. It assumes that the event time is provided in milliseconds.



import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions;

// Removes all duplicates that fall into the same 15-minute tumbling window.
// Does NOT remove duplicates that fall into different 15-minute windows!
// (Place this method inside a class.)
public static Dataset<Row> removeDuplicates(Dataset<Row> df) {
    // Convert event_time (epoch milliseconds) into 15-minute buckets, in seconds:
    // timestamp - (timestamp % (15 * 60))
    Column seconds = col("event_time").divide(1000);
    Column bucketCol = functions.to_timestamp(seconds.minus(seconds.mod(15 * 60)));
    df = df.withColumn("bucket", bucketCol);

    String windowDuration = "15 minutes";
    df = df.withWatermark("bucket", windowDuration)
           .dropDuplicates("uuid", "bucket");

    return df.drop("bucket");
}
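The bucketing expression can be checked outside Spark with a small plain-Java helper (the `BucketMath` name is hypothetical). It mirrors `timestamp - (timestamp % (15 * 60))` on epoch seconds, but uses integer arithmetic instead of the double division in the answer; for non-negative timestamps both give the same bucket start.

```java
// Plain-Java check of the bucketing arithmetic (hypothetical helper, no Spark).
final class BucketMath {
    static final long BUCKET_SECONDS = 15 * 60;

    // Start of the 15-minute bucket, in epoch seconds, for an epoch-millisecond event time:
    // seconds - (seconds % 900)
    static long bucketStartSeconds(long eventTimeMs) {
        long seconds = Math.floorDiv(eventTimeMs, 1000L); // drop the millisecond part
        return seconds - Math.floorMod(seconds, BUCKET_SECONDS);
    }
}
```

Events at 1,000,000 ms and 1,799,000 ms both map to bucket 900 s and are deduplicated against each other, while an event at 899,000 ms maps to bucket 0, so a duplicate spanning that boundary is kept, exactly as the comment in the answer warns.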

edited Nov 14 '18 at 21:12
answered Nov 14 '18 at 21:07 by dejan

I found that using the window struct column directly didn't work for me, so I used window.start (or window.end) instead, which is a plain timestamp column:



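import org.apache.spark.sql.functions.window
import spark.implicits._ // `spark` is the active SparkSession; enables the $"col" syntax

.select(
  window($"timestamp", "1 day").getField("start").as("window"), // or getField("end")
  $"timestamp",
  $"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")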
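To see why either field works as a deduplication key, here is a plain-Java sketch (the `TumblingWindow` helper is hypothetical, no Spark) of how a tumbling window's start and end relate to an event time: for a fixed window size, the end is always the start plus the size, so grouping on either gives the same groups. The `offsetMs` parameter is an assumption added for illustration; it plays a role similar to the optional `startTime` argument of Spark's `window` function, which shifts window boundaries relative to 1970-01-01 00:00:00 UTC (without it, 1-day windows start at UTC midnight).

```java
// Plain-Java sketch (hypothetical helper, no Spark): boundaries of the tumbling window
// that contains an event. All values are epoch milliseconds; offsetMs shifts the window
// boundaries away from the epoch (0 = aligned to 1970-01-01 00:00:00 UTC).
final class TumblingWindow {
    static long start(long eventTimeMs, long sizeMs, long offsetMs) {
        return eventTimeMs - Math.floorMod(eventTimeMs - offsetMs, sizeMs);
    }

    static long end(long eventTimeMs, long sizeMs, long offsetMs) {
        return start(eventTimeMs, sizeMs, offsetMs) + sizeMs; // end is always start + size
    }
}
```

With a 1-day size and no offset, an event at 05:00 on day 1 falls into [day 1 00:00, day 2 00:00); with an 8-hour offset, the same event falls into the window starting at 08:00 on day 0.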

answered Nov 18 '18 at 2:58 by Jun He