From b7caed895fffd20f863e238d18c59bba88577e17 Mon Sep 17 00:00:00 2001
From: troy
Date: Sat, 3 May 2025 12:46:52 -0700
Subject: [PATCH] add support for files >5GB

Fixes: #70
---
 CHANGELOG.md                           |   3 +
 README.md                              |  13 ++
 git-lfs-multiparts3upload              | 155 ++++++++++++++++++
 serverless.yml                         |  56 ++++++-
 src/batch/batch.ts                     | 213 ++++++++++++++++++++++---
 src/completemultipartupload/handler.ts |  93 +++++++++++
 src/util/util.ts                       |   4 +
 7 files changed, 511 insertions(+), 26 deletions(-)
 create mode 100755 git-lfs-multiparts3upload
 create mode 100644 src/completemultipartupload/handler.ts

diff --git a/CHANGELOG.md b/CHANGELOG.md
index ddf971b..8cbaa52 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
 ## [Unreleased]
+### Added
+- Support for files larger than 5GB
+
 ### Fixed
 - Bumped aws sdk versions to fix tests
 
diff --git a/README.md b/README.md
index dea7218..f7a7456 100644
--- a/README.md
+++ b/README.md
@@ -62,6 +62,19 @@ git lfs install
 
 That's it. On push/pull, you'll be prompted for Cognito credentials.
 
+### Handling Files >= 5GB
+
+The backend S3 storage service won't accept files larger than 5GB through Git LFS's normal basic transfer adapter. When such files are uploaded, git-lfs-s3 will reject batch requests that don't claim support for its custom `multipart3upload` [transfer adapter](https://github.com/git-lfs/git-lfs/blob/main/docs/custom-transfers.md).
+
+A Python script implementing this adapter is [located here](./git-lfs-multiparts3upload). Download it, place it in your $PATH (e.g. `/usr/local/bin`), ensure it's executable, and configure your repo to use it:
+
+```bash
+git config --add lfs.customtransfer.multipart3upload.path git-lfs-multiparts3upload
+git config --add lfs.customtransfer.multipart3upload.direction upload
+```
+
+(or add the `--global` option to save these settings in your user .gitconfig, making them available to all repositories and to freshly cloned ones)
+
 ## Further Customization Ideas
 
 * Add an API Gateway custom domain to the API to get a better URL
