How to expire state of dropDuplicates in structured streaming to avoid OOM?
I want to count the unique access for each day using spark structured streaming, so I use the following code
.dropDuplicates("uuid")
and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.
So is there a perfect solution?
apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
add a comment |
I want to count the unique access for each day using spark structured streaming, so I use the following code
.dropDuplicates("uuid")
and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.
So is there a perfect solution?
apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
add a comment |
I want to count the unique access for each day using spark structured streaming, so I use the following code
.dropDuplicates("uuid")
and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.
So is there a perfect solution?
apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
I want to count the unique access for each day using spark structured streaming, so I use the following code
.dropDuplicates("uuid")
and in the next day the state maintained for today should be dropped so that I can get the right count of unique access of the next day and avoid OOM. The spark document indicates using dropDuplicates with watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be specified in dropDuplicates. In such case the uuid and timestamp will be used as a combined key to deduplicate elements with the same uuid and timestamp, which is not what I expected.
So is there a perfect solution?
apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
apache-spark duplicates apache-spark-sql out-of-memory spark-structured-streaming
edited Oct 27 '18 at 13:13
user6910411
34.7k1082105
34.7k1082105
asked Aug 3 '17 at 3:27
KevinKevin
969
969
add a comment |
add a comment |
3 Answers
3
active
oldest
votes
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.
add a comment |
Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in
buckets. Assumption is that event time is provided in milliseconds.
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df)
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
add a comment |
I found out that window function didn't work so I chose to use window.start or window.end.
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
add a comment |
Your Answer
StackExchange.ifUsing("editor", function ()
StackExchange.using("externalEditor", function ()
StackExchange.using("snippets", function ()
StackExchange.snippets.init();
);
);
, "code-snippets");
StackExchange.ready(function()
var channelOptions =
tags: "".split(" "),
id: "1"
;
initTagRenderer("".split(" "), "".split(" "), channelOptions);
StackExchange.using("externalEditor", function()
// Have to fire editor after snippets, if snippets enabled
if (StackExchange.settings.snippets.snippetsEnabled)
StackExchange.using("snippets", function()
createEditor();
);
else
createEditor();
);
function createEditor()
StackExchange.prepareEditor(
heartbeatType: 'answer',
autoActivateHeartbeat: false,
convertImagesToLinks: true,
noModals: true,
showLowRepImageUploadWarning: true,
reputationToPostImages: 10,
bindNavPrevention: true,
postfix: "",
imageUploader:
brandingHtml: "Powered by u003ca class="icon-imgur-white" href="https://imgur.com/"u003eu003c/au003e",
contentPolicyHtml: "User contributions licensed under u003ca href="https://creativecommons.org/licenses/by-sa/3.0/"u003ecc by-sa 3.0 with attribution requiredu003c/au003e u003ca href="https://stackoverflow.com/legal/content-policy"u003e(content policy)u003c/au003e",
allowUrls: true
,
onDemand: true,
discardSelector: ".discard-answer"
,immediatelyShowMarkdownHelp:true
);
);
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f45474270%2fhow-to-expire-state-of-dropduplicates-in-structured-streaming-to-avoid-oom%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
3 Answers
3
active
oldest
votes
3 Answers
3
active
oldest
votes
active
oldest
votes
active
oldest
votes
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.
add a comment |
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.
add a comment |
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.
After a few days effort I finally find out the way myself.
While studying the source code of watermark and dropDuplicates, I discovered that besides an eventTime column, watermark also supports window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events in the same day have the same window, this will produce the same results as using only uuid to deduplicate. Hopes can help someone.
answered Aug 7 '17 at 9:31
KevinKevin
969
969
add a comment |
add a comment |
Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in
buckets. Assumption is that event time is provided in milliseconds.
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df)
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
add a comment |
Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in
buckets. Assumption is that event time is provided in milliseconds.
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df)
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
add a comment |
Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in
buckets. Assumption is that event time is provided in milliseconds.
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df)
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
Below is the modification of the procedure proposed in Spark documentation. Trick is to manipulate event time i.e. put event time in
buckets. Assumption is that event time is provided in milliseconds.
// removes all duplicates that are in 15 minutes tumbling window.
// doesn't remove duplicates that are in different 15 minutes windows !!!!
public static Dataset<Row> removeDuplicates(Dataset<Row> df)
// converts time in 15 minute buckets
// timestamp - (timestamp % (15 * 60))
Column bucketCol = functions.to_timestamp(
col("event_time").divide(1000).minus((col("event_time").divide(1000)).mod(15*60)));
df = df.withColumn("bucket", bucketCol);
String windowDuration = "15 minutes";
df = df.withWatermark("bucket", windowDuration)
.dropDuplicates("uuid", "bucket");
return df.drop("bucket");
edited Nov 14 '18 at 21:12
answered Nov 14 '18 at 21:07
dejandejan
1028
1028
add a comment |
add a comment |
I found out that window function didn't work so I chose to use window.start or window.end.
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
add a comment |
I found out that window function didn't work so I chose to use window.start or window.end.
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
add a comment |
I found out that window function didn't work so I chose to use window.start or window.end.
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
I found out that window function didn't work so I chose to use window.start or window.end.
.select(
window($"timestamp", "1 day").start,
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
answered Nov 18 '18 at 2:58
Jun HeJun He
1
1
add a comment |
add a comment |
Thanks for contributing an answer to Stack Overflow!
- Please be sure to answer the question. Provide details and share your research!
But avoid …
- Asking for help, clarification, or responding to other answers.
- Making statements based on opinion; back them up with references or personal experience.
To learn more, see our tips on writing great answers.
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
StackExchange.ready(
function ()
StackExchange.openid.initPostLogin('.new-post-login', 'https%3a%2f%2fstackoverflow.com%2fquestions%2f45474270%2fhow-to-expire-state-of-dropduplicates-in-structured-streaming-to-avoid-oom%23new-answer', 'question_page');
);
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Sign up or log in
StackExchange.ready(function ()
StackExchange.helpers.onClickDraftSave('#login-link');
);
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Sign up using Google
Sign up using Facebook
Sign up using Email and Password
Post as a guest
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown
Required, but never shown