Truly stream chunks so the entire blob doesn't need to be kept in memory #11009

Closed
jabbera opened this issue Apr 22, 2020 · 25 comments
Labels: Client, customer-reported, feature-request, Service Attention, Storage
Milestone: Backlog

Comments

@jabbera
Contributor

jabbera commented Apr 22, 2020

Is your feature request related to a problem? Please describe.
I have large gz files I need to stream (10-50 GB). I don't want to (and don't have the memory to) download the blob into memory first. gzip is a streaming format, so I only need a chunk at a time.

Describe the solution you'd like
Something like the following. Note the AzureBlobStream implementation, which only keeps one chunk in memory at a time. It would be nice if StorageStreamDownloader simply acted like a stream and behaved this way.

import gzip
import io

from azure.storage.blob import StorageStreamDownloader


# Treat a StorageStreamDownloader as a forward read only stream
class AzureBlobStream:
    def __init__(self, ssd: StorageStreamDownloader):
        self.ssd = ssd
        self.chunk_iterator = self.ssd.chunks()
        self.current_buffer = b""
        self.current_position_in_buffer = 0
        self.blob_length = ssd.size
        self.bytes_read = 0
        self.block_size = 100 * 1024 * 1024  # buffer size hint for io.BufferedReader

    def __get_next_buffer(self):
        # Fetch the next chunk from the downloader; the previous chunk is released.
        next_buffer = next(self.chunk_iterator)
        self.current_position_in_buffer = 0
        return next_buffer

    def __read_intern(self, size):
        # Return up to `size` bytes from the current chunk plus how many bytes are still needed.
        buffer_length = len(self.current_buffer)
        if buffer_length - self.current_position_in_buffer >= size:
            ret_val = self.current_buffer[self.current_position_in_buffer: self.current_position_in_buffer + size]
            self.current_position_in_buffer = self.current_position_in_buffer + size
            return ret_val, 0
        else:
            ret_val = self.current_buffer[self.current_position_in_buffer:]
            bytes_needed = size - (buffer_length - self.current_position_in_buffer)
            return ret_val, bytes_needed

    def read(self, size):
        if self.bytes_read >= self.blob_length:
            return b""
        if size > self.blob_length - self.bytes_read:
            size = self.blob_length - self.bytes_read
        ret_val = bytearray()
        left_to_read = size
        while left_to_read != 0:
            additional_bytes, left_to_read = self.__read_intern(left_to_read)
            ret_val.extend(additional_bytes)
            if left_to_read != 0:
                self.current_buffer = self.__get_next_buffer()
        self.bytes_read = self.bytes_read + len(ret_val)
        return bytes(ret_val)


linecount = 0
stream = source_blob.download_blob()  # source_blob is an existing BlobClient
istream = AzureBlobStream(stream)  # As chunks are downloaded they are discarded from memory
with io.BufferedReader(gzip.GzipFile(mode="rb", fileobj=istream), buffer_size=istream.block_size) as f:
    next(f)  # skip the header line
    for line in f:
        linecount += 1
print(f"read: {linecount} lines")

Describe alternatives you've considered
Downloading 10's of GB into memory.

Additional context
None.

Edit: updated code to work....

@ghost added the needs-triage label Apr 22, 2020
@ghost added the customer-reported and question labels Apr 22, 2020
@kaerm added the Service Attention and Storage labels Apr 23, 2020
@ghost

ghost commented Apr 23, 2020

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @xgithubtriage.

@ghost removed the needs-triage label Apr 23, 2020
@amishra-dev added the feature-request label and removed the question label Apr 29, 2020
@amishra-dev

Hi @jabbera
Thanks for the feature request. We will add this to the backlog.

Thanks
Ashish

@lmazuel added this to the Backlog milestone May 4, 2020
@lmazuel added the Client label May 5, 2020
@mrbrown-09

Bump. I've been trying to find a way for the Python API to stream and haven't seen a way to do it yet, or maybe I missed it. The chunks() method in the API documentation doesn't have any description, so I'm thankful someone figured this out.

@shahdadpuri-varun

Thanks @jabbera for this request. I too was struggling to figure out this download_blob() functionality.
@amishra-dev Any update on when this will be part of the official SDK?

@tasherif-msft
Contributor

Hi @jabbera and @shahdadpuri-varun, sorry for the delay on this. This feature is already available in the SDK.
Once you call download_blob() you can then call chunks() on the StorageStreamDownloader handle. For example:

blob_stream = bc.download_blob()
for chunk in blob_stream.chunks():
    # Insert what you want to do here.

I will close this issue for now. Let me know if you have any other questions!
We intend to improve our docstrings to highlight this feature.
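
As a minimal sketch of one way to use this (reusing bc from the snippet above; the local filename is a placeholder), the chunks can be written straight to a local file so only one chunk is in memory at a time:

with open("download.gz", "wb") as local_file:  # placeholder destination
    for chunk in bc.download_blob().chunks():
        local_file.write(chunk)  # each chunk is a bytes object; nothing else is buffered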

@ollytheninja

ollytheninja commented Apr 20, 2021

Hi @jabbera, thanks for the example code, it's really helped me out.

To say "This feature is already available in the SDK" is overselling it a little.
So many things in Python expect a streamable or "file-like" object that they can read from.
@jabbera summed it up in one of their comments in the example code above: "Treat a StorageStreamDownloader as a forward read only stream".
Implementing your own chunking is great, but it seems like a cruel joke on developers to not have StorageStreamDownloader function like a stream.
If StorageStreamDownloader had a read() method it could be passed straight into other Python libraries far more easily.

For example, at the moment, if I want to copy files from my Azure blob store to a partner's AWS S3, I would expect to be able to do:

        az_blob_stream = azure.download_blob() # This is a StorageStreamDownloader object
        aws.upload_fileobj(az_blob_stream)

In reality this requires doing:

        az_blob_stream = azure.download_blob()
        aws.upload_fileobj(io.BytesIO(az_blob_stream.readall()))

Which loads the entire file into memory and then uploads it to AWS S3.
After finding this issue and using the provided example code I can do:

        az_blob_stream = azure.download_blob()
        real_stream = AzureBlobStream(az_blob_stream)
        aws.upload_fileobj(real_stream)

Which is almost (but not quite!) the first example and notably uploads and downloads at the same time, using significantly less memory for large files.

@jabbera
Contributor Author

jabbera commented Apr 20, 2021

I missed that this was closed. @ollytheninja reiterated my point. This SDK is striving to be as Pythonic as possible, and that chunks API is about as far from Pythonic as possible. StorageStreamDownloader, despite its name, is not a Python stream.

@jabbera
Contributor Author

jabbera commented Apr 20, 2021

@tasherif-msft

Once you call download_blob() you can then call chunks() on the StorageStreamDownloader handle. So for example:

If you look at my sample code, I use the API you described here, so I know it exists. The issue is that StorageStreamDownloader isn't an actual Python stream, so it's useless across 99 percent of the Python I/O ecosystem unless you want to download the entire blob into memory. (Hint: we don't want one of those fancy 200TB blobs you just released sitting in RAM if we are copying it somewhere :-))

@ericthomas1

ericthomas1 commented Jun 22, 2021

@ollytheninja & @jabbera ,

I'm facing the same issue. The Function works well as long as the file size remains smaller than the App Service Plan RAM size. At ~90% memory utilization, the Function crashes.

Oddly enough, my use case is the same too: Move data from Azure Storage to AWS S3 bucket!

  • I grab the blob from Storage using:
    • Note: The max_single_get_size has been a nice performance improvement; it reduced blob download times.

blob_client = BlobClient.from_blob_url(event.get_json()["blobUrl"], credentials, max_single_get_size=256*1024*1024, max_chunk_get_size=128*1024*1024)
blob_data = blob_client.download_blob().readall()

  • Load the blob into memory using: blob_byte_stream = io.BytesIO(blob_data)
  • Move the blob to AWS using:
    • Note: This config screams!

config = boto3.s3.transfer.TransferConfig(multipart_threshold=1024*25, max_concurrency=10, multipart_chunksize=1024*25, use_threads=True)
s3.Bucket(s3_bucket).upload_fileobj(blob_byte_stream, Key=aws_dir, Config=config)

I think you're proposing something like this, no? Passing ("streaming") chunks end to end across the wire?
[diagram comparing downloading the whole file before uploading vs. streaming chunks end to end]

I'm pretty new to Python, so I'm trying to determine from your sample code above whether you were successful or whether this is not possible.

Please enlighten me!

@jabbera
Contributor Author

jabbera commented Jun 22, 2021

That's basically what my function does. It keeps at most 2x the chunk size in memory at any one time.

@ollytheninja

Correct, currently it's doing what you illustrated on the first line: while it does fetch the file in chunks, it doesn't expose those chunks as a stream, meaning that you cannot process the file in a streaming fashion; it will pull down the entire file before passing it on. The culprit is that readall() call.

This is especially confusing when "StorageStreamDownloader" returns a file-like Python object and not a stream-like Python object.

Exposing a stream-like object that buffers two chunks and fetches another when the first starts being read means processing a file uses [3 * chunksize] of memory rather than [filesize].

This is not only useful for the example here of transferring files out to another provider but also when (for example) processing frames in a video, searching large log files etc.
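
As a hypothetical illustration of the log-search case (blob_client and the search term are placeholders), chunks() can already be consumed incrementally by stitching lines across chunk boundaries:

needle = b"ERROR"  # placeholder search term
matches = 0
remainder = b""  # partial line carried over from the previous chunk
for chunk in blob_client.download_blob().chunks():
    data = remainder + chunk
    lines = data.split(b"\n")
    remainder = lines.pop()  # the last element may be an incomplete line
    matches += sum(1 for line in lines if needle in line)
if needle in remainder:
    matches += 1
print(f"found {matches} matching lines")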

@ollytheninja

If I get some time I'll see about making a pull request and a new issue for this. @tasherif-msft, what are your views on reopening this? Or should we raise a new issue?

@ericthomas1

@jabbera and @ollytheninja ,

Thanks for the continued engagement on the topic, though I'm still a little unclear.

@jabbera, sounds like you are saying that your code does not load the entire file into memory, but rather at most 2 chunks, and passes them down the line (sounds like streaming).

Whereas @ollytheninja , you are saying ...it will pull down the entire file before passing it on.

Is it actually possible to accomplish the second illustration above:

  • GET chunk(s) --> | process chunk(s) in memory | --> POST chunk(s) to external provider ?

@jabbera
Contributor Author

jabbera commented Jun 24, 2021

My code only keeps one chunk in a buffer, plus whatever your read size is.

@ericthomas1

ericthomas1 commented Jun 24, 2021

OK cool. Have you tried / are you able to POST chunks out as part of, say, an upload to S3 without holding the entire file in memory?

@jabbera
Contributor Author

jabbera commented Jun 25, 2021

No. I use it to stream really large compressed text files (30-40GB compressed), decompress them, and parse them into a more usable format.

@ericthomas1

Could we get some more samples in the docs for iterating over chunks()?

The section is empty at the moment:
[screenshot of the empty chunks() section in the API docs]

@virtualdvid

How can I increase the BUFFER_SIZE for the chunks? Are there any docs?

@Lorisyy

Lorisyy commented Jul 12, 2022

I'm facing a similar situation. Thanks to anyone who could fix this :)

@jabbera
Contributor Author

jabbera commented Jul 12, 2022

I'm shocked this is still open. Native python stream functionality should be core to this library.

@jalauzon-msft
Member

Hi all, apologies that this thread has gone quiet for some time. It is true that chunks is currently still our supported way to stream data back without loading it into memory. There is also readinto available now, which may help in certain scenarios.

@virtualdvid David, you can control the buffer size for chunks using the max_chunk_get_size keyword argument on all client constructors.

:keyword int max_chunk_get_size: The maximum chunk size used for downloading a blob. Defaults to 4*1024*1024,
or 4MB.

That being said, a little while ago I did start the work to add a proper read method to StorageStreamDownloader but have been too busy to finish it up. I hope to get back to that soon and have it in the next couple of releases. Here is the PR in draft form: #24275
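
As a rough sketch of those two options together (blob_url and credential are placeholders, and the 8 MB max_chunk_get_size is an arbitrary value chosen just to show where the knob lives), readinto streams a blob into any writable file object without holding the whole blob in memory:

from azure.storage.blob import BlobClient

blob_client = BlobClient.from_blob_url(blob_url, credential=credential, max_chunk_get_size=8*1024*1024)

with open("local_copy.bin", "wb") as local_file:  # placeholder destination
    blob_client.download_blob().readinto(local_file)  # downloads chunk by chunk into the file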

@garashov

garashov commented Oct 4, 2022

@jalauzon-msft
I am trying to access a blob file, divide it into chunks of 1 KB, and upload them to another folder.
For that, I used download_blob() and then the chunks() method. As you mentioned, in that case it divides the file into 4 MB chunks by default. Instead, I tried to pass the "max_chunk_get_size" argument (with value 1024) to chunks() and received the following error:

  • TypeError: chunks() got an unexpected keyword argument 'max_chunk_get_size'

Where can I find the appropriate argument for chunk size?

@garashov

garashov commented Oct 4, 2022

I resolved the issue from the previous comment by adding the "max_chunk_get_size" argument to the BlobServiceClient:

  • blob_service_client = BlobServiceClient(account_url=acc_url, credential=storage_account_key, max_chunk_get_size=1024)

And then using the download_blob and chunks methods:

  • blob_client.download_blob().chunks()
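
A fuller sketch of that approach (the account, container, and blob names are placeholders, and shrinking max_single_get_size as well is an assumption so the initial ranged request is also small):

from azure.storage.blob import BlobServiceClient

blob_service_client = BlobServiceClient(
    account_url=acc_url,
    credential=storage_account_key,
    max_single_get_size=1024,  # assumption: also shrink the initial download request
    max_chunk_get_size=1024,   # size of each subsequent chunk
)
blob_client = blob_service_client.get_blob_client(container="source", blob="input.bin")
target_container = blob_service_client.get_container_client("target")

for i, chunk in enumerate(blob_client.download_blob().chunks()):
    # Upload each ~1 KB chunk as its own blob under a "folder" prefix.
    target_container.upload_blob(name=f"parts/part-{i:05d}", data=chunk, overwrite=True)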

@ericthomas1

ericthomas1 commented Oct 4, 2022

I'm shocked this is still open. Native python stream functionality should be core to this library.

Could someone update us on the status of this issue?

@jalauzon-msft
Member

Hi @ericthomas1, #24275 was recently merged, which added a standard read method to the StorageStreamDownloader class. This will allow you to read an arbitrarily sized chunk of data from the downloader so the data can be streamed in a more Pythonic way. This is currently released in our latest beta version, 12.14.0b1. The plan is for this to be in our next full release, which is tentatively scheduled for early this month.

In the meantime, or as an alternative, the chunks API, which exists today, can be used to stream data back. See this sample for how that can be done. Thanks.
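
For reference, a hypothetical sketch of what streaming with the new read method can look like (blob_client and process are placeholders):

downloader = blob_client.download_blob()
while True:
    data = downloader.read(4 * 1024 * 1024)  # read up to 4 MiB at a time
    if not data:  # empty bytes means the end of the blob
        break
    process(data)  # placeholder for whatever handles each piece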

github-actions bot locked and limited conversation to collaborators Apr 12, 2023