diff --git a/git-lfs-multiparts3upload b/git-lfs-multiparts3upload
new file mode 100755
index 0000000..042a756
--- /dev/null
+++ b/git-lfs-multiparts3upload
@@ -0,0 +1,155 @@
+#!/usr/bin/env python3
+"""
+Git LFS multipart3upload transfer agent
+"""
+
+import json
+import sys
+import tempfile
+from typing import Dict, Any, Optional
+import urllib.error
+import urllib.request
+
+def write_error_log(message: str, exit=True) -> None:
+    with tempfile.NamedTemporaryFile(delete=False, prefix='git-lfs-multipart3upload-', suffix='.log') as tmp_file:
+        tmp_file.write(message.encode('utf-8'))
+    if exit:
+        sys.exit(1)
+
+class MultipartS3UploadTransferAgent:
+    def read_line(self) -> Optional[str]:
+        """Read a line from stdin."""
+        try:
+            return sys.stdin.readline().strip()
+        except Exception as e:
+            write_error_log(f"Error reading from stdin: {e}")
+
+    def write_line(self, data: Dict[str, Any]) -> None:
+        """Write a line to stdout."""
+        try:
+            sys.stdout.write(json.dumps(data) + "\n")
+            sys.stdout.flush()
+        except Exception as e:
+            write_error_log(f"Error writing to stdout: {e}")
+
+    def send_progress(self, oid: str, bytes_so_far: int, bytes_since_last: int) -> None:
+        """Send a progress update."""
+        self.write_line({
+            "event": "progress",
+            "oid": oid,
+            "bytesSoFar": bytes_so_far,
+            "bytesSinceLast": bytes_since_last,
+        })
+
+    def put_request(self, url: str, data: bytes) -> Dict[str, Any]:
+        """Make a PUT request with the given data."""
+        try:
+            request = urllib.request.Request(
+                url,
+                data=data,
+                method='PUT'
+            )
+            with urllib.request.urlopen(request) as response:
+                if response.status not in (200, 201):
+                    raise urllib.error.HTTPError(
+                        url, response.status,
+                        f"PUT request failed with status {response.status}",
+                        response.headers, None
+                    )
+                return response.headers
+        except urllib.error.URLError as e:
+            write_error_log(f"Failed to make PUT request: {e}")
+            sys.exit(1)
+
+    def handle_upload(self, transfer_request: Dict[str, Any]) -> None:
+        """Handle an upload transfer request."""
+        try:
+            oid = transfer_request["oid"]
+            size = transfer_request["size"]
+            multipart_s3_upload_href = json.loads(transfer_request["action"]["href"])
+            presigned_urls = multipart_s3_upload_href["presignedurls"]
+
+            part_number = 1
+            total_uploaded = 0
+            chunk_size = size // len(presigned_urls)
+            completed_parts = []
+            if size % len(presigned_urls) != 0:
+                chunk_size = min(size // (len(presigned_urls) - 1), 5_000_000_000)  # s3 max upload chunk of 5GB
+
+            with open(transfer_request["path"], 'rb') as f:
+                for presigned_url in presigned_urls:
+                    if part_number == len(presigned_urls):
+                        chunk = f.read()  # For the last part, read the remaining bytes
+                    else:
+                        chunk = f.read(chunk_size)
+
+                    part_upload_response_headers = self.put_request(presigned_url, chunk)
+                    completed_parts.append({'ETag': part_upload_response_headers['ETag'], 'PartNumber': part_number})
+                    total_uploaded += len(chunk)
+                    self.send_progress(oid, total_uploaded, len(chunk))
+                    part_number += 1
+
+            self.put_request(
+                multipart_s3_upload_href["completionurl"],
+                json.dumps({
+                    "MultipartUpload": {"Parts": completed_parts},
+                    "UploadId": multipart_s3_upload_href["uploadid"],
+                }).encode('utf-8'))
+
+            self.write_line({
+                "event": "complete",
+                "oid": oid
+            })
+
+        except Exception as e:
+            write_error_log(f"Error during upload: {e}", exit=False)
+            self.write_line({
+                "event": "error",
+                "oid": transfer_request["oid"],
+                "error": {
+                    "code": 1,
+                    "message": str(e)
+                }
+            })
+
+    def run(self):
+        """Main event loop."""
+        try:
+            init_line = self.read_line()
+            if not init_line:
+                return
+
+            init_data = json.loads(init_line)
+            if init_data["event"] != "init":
+                raise ValueError("Expected init event")
+            if init_data["operation"] != "upload":
+                self.write_line({
+                    "event": "error",
+                    "oid": init_data["oid"],
+                    "error": {
+                        "code": 32,
+                        "message": "Unsupported operation"
+                    }
+                })
+                sys.exit(1)
+
+            self.write_line({})  # Confirm initialization
+
+            while True:
+                line = self.read_line()
+                if not line:
+                    break
+
+                transfer_request = json.loads(line)
+
+                if transfer_request["event"] == "terminate":
+                    break
+                elif transfer_request["event"] == "upload":
+                    self.handle_upload(transfer_request)
+
+        except Exception as e:
+            write_error_log(f"Fatal error: {e}")
+
+if __name__ == "__main__":
+    agent = MultipartS3UploadTransferAgent()
+    agent.run()
diff --git a/serverless.yml b/serverless.yml
index 05ee763..564d198 100644
--- a/serverless.yml
+++ b/serverless.yml
@@ -53,6 +53,7 @@ functions:
             resultTtlInSeconds: 0
             identitySource: method.request.header.Authorization
             type: request
+    memorySize: 512
   locks:
     handler: src/locks/locks.handler
     iamRoleStatements:
@@ -89,14 +90,48 @@ functions:
           authorizer: *authorizer
       - http:
           path: locks
-          method: get  # list locks
+          method: get # list locks
           integration: lambda-proxy
           authorizer: *authorizer
       - http:
           path: locks
-          method: post  # create lock
+          method: post # create lock
           integration: lambda-proxy
           authorizer: *authorizer
+  completemultipartupload:
+    handler: src/completemultipartupload/handler.handler
+    iamRoleStatements:
+      - Action:
+          - s3:PutObject
+        Effect: "Allow"
+        Resource:
+          Fn::Join:
+            - ""
+            - - Fn::GetAtt:
+                  - StorageBucket
+                  - Arn
+              - "/*"
+      - Action:
+          - s3:DeleteObject
+          - s3:GetObject
+        Effect: "Allow"
+        Resource:
+          Fn::Join:
+            - ""
+            - - Fn::GetAtt:
+                  - StorageBucket
+                  - Arn
+              - "/*.multipartuploadcomplete"
+    events:
+      - s3:
+          bucket:
+            Ref: StorageBucket
+          event: s3:ObjectCreated:Put
+          existing: true
+          rules:
+            - suffix: .multipartuploadcomplete
+    memorySize: 512
+    timeout: 900 # complete multipart upload can take minutes
   authorizer:
     handler: src/authorizer/authorizer.handler
     environment:
@@ -128,10 +163,17 @@ resources:
         ResponseType: UNAUTHORIZED
         RestApiId:
           Ref: ApiGatewayRestApi
-        StatusCode: '401'
+        StatusCode: "401"
     StorageBucket:
       Type: AWS::S3::Bucket
       DeletionPolicy: Retain
+      Properties:
+        LifecycleConfiguration:
+          Rules:
+            - AbortIncompleteMultipartUpload:
+                DaysAfterInitiation: 1
+              Id: DeleteIncompleteMultipartUploads
+              Status: Enabled
     LockTable:
       Type: AWS::DynamoDB::Table
      DeletionPolicy: Retain
@@ -154,7 +196,7 @@ resources:
             KeyType: HASH
     UserPool:
       Type: AWS::Cognito::UserPool
-      Properties: 
+      Properties:
         AdminCreateUserConfig:
           AllowAdminCreateUserOnly: true
         UserPoolName: "${self:service}-${opt:stage}"
@@ -162,12 +204,12 @@ resources:
           STAGE: "${opt:stage}"
     UserPoolClient:
       Type: AWS::Cognito::UserPoolClient
-      Properties: 
+      Properties:
         ClientName: "${self:service}-${opt:stage}"
-        ExplicitAuthFlows: 
+        ExplicitAuthFlows:
           - ADMIN_NO_SRP_AUTH
         GenerateSecret: false
-        SupportedIdentityProviders: 
+        SupportedIdentityProviders:
           - COGNITO
         UserPoolId:
           Ref: UserPool
diff --git a/src/batch/batch.ts b/src/batch/batch.ts
index 08720c8..2fd56f2 100644
--- a/src/batch/batch.ts
+++ b/src/batch/batch.ts
@@ -13,17 +13,31 @@ import {
 import "source-map-support/register";
 import {
   S3Client,
+  CreateMultipartUploadCommand,
   GetObjectCommand,
   HeadObjectCommand,
   PutObjectCommand,
+  UploadPartCommand,
 } from "@aws-sdk/client-s3";
 import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
-import { ISODateString } from "../util/util";
+import {
+  completedMultipartUploadNotificationObjectSuffix,
+  ISODateString,
+} from "../util/util";
 
 const bucketName = process.env.BUCKET_NAME;
+if (!bucketName) throw new Error("BUCKET_NAME environment variable not set");
 const urlExpiry = 21600; // 6 hours; max for url from metadata creds
+const basicTransferUploadSizeLimit = 5_000_000_000;
+const multipart3uploadTransferTypeName = "multipart3upload";
 const s3Client = new S3Client({});
 
+type MultipartS3UploadHref = {
+  completionurl: string;
+  presignedurls: string[];
+  uploadid: string;
+};
+
 /** Generate RFC 3339 date string set out in the future when S3 presigned URLs will expire */
 export function getExpiryString(): string {
   const expiryDate = new Date();
@@ -42,8 +56,8 @@ async function getBatchObject(
   try {
     await s3Client.send(
       new HeadObjectCommand({
-        Bucket: bucketName!,
-        Key: oid!,
+        Bucket: bucketName,
+        Key: oid,
       }),
     );
 
@@ -112,13 +126,115 @@ async function getBatchObject(
   throw new Error("getBatchObject invoked with invalid parameters");
 }
 
+/** Generate pre-signed multipart upload requests for large objects */
+async function getBatchMultipartUploadObject(
+  oid: string,
+  size: number,
+): Promise<Record<string, unknown>> {
+  const baseResponse = { oid, size };
+
+  try {
+    await s3Client.send(
+      new HeadObjectCommand({
+        Bucket: bucketName,
+        Key: oid,
+      }),
+    );
+
+    // upload request for existing object (no-op)
+    return baseResponse;
+  } catch (err) {
+    if (err.name === "NotFound") {
+      // upload request for missing object
+      const mimeType = "application/octet-stream";
+
+      console.log(
+        `multipart upload: Creating multipart upload for oid: ${oid}`,
+      );
+      const createMultipartUploadRes = s3Client.send(
+        new CreateMultipartUploadCommand({
+          Bucket: bucketName,
+          ContentType: mimeType,
+          Key: oid,
+        }),
+      );
+
+      // Each part completion will update progress on the client
+      // so there's some advantage to smaller parts
+      // (with an AWS maximum of 10,000 parts)
+      const partSize =
+        Math.ceil(size / 50_000_000) > 10_000 ? 5_000_000_000 : 50_000_000;
+
+      const uploadId = (await createMultipartUploadRes).UploadId;
+      const uploadUrlExpiry = getExpiryString();
+      const presignedurls: Promise<string>[] = [];
+      for (const partNumber of Array.from(
+        { length: Math.ceil(size / partSize) },
+        (_, i) => i + 1,
+      )) {
+        presignedurls.push(
+          getSignedUrl(
+            s3Client,
+            new UploadPartCommand({
+              Bucket: bucketName,
+              Key: oid,
+              PartNumber: partNumber,
+              UploadId: uploadId,
+            }),
+            { expiresIn: urlExpiry },
+          ),
+        );
+      }
+
+      const completionurl = getSignedUrl(
+        s3Client,
+        new PutObjectCommand({
+          Bucket: bucketName,
+          ContentType: mimeType,
+          Key: oid + completedMultipartUploadNotificationObjectSuffix,
+        }),
+        { expiresIn: urlExpiry },
+      );
+
+      console.log("multipart upload: Awaiting url generation and returning");
+      return {
+        ...baseResponse,
+        actions: {
+          upload: {
+            expires_at: uploadUrlExpiry,
+            header: { "Content-Type": mimeType },
+            href: JSON.stringify({
+              completionurl: await completionurl,
+              presignedurls: await Promise.all(presignedurls),
+              uploadid: uploadId,
+            } as MultipartS3UploadHref),
+          },
+        },
+        authenticated: true,
+      };
+    } else {
+      throw err;
+    }
+  }
+}
+
 /** Iterate through requested objects and get the storage response for each of them */
-async function getBatchResponse(body: any): Promise<Record<string, unknown>> {
+async function getBatchResponse(
+  body: any,
+  transferType: string,
+): Promise<Record<string, unknown>> {
   const objects: Array<Promise<Record<string, unknown>>> = [];
-  for (const entry of body.objects) {
-    objects.push(getBatchObject(body.operation, entry.oid, entry.size));
+
+  if (transferType == "basic") {
+    for (const entry of body.objects) {
+      objects.push(getBatchObject(body.operation, entry.oid, entry.size));
+    }
+  } else if (transferType == multipart3uploadTransferTypeName) {
+    for (const entry of body.objects) {
+      objects.push(getBatchMultipartUploadObject(entry.oid, entry.size));
+    }
   }
-  return { transfer: "basic", objects: await Promise.all(objects) };
+  return { transfer: transferType, objects: await Promise.all(objects) };
 }
 
 /** AWS Lambda entrypoint */
@@ -140,19 +256,78 @@ export const handler: APIGatewayProxyHandler = async (
     };
   }
 
-  if ("transfers" in body && !body.transfers.includes("basic")) {
-    console.log("Invalid transfer type requested");
-    return {
-      body: JSON.stringify({
-        errorType: "BadRequest",
-        message: "Invalid transfer type requested; only basic is supported",
-      }),
-      statusCode: 400,
-    };
-  }
+  if (["download", "upload"].includes(body.operation)) {
+    if (
+      body.operation === "download" &&
+      "transfers" in body &&
+      !body.transfers.includes("basic")
+    ) {
+      console.log("Invalid transfer type requested");
+      return {
+        body: JSON.stringify({
+          errorType: "BadRequest",
+          message:
+            "Invalid transfer type requested; only basic is supported for downloads",
+        }),
+        statusCode: 400,
+      };
+    }
+
+    let requiresMultiPartUpload = false;
+
+    if (body.operation === "upload") {
+      requiresMultiPartUpload = body.objects.some(
+        (obj) => obj.size > basicTransferUploadSizeLimit,
+      );
+      if (
+        requiresMultiPartUpload &&
+        body.objects.some((obj) => obj.size > 5_000_000_000_000)
+      ) {
+        console.log("Impressive - attempt to upload a file larger than 5TB!");
+        return {
+          body: JSON.stringify({
+            errorType: "BadRequest",
+            message: `Maximum S3 file upload size is 5TB`,
+          }),
+          statusCode: 400,
+        };
+      }
+      if (
+        requiresMultiPartUpload &&
+        (!("transfers" in body) ||
+          !body.transfers.includes(multipart3uploadTransferTypeName))
+      ) {
+        console.log("Invalid transfer type requested");
+        return {
+          body: JSON.stringify({
+            errorType: "BadRequest",
+            message: `Invalid transfer type requested; >=5GB uploads require the ${multipart3uploadTransferTypeName} transfer type`,
+          }),
+          statusCode: 400,
+        };
+      } else if (
+        !requiresMultiPartUpload &&
+        "transfers" in body &&
+        !body.transfers.includes("basic")
+      ) {
+        console.debug(
+          "No large files being uploaded but no basic transfer type requested",
+        );
+        return {
+          body: JSON.stringify({
+            errorType: "BadRequest",
+            message:
+              "Invalid transfer type requested; upload requires basic transfer type",
+          }),
+          statusCode: 400,
+        };
+      }
+    }
 
-  if (body.operation === "upload" || body.operation === "download") {
-    const batchResponse = await getBatchResponse(body);
+    const batchResponse = await getBatchResponse(
+      body,
+      requiresMultiPartUpload ? multipart3uploadTransferTypeName : "basic",
+    );
     return {
       body: JSON.stringify(batchResponse),
       headers: { "Content-Type": "application/vnd.git-lfs+json" },
diff --git a/src/completemultipartupload/handler.ts b/src/completemultipartupload/handler.ts
new file mode 100644
index 0000000..c4409f9
--- /dev/null
+++ b/src/completemultipartupload/handler.ts
@@ -0,0 +1,93 @@
+/**
+ * Manage S3 events signaling completion of multipart uploads.
+ *
+ * @packageDocumentation
+ */
+
+import { S3Event, S3Handler } from "aws-lambda";
+import {
+  CompletedMultipartUpload,
+  CompleteMultipartUploadCommand,
+  DeleteObjectCommand,
+  GetObjectCommand,
+  S3Client,
+} from "@aws-sdk/client-s3";
+import { completedMultipartUploadNotificationObjectSuffix } from "../util/util";
+
+interface CompletedMultipartUploadNotification {
+  MultipartUpload: CompletedMultipartUpload;
+  UploadId: string;
+}
+
+const s3Client = new S3Client({});
+
+/** AWS Lambda entrypoint */
+export const handler: S3Handler = async (event: S3Event): Promise<void> => {
+  const completionPromises: Promise<void>[] = [];
+  event.Records.forEach(async (record) => {
+    completionPromises.push(
+      completeS3Upload(record.s3.bucket.name, record.s3.object.key),
+    );
+  });
+
+  await Promise.all(completionPromises);
+};
+
+/** CompleteMultipartUpload and cleanup notification */
+async function completeS3Upload(bucket: string, key: string): Promise<void> {
+  const completedMultipartUploadNotification =
+    await retrieveCompletedMultipartUploadNotification(bucket, key);
+
+  await s3Client.send(
+    new CompleteMultipartUploadCommand({
+      Bucket: bucket,
+      Key: key.replace(
+        RegExp(completedMultipartUploadNotificationObjectSuffix + "$"),
+        "",
+      ),
+      MultipartUpload: completedMultipartUploadNotification.MultipartUpload,
+      UploadId: completedMultipartUploadNotification.UploadId,
+    }),
+  );
+
+  await s3Client.send(
+    new DeleteObjectCommand({
+      Bucket: bucket,
+      Key: key,
+    }),
+  );
+}
+
+/** Read file to string from S3 */
+async function retrieveObjectFromS3(
+  bucket: string,
+  key: string,
+): Promise<string> {
+  const response = await s3Client.send(
+    new GetObjectCommand({
+      Bucket: bucket,
+      Key: key,
+    }),
+  );
+  if (response.Body) {
+    return await response.Body.transformToString();
+  } else {
+    throw new Error("Failed to retrieve S3 object");
+  }
+}
+
+async function retrieveCompletedMultipartUploadNotification(
+  bucket: string,
+  key: string,
+): Promise<CompletedMultipartUploadNotification> {
+  const stringifiedNotification = await retrieveObjectFromS3(bucket, key);
+
+  try {
+    const parsedNotification = JSON.parse(
+      stringifiedNotification,
+    ) as CompletedMultipartUploadNotification;
+    return parsedNotification;
+  } catch (error) {
+    throw new Error(`Failed to parse object contents as JSON: ${error}`);
+  }
+}
diff --git a/src/util/util.ts b/src/util/util.ts
index e13f7d0..effa6fc 100644
--- a/src/util/util.ts
+++ b/src/util/util.ts
@@ -24,3 +24,7 @@ export function ISODateString(d: Date): string {
     "Z"
   );
 }
+
+// Kept in sync with s3 event trigger in serverless.yml
+export const completedMultipartUploadNotificationObjectSuffix =
+  ".multipartuploadcomplete";