How do I write a futures::Stream to disk without storing it entirely in memory first?
There's an example of downloading a file with Rusoto S3 here:

How to save a file downloaded from S3 with Rusoto to my hard drive?

The problem is that it looks like it's downloading the whole file into memory and then writing it to disk, because it uses the `write_all` method, which takes a byte slice (`&[u8]`), not a stream. How can I use the `StreamingBody`, which implements `futures::Stream`, to stream the file to disk?
stream rust future
edited Nov 11 at 4:26 by Shepmaster
asked Nov 11 at 2:46 by Nicholas Bishop
`for x in stream { file.write_all(&x) }` something like that...
– Stargateur
Nov 11 at 4:04
That would require StreamingBody to be an iterator, which it is not.
– Nicholas Bishop
Nov 11 at 4:09
1 Answer

Accepted answer
Since `StreamingBody` implements `Stream<Item = Vec<u8>, Error = Error>`, we can construct an MCVE that represents it:
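```rust
extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

type Error = Box<dyn std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    // Dummy chunks standing in for the S3 response body
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}
```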
We can then get a "streaming body" somehow and use `Stream::for_each` to process each element in the `Stream`. Here, we just call `write_all` with some provided output location:
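```rust
use std::io::Write;

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| {
        // Write this chunk and drop it; only one chunk is held at a time
        file.write_all(&chunk).map_err(Into::into)
    })
}
```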
We can then write a little testing `main`:

```rust
fn main() {
    // Vec<u8> implements Write, which is convenient for a test; a File works too
    let mut file = Vec::new();
    let fut = save_to_disk(&mut file);
    fut.wait().expect("Could not drive future");
    assert_eq!(file, b"0123456789ABCDEF");
}
```
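To check the "never holds the whole body" property without pulling in the `futures` crate, here is a std-only sketch of the same chunk-at-a-time loop. It assumes the chunks arrive as a plain lazy iterator, which is not what `StreamingBody` is; `RecordingSink` and `write_chunks` are hypothetical names used only for illustration. The sink happens to keep the bytes so we can assert on them, but it also records the size of each `write` call, showing the data arrives as separate small chunks rather than one large buffer.

```rust
use std::io::{self, Write};

/// Hypothetical test sink: records the length of every `write` call so we can
/// see the data arrive chunk by chunk instead of as one large buffer.
struct RecordingSink {
    data: Vec<u8>,
    write_sizes: Vec<usize>,
}

impl Write for RecordingSink {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.write_sizes.push(buf.len());
        self.data.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

/// Blocking analogue of the `for_each` closure: write each chunk, then drop it.
fn write_chunks<I, W>(chunks: I, out: &mut W) -> io::Result<()>
where
    I: IntoIterator<Item = Vec<u8>>,
    W: Write,
{
    for chunk in chunks {
        out.write_all(&chunk)?;
        // `chunk` is dropped at the end of this iteration, before the next
        // one is pulled from the iterator.
    }
    out.flush()
}

fn main() -> io::Result<()> {
    // A lazy iterator: each Vec<u8> is created only when the loop asks for it.
    let chunks = ["0123", "4567", "89AB", "CDEF"]
        .iter()
        .map(|s| s.as_bytes().to_vec());

    let mut sink = RecordingSink { data: Vec::new(), write_sizes: Vec::new() };
    write_chunks(chunks, &mut sink)?;

    assert_eq!(sink.data, b"0123456789ABCDEF");
    // Four 4-byte writes, never a single 16-byte write.
    assert_eq!(sink.write_sizes, vec![4, 4, 4, 4]);
    Ok(())
}
```

With a real `File` in place of the sink, only the current chunk lives in memory.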
Important notes about the quality of this naïve implementation:

- The call to `write_all` may block, which you should not do in an asynchronous program. It would be better to hand off that blocking work to a threadpool.
- The usage of `Future::wait` forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.
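The hand-off idea from the first note can be sketched with std alone, assuming a dedicated writer thread stands in for the threadpool; this is not the API of any particular async runtime, and `spawn_writer` is a hypothetical helper. The bounded `sync_channel` keeps at most `capacity` chunks queued, so memory stays bounded. Note that `send` itself blocks when the queue is full; inside a real async task you would want a non-blocking alternative such as `SyncSender::try_send` or an async-aware channel, so treat this as an illustration of the ownership and memory pattern only.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: runs the blocking `write_all` calls on a dedicated
/// thread. Because the channel is bounded, at most `capacity` chunks are
/// queued in memory at once; the producer waits if the writer falls behind.
fn spawn_writer<W>(mut out: W, capacity: usize) -> (SyncSender<Vec<u8>>, JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let handle = thread::spawn(move || -> io::Result<W> {
        // The loop ends once every sender has been dropped.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        Ok(out)
    });
    (tx, handle)
}

fn main() {
    let (tx, writer) = spawn_writer(Vec::<u8>::new(), 2);

    for chunk in ["0123", "4567", "89AB", "CDEF"].iter() {
        tx.send(chunk.as_bytes().to_vec()).expect("writer thread hung up");
    }
    // Dropping the sender closes the channel so the writer thread can finish.
    drop(tx);

    let written = writer
        .join()
        .expect("writer thread panicked")
        .expect("write failed");
    assert_eq!(written, b"0123456789ABCDEF");
}
```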
See also:
- What is the best approach to encapsulate blocking I/O in future-rs?
- How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
edited Nov 12 at 14:12
answered Nov 11 at 20:11 by Shepmaster
One question about this. Where you call `streaming_body().for_each(...)`, is that more or less equivalent to doing `for chunk in streaming_body().wait() ...`, other than one using a closure and the other using an iterator?
– Nicholas Bishop
Nov 12 at 2:45
@NicholasBishop There is a relevant difference between the two. The `for` loop you suggest blocks the current thread, so the thread can't do any other work until the complete stream is resolved. The stream combinator `for_each()`, on the other hand, yields control to the event loop whenever it would block. (Of course the test code in this answer does not use an event loop, and also blocks until the future is resolved. However, the whole point of asynchronous code is not to unnecessarily block the current thread, so you wouldn't do this in real code.)
– Sven Marnach
Nov 12 at 10:12
For my use case I do actually want to block the thread -- just because the API provided is async doesn't mean that the calling code is.
– Nicholas Bishop
Nov 12 at 15:11
@NicholasBishop Then both versions are fine. I'd probably use `Stream::wait()` in that case, since working with stream combinators can be cumbersome.
– Sven Marnach
Nov 12 at 15:27
`Stream::wait` is fine for now, but it's being removed in the futures rework (just like `Future::wait`). There will be a direct replacement for `Future::wait`, but I don't know of one for `Stream::wait`.
– Shepmaster
Nov 12 at 16:29
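For the blocking use case discussed in these comments, a std-only sketch of the `for chunk in streaming_body().wait()` loop might look like this. It assumes the stream has already been turned into an iterator of `Result<Vec<u8>, E>` values, which is what `Stream::wait` yields in futures 0.1; `save_chunks_to_file` and the temp-file name are hypothetical. The `BufWriter` batches small writes, and the function stops at the first failed chunk.

```rust
use std::error::Error;
use std::fs::{self, File};
use std::io::{BufWriter, Write};
use std::path::Path;

/// Writes fallible chunks to `path` one at a time and returns the byte count.
/// `chunks` stands in for `streaming_body().wait()`, which in futures 0.1 is
/// an iterator of `Result<Item, Error>` values.
fn save_chunks_to_file<I, E>(chunks: I, path: &Path) -> Result<u64, Box<dyn Error>>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
    E: Into<Box<dyn Error>>,
{
    let mut file = BufWriter::new(File::create(path)?);
    let mut total = 0u64;
    for chunk in chunks {
        // Stop at the first failed chunk and report its error.
        let chunk = match chunk {
            Ok(bytes) => bytes,
            Err(e) => return Err(e.into()),
        };
        file.write_all(&chunk)?;
        total += chunk.len() as u64;
    }
    file.flush()?;
    Ok(total)
}

fn main() -> Result<(), Box<dyn Error>> {
    // Hypothetical output location in the system temp directory.
    let path = std::env::temp_dir().join("streamed_body_example.bin");

    // Lazy stand-in for the blocking stream iterator.
    let chunks = ["0123", "4567", "89AB", "CDEF"]
        .iter()
        .map(|s| Ok::<Vec<u8>, std::io::Error>(s.as_bytes().to_vec()));

    let written = save_chunks_to_file(chunks, &path)?;
    assert_eq!(written, 16);
    assert_eq!(fs::read(&path)?, b"0123456789ABCDEF");

    fs::remove_file(&path)?;
    Ok(())
}
```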