How do I write a futures::Stream to disk without storing it entirely in memory first?









There's an example of downloading a file with Rusoto S3 in the question "How to save a file downloaded from S3 with Rusoto to my hard drive?"

The problem is that it appears to download the whole file into memory and then write it to disk, because it uses the write_all method, which takes a slice of bytes, not a stream. How can I use the StreamingBody, which implements futures::Stream, to stream the file to disk?










Tags: stream, rust, future






asked Nov 11 at 2:46 by Nicholas Bishop
edited Nov 11 at 4:26 by Shepmaster











  • for x in stream { file.write_all(&x) } something like that...
    – Stargateur
    Nov 11 at 4:04

  • That would require StreamingBody to be an iterator, which it is not.
    – Nicholas Bishop
    Nov 11 at 4:09




























1 Answer
accepted






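Since StreamingBody implements Stream<Item = Vec<u8>, Error = Error>, we can construct an MCVE that represents that:

    extern crate futures; // 0.1.25

    use futures::{prelude::*, stream};

    type Error = Box<dyn std::error::Error>;

    fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
        // Dummy chunks standing in for the S3 body; the chunk boundaries are arbitrary
        const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
        let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
        stream::iter_ok(iter_of_owned_bytes)
    }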



We can then get a "streaming body" somehow and use Stream::for_each to process each element in the Stream. Here, we just call write_all with some provided output location:

    use std::{fs::File, io::Write};

    fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
        // Write each chunk as it arrives; convert io::Error into the boxed Error
        streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
    }


We can then write a little testing main:

    fn main() {
        let mut file = Vec::new();

        {
            let fut = save_to_disk(&mut file);
            fut.wait().expect("Could not drive future");
        }

        assert_eq!(file, b"0123456789ABCDEF");
    }



Important notes about the quality of this naïve implementation:

  1. The call to write_all may block, which you should not do in an asynchronous program. It would be better to hand off that blocking work to a threadpool.

  2. The usage of Future::wait forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.

See also:

  • What is the best approach to encapsulate blocking I/O in future-rs?
  • How do I synchronously return a value calculated in an asynchronous Future in stable Rust?
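
To make the first note a bit more concrete, here is a rough, std-only sketch of handing the blocking writes to another thread. It uses a single dedicated writer thread rather than a real threadpool, and `spawn_writer` and its `capacity` parameter are hypothetical names made up for this illustration, not part of futures or Rusoto. The bounded channel is the part that matters for memory use: once `capacity` chunks are queued, the producer has to wait.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread::{self, JoinHandle};

/// Hypothetical helper: spawns one thread that owns `out` and performs the
/// blocking writes. The bounded channel holds at most `capacity` chunks, so a
/// fast producer waits instead of queueing the whole body in memory.
fn spawn_writer<W>(
    mut out: W,
    capacity: usize,
) -> (SyncSender<Vec<u8>>, JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    let (tx, rx) = sync_channel::<Vec<u8>>(capacity);
    let handle = thread::spawn(move || -> io::Result<W> {
        // The loop ends once every sender has been dropped.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        // Hand the writer back so the caller can inspect or reuse it.
        Ok(out)
    });
    (tx, handle)
}
```

A caller would send each chunk with `tx.send(chunk)`, drop `tx` when the stream ends, and then join the handle to get the writer back along with any I/O error. Note that `send` on a full channel blocks the sending thread, so in a fully asynchronous program this only moves the problem; a proper solution would go through whatever blocking-work facility your executor provides.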





answered Nov 11 at 20:11 by Shepmaster
edited Nov 12 at 14:12











  • One question about this. Where you call streaming_body().for_each(...), is that more or less equivalent to doing for chunk in streaming_body().wait() { ... }, other than one using a closure and the other using an iterator?
    – Nicholas Bishop
    Nov 12 at 2:45

  • @NicholasBishop There is a relevant difference between the two. The for loop you suggest blocks the current thread, so the thread can't do any other work until the complete stream is resolved. The stream combinator for_each(), on the other hand, yields control to the event loop whenever it would block. (Of course the test code in this answer does not use an event loop, and also blocks until the future is resolved. However, the whole point of asynchronous code is not to unnecessarily block the current thread, so you wouldn't do this in real code.)
    – Sven Marnach
    Nov 12 at 10:12

  • For my use case I do actually want to block the thread -- just because the API provided is async doesn't mean that the calling code is.
    – Nicholas Bishop
    Nov 12 at 15:11

  • @NicholasBishop Then both versions are fine. I'd probably use Stream::wait() in that case, since working with stream combinators can be cumbersome.
    – Sven Marnach
    Nov 12 at 15:27

  • Stream::wait is fine for now, but it's being removed in the futures rework (just like Future::wait). There will be a direct replacement for Future::wait, but I don't know of one for Stream::wait.
    – Shepmaster
    Nov 12 at 16:29
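
For the blocking variant discussed in these comments: in futures 0.1, Stream::wait turns the stream into a blocking iterator of Result<Item, Error>, so the loop body is ordinary synchronous code. The sketch below uses only std so it can be run without the futures crate; a plain iterator of Result<Vec<u8>, E> stands in for streaming_body().wait(), and `write_chunks` is a hypothetical helper name, not something from futures or Rusoto.

```rust
use std::error::Error;
use std::io::Write;

/// Hypothetical helper: writes each chunk as soon as the iterator yields it,
/// so only one chunk is held in memory at a time. Returns the bytes written.
/// `chunks` stands in for the blocking iterator from `Stream::wait`.
fn write_chunks<I, E, W>(chunks: I, mut out: W) -> Result<u64, Box<dyn Error>>
where
    I: IntoIterator<Item = Result<Vec<u8>, E>>,
    E: Into<Box<dyn Error>>,
    W: Write,
{
    let mut total = 0u64;
    for chunk in chunks {
        // Stop at the first failed chunk and report its error.
        let bytes: Vec<u8> = match chunk {
            Ok(bytes) => bytes,
            Err(e) => return Err(e.into()),
        };
        out.write_all(&bytes)?;
        total += bytes.len() as u64;
    }
    out.flush()?;
    Ok(total)
}
```

Passing a std::fs::File (or a std::io::BufWriter around one) as `out` would write to disk chunk by chunk. This blocks the calling thread for the whole transfer, which is what was asked for in this thread but not what you'd want inside an event loop.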















