public class StreamTransferManager extends Object
The data is split into chunks and uploaded using the multipart upload API by one or more separate threads.

After creating an instance with details of the upload, use getMultiPartOutputStreams() to get a list of MultiPartOutputStreams. When you finish writing data, call MultiPartOutputStream.close(). Parts will be uploaded to S3 as you write. Once all streams have been closed, call complete(). Alternatively you can call abort() at any point if needed.
Here is an example. Much of the code sets up threads for generating data and is unrelated to the library itself; the essential parts are commented.
AmazonS3Client client = new AmazonS3Client(awsCreds);

// Setting up
int numStreams = 2;
final StreamTransferManager manager = new StreamTransferManager(bucket, key, client)
        .numStreams(numStreams)
        .numUploadThreads(2)
        .queueCapacity(2)
        .partSize(10);
final List<MultiPartOutputStream> streams = manager.getMultiPartOutputStreams();

ExecutorService pool = Executors.newFixedThreadPool(numStreams);
for (int i = 0; i < numStreams; i++) {
    final int streamIndex = i;
    pool.submit(new Runnable() {
        public void run() {
            try {
                MultiPartOutputStream outputStream = streams.get(streamIndex);
                for (int lineNum = 0; lineNum < 1000000; lineNum++) {
                    String line = generateData(streamIndex, lineNum);
                    // Writing data and potentially sending off a part
                    outputStream.write(line.getBytes());
                }
                // The stream must be closed once all the data has been written
                outputStream.close();
            } catch (Exception e) {
                // Aborts all uploads
                manager.abort(e);
            }
        }
    });
}
pool.shutdown();
pool.awaitTermination(5, TimeUnit.SECONDS);

// Finishing off
manager.complete();
The final file on S3 will then usually be the result of concatenating all the data written to each stream, in the order that the streams appear in the list obtained from getMultiPartOutputStreams(). However, this may not be true if multiple streams are used and some of them produce less than 5 MB of data. This is because the multipart upload API does not allow the uploading of more than one part smaller than 5 MB, which leads to fundamental limits on what this class can accomplish. If the order of the data is important to you, then either use only one stream or ensure that you write at least 5 MB to every stream.
While performing the multipart upload, this class will create instances of InitiateMultipartUploadRequest, UploadPartRequest, and CompleteMultipartUploadRequest, fill in the essential details, and send them off. If you need to add additional details, then override the appropriate customise*Request methods and set the required properties within. Note that if no data is written (i.e. the object body is empty), then a normal (not multipart) upload will be performed and customisePutEmptyObjectRequest will be called instead.
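As an illustration, here is a minimal sketch of overriding one of these hooks. Enabling server-side encryption via the request's object metadata is an assumed use case, not something the library requires:

AmazonS3Client client = new AmazonS3Client(awsCreds);

import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;

// Hypothetical subclass that enables SSE-S3 (AES-256) on the upload.
public class EncryptedStreamTransferManager extends StreamTransferManager {

    public EncryptedStreamTransferManager(String bucketName, String putKey,
                                          com.amazonaws.services.s3.AmazonS3 s3Client) {
        super(bucketName, putKey, s3Client);
    }

    @Override
    public void customiseInitiateRequest(InitiateMultipartUploadRequest request) {
        // Ask S3 to encrypt the object at rest with AES-256.
        ObjectMetadata metadata = new ObjectMetadata();
        metadata.setSSEAlgorithm(ObjectMetadata.AES_256_SERVER_SIDE_ENCRYPTION);
        request.setObjectMetadata(metadata);
    }
}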
This class does not perform retries when uploading. If an exception is thrown at any stage, the upload will be aborted and the exception rethrown, wrapped in a RuntimeException.
You can configure the upload process by calling any of the chaining setter methods numStreams(int), numUploadThreads(int), queueCapacity(int), or partSize(long) before calling getMultiPartOutputStreams(). Parts that have been produced sit in a queue of the specified capacity while they wait for a thread to upload them. The worst-case memory usage is (numUploadThreads + queueCapacity) * partSize + numStreams * (partSize + 6MB), while higher values for the first three parameters may lead to better resource usage and throughput. If you are uploading very large files, you may need to increase the part size - see partSize(long) for details.
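For example, with the settings used in the example above (numStreams = 2, numUploadThreads = 2, queueCapacity = 2, partSize = 10 MB), the worst-case memory usage works out to (2 + 2) * 10 MB + 2 * (10 MB + 6 MB) = 72 MB.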
Modifier and Type | Field and Description
---|---
protected String | bucketName
protected boolean | checkIntegrity
protected int | numStreams
protected int | numUploadThreads
protected int | partSize
protected String | putKey
protected int | queueCapacity
protected com.amazonaws.services.s3.AmazonS3 | s3Client
protected String | uploadId
Constructor and Description
---
StreamTransferManager(String bucketName, String putKey, com.amazonaws.services.s3.AmazonS3 s3Client)
StreamTransferManager(String bucketName, String putKey, com.amazonaws.services.s3.AmazonS3 s3Client, int numStreams, int numUploadThreads, int queueCapacity, int partSize) - Deprecated.
Modifier and Type | Method and Description
---|---
void | abort() - Aborts the upload.
RuntimeException | abort(Throwable t) - Aborts the upload and rethrows the argument, wrapped in a RuntimeException if necessary.
StreamTransferManager | checkIntegrity(boolean checkIntegrity) - Sets whether a data integrity check should be performed during and after upload.
void | complete() - Blocks while waiting for the threads uploading the contents of the streams returned by getMultiPartOutputStreams() to finish, then sends a request to S3 to complete the upload.
void | customiseCompleteRequest(com.amazonaws.services.s3.model.CompleteMultipartUploadRequest request)
void | customiseInitiateRequest(com.amazonaws.services.s3.model.InitiateMultipartUploadRequest request)
void | customisePutEmptyObjectRequest(com.amazonaws.services.s3.model.PutObjectRequest request)
void | customiseUploadPartRequest(com.amazonaws.services.s3.model.UploadPartRequest request)
List<MultiPartOutputStream> | getMultiPartOutputStreams() - Get the list of output streams to write to.
StreamTransferManager | numStreams(int numStreams) - Sets the number of MultiPartOutputStreams that will be created and returned by getMultiPartOutputStreams() for you to write to.
StreamTransferManager | numUploadThreads(int numUploadThreads) - Sets the number of threads that will be created to upload the data in parallel to S3.
StreamTransferManager | partSize(long partSize) - Sets the size in MB of the parts to be uploaded to S3.
StreamTransferManager | queueCapacity(int queueCapacity) - Sets the capacity of the queue where completed parts from the output streams will sit waiting to be taken by the upload threads.
String | toString()
protected final String bucketName
protected final String putKey
protected final com.amazonaws.services.s3.AmazonS3 s3Client
protected String uploadId
protected int numStreams
protected int numUploadThreads
protected int queueCapacity
protected int partSize
protected boolean checkIntegrity
public StreamTransferManager(String bucketName, String putKey, com.amazonaws.services.s3.AmazonS3 s3Client)
@Deprecated public StreamTransferManager(String bucketName, String putKey, com.amazonaws.services.s3.AmazonS3 s3Client, int numStreams, int numUploadThreads, int queueCapacity, int partSize)
Deprecated. Use StreamTransferManager(String, String, AmazonS3) and then chain the desired setters.

public StreamTransferManager numStreams(int numStreams)
Sets the number of MultiPartOutputStreams that will be created and returned by getMultiPartOutputStreams() for you to write to.
By default this is 1; increase it if you want to write to multiple streams from different threads in parallel.
If you are writing large files with many streams, you may need to increase the part size to avoid running out of part numbers - see partSize(long) for more details. Each stream may hold up to partSize(long) + 6MB in memory at a time.
Returns:
    this StreamTransferManager for chaining.
Throws:
    IllegalArgumentException - if the argument is less than 1.
    IllegalStateException - if getMultiPartOutputStreams() has already been called, initiating the upload.

public StreamTransferManager numUploadThreads(int numUploadThreads)
By default this is 1; increase it if uploading is a speed bottleneck and you have network bandwidth to spare.
Each thread may hold up to partSize(long) in memory at a time.
Returns:
    this StreamTransferManager for chaining.
Throws:
    IllegalArgumentException - if the argument is less than 1.
    IllegalStateException - if getMultiPartOutputStreams() has already been called, initiating the upload.

public StreamTransferManager queueCapacity(int queueCapacity)
By default this is 1; increase it to help the threads writing to the streams stay consistently busy instead of blocking while waiting for upload threads.
Each part sitting in the queue will hold partSize(long) bytes in memory at a time.
Returns:
    this StreamTransferManager for chaining.
Throws:
    IllegalArgumentException - if the argument is less than 1.
    IllegalStateException - if getMultiPartOutputStreams() has already been called, initiating the upload.

public StreamTransferManager partSize(long partSize)
By default this is 5, which is the minimum that AWS allows. You may need to increase it if you are uploading very large files or writing to many output streams.
AWS allows up to 10,000 parts to be uploaded for a single object, and each part must be identified by a unique number from 1 to 10,000. These part numbers are allocated evenly by the manager to each output stream. Therefore the maximum amount of data that can be written to a stream is 10000/numStreams * partSize. If you try to write more, an IndexOutOfBoundsException will be thrown.
The total object size can be at most 5 TB, so if you're using just one stream, there is no reason to set this higher than 525. If you're using more streams, you may want a higher value in case some streams get more data than others. Increasing this value will of course increase memory usage.
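For example, with numStreams = 2 and partSize = 10, each stream can accept at most 10000/2 * 10 = 50,000 MB of data. With a single stream and the maximum useful part size of 525 MB, the full 10,000 parts cover roughly 5,250,000 MB, just above the 5 TB object limit.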
Returns:
    this StreamTransferManager for chaining.
Throws:
    IllegalArgumentException - if the argument is less than 5.
    IllegalArgumentException - if the resulting part size in bytes cannot fit in a 32 bit int.
    IllegalStateException - if getMultiPartOutputStreams() has already been called, initiating the upload.

public StreamTransferManager checkIntegrity(boolean checkIntegrity)
By default this is disabled.
The integrity check consists of two steps. First, each uploaded part is verified by setting the Content-MD5 header for Amazon S3 to check against its own hash. If they don't match, the AWS SDK will throw an exception. The header value is the base64-encoded 128-bit MD5 digest of the part body.
The second step is to ensure the integrity of the final object merged from the uploaded parts. This is achieved by comparing the expected ETag value with the actual ETag returned by S3. However, the ETag value is not an MD5 hash. When S3 combines the parts of a multipart upload into the final object, the ETag value is set to the hex-encoded MD5 hash of the concatenated binary-encoded MD5 hashes of each part, followed by "-" and the number of parts, for instance: 57f456164b0e5f365aaf9bb549731f32-95. Note that AWS doesn't document this, so their hashing algorithm might change without notice, which would lead to false-alarm exceptions. If the ETags don't match, an IntegrityCheckException will be thrown after completing the upload. This will not abort or revert the upload.
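To make the ETag rule concrete, here is a minimal sketch of computing the expected value. The helper name expectedMultipartETag is hypothetical and not part of the library:

import java.security.MessageDigest;
import java.util.List;

// Hypothetical helper, not part of the library: computes the ETag S3 is
// expected to report for a multipart upload, given the MD5 digest of each part.
static String expectedMultipartETag(List<byte[]> partMd5Digests) throws Exception {
    MessageDigest md5 = MessageDigest.getInstance("MD5");
    for (byte[] partDigest : partMd5Digests) {
        md5.update(partDigest); // concatenate the binary digests of all parts
    }
    StringBuilder hex = new StringBuilder();
    for (byte b : md5.digest()) {
        hex.append(String.format("%02x", b)); // hex-encode the combined digest
    }
    // e.g. "57f456164b0e5f365aaf9bb549731f32-95" for 95 parts
    return hex + "-" + partMd5Digests.size();
}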
Parameters:
    checkIntegrity - true if data integrity should be checked
Returns:
    this StreamTransferManager for chaining.
Throws:
    IllegalStateException - if getMultiPartOutputStreams() has already been called, initiating the upload.

public List<MultiPartOutputStream> getMultiPartOutputStreams()
The first call to this method initiates the multipart upload. All setter methods must be called before this.
public void complete()
Blocks while waiting for the threads uploading the contents of the streams returned by getMultiPartOutputStreams() to finish, then sends a request to S3 to complete the upload. For the former to complete, it's essential that every stream is closed, otherwise the upload threads will block forever waiting for more data.

public RuntimeException abort(Throwable t)
Aborts the upload and rethrows the argument, wrapped in a RuntimeException if necessary. The argument is returned so that you can write throw abort(e) to make it clear to the compiler and readers that the code stops here.
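For example, a usage sketch with outputStream and manager as in the example above:

try {
    outputStream.write(data);
    outputStream.close();
} catch (Exception e) {
    // abort(e) returns the wrapped exception, so `throw` tells the
    // compiler that execution cannot continue past this point.
    throw manager.abort(e);
}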
public void abort()
Aborts the upload.
public void customiseInitiateRequest(com.amazonaws.services.s3.model.InitiateMultipartUploadRequest request)
public void customiseUploadPartRequest(com.amazonaws.services.s3.model.UploadPartRequest request)
public void customiseCompleteRequest(com.amazonaws.services.s3.model.CompleteMultipartUploadRequest request)
public void customisePutEmptyObjectRequest(com.amazonaws.services.s3.model.PutObjectRequest request)