transfer.sh/server/storage/s3.go

package storage

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// S3Storage is a storage backed by AWS S3
type S3Storage struct {
	Storage
	bucket      string
	s3          *s3.Client
	logger      *log.Logger
	purgeDays   time.Duration // objects expire after this duration; 0 disables expiry
	noMultipart bool          // upload parts sequentially (concurrency 1) when true
}

// NewS3Storage is the factory for S3Storage
func NewS3Storage(ctx context.Context, accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
	cfg, err := getAwsConfig(ctx, accessKey, secretKey)
	if err != nil {
		return nil, err
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.Region = region
		o.UsePathStyle = forcePathStyle
		if len(endpoint) > 0 {
			// a custom endpoint allows targeting S3-compatible services
			o.EndpointResolver = s3.EndpointResolverFromURL(endpoint)
		}
	})

	return &S3Storage{
		bucket:      bucketName,
		s3:          client,
		logger:      logger,
		noMultipart: disableMultipart,
		purgeDays:   time.Duration(purgeDays*24) * time.Hour,
	}, nil
}
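
// A minimal construction sketch (all values are hypothetical; in transfer.sh
// they are wired up from command-line flags elsewhere in the project):
//
//	store, err := NewS3Storage(context.Background(),
//		"AKIAEXAMPLE", "wJalrEXAMPLEKEY", "transfersh-bucket",
//		7,           // purge uploads after 7 days
//		"us-east-1", // region
//		"",          // empty endpoint keeps the default AWS resolver
//		false,       // keep concurrent multipart uploads enabled
//		false,       // use virtual-hosted-style addressing
//		log.Default())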

// Type returns the storage type
func (s *S3Storage) Type() string {
	return "s3"
}

// Head retrieves the content length of a file from storage
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	headRequest := &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	// HeadObject returns the object's metadata (content type, content length) without its body
	response, err := s.s3.HeadObject(ctx, headRequest)
	if err != nil {
		return
	}

	contentLength = uint64(response.ContentLength)

	return
}

// Purge cleans up the storage
func (s *S3Storage) Purge(context.Context, time.Duration) (err error) {
	// NOOP: expiration is set per object at upload time (see Put)
	return nil
}

// IsNotExist indicates if a file doesn't exist on storage
func (s *S3Storage) IsNotExist(err error) bool {
	if err == nil {
		return false
	}

	var nkerr *types.NoSuchKey
	return errors.As(err, &nkerr)
}
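
// A sketch of how a caller is expected to combine Get and IsNotExist
// (the surrounding handler code here is hypothetical, not part of this file):
//
//	reader, _, err := store.Get(ctx, token, filename, nil)
//	if store.IsNotExist(err) {
//		// respond with 404 Not Found
//	}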

// Get retrieves a file from storage
func (s *S3Storage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	getRequest := &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	if rng != nil {
		getRequest.Range = aws.String(rng.Range())
	}

	response, err := s.s3.GetObject(ctx, getRequest)
	if err != nil {
		return
	}

	contentLength = uint64(response.ContentLength)
	if rng != nil && response.ContentRange != nil {
		rng.SetContentRange(*response.ContentRange)
	}

	reader = response.Body

	return
}
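
// The Range helper (defined elsewhere in this package) is assumed to produce
// RFC 7233 byte-range syntax, e.g.:
//
//	rng.Range()            // request header, e.g. "bytes=0-1023"
//	*response.ContentRange // response header, e.g. "bytes 0-1023/4096"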

// Delete removes a file from storage
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
	// each upload has a sidecar .metadata object; remove it first
	metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
	deleteRequest := &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(metadata),
	}

	_, err = s.s3.DeleteObject(ctx, deleteRequest)
	if err != nil {
		return
	}

	// then remove the file itself
	key := fmt.Sprintf("%s/%s", token, filename)
	deleteRequest = &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	_, err = s.s3.DeleteObject(ctx, deleteRequest)

	return
}

// Put saves a file on storage
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, _ uint64) (err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	s.logger.Printf("Uploading file %s to S3 Bucket", filename)

	var concurrency int
	if !s.noMultipart {
		concurrency = 20
	} else {
		concurrency = 1
	}

	// Create an uploader with the client and custom options
	uploader := manager.NewUploader(s.s3, func(u *manager.Uploader) {
		u.Concurrency = concurrency // default is 5
		u.LeavePartsOnError = false // clean up uploaded parts if the upload fails
	})

	var expire *time.Time
	if s.purgeDays.Hours() > 0 {
		expire = aws.Time(time.Now().Add(s.purgeDays))
	}

	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(key),
		Body:        reader,
		Expires:     expire,
		ContentType: aws.String(contentType),
	})

	return
}
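
// A minimal usage sketch (the token and file are hypothetical):
//
//	f, err := os.Open("/tmp/example.bin")
//	if err != nil {
//		return err
//	}
//	defer f.Close()
//	err = store.Put(ctx, "token123", "example.bin", f, "application/octet-stream", 0)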

// IsRangeSupported indicates whether ranged downloads are supported
func (s *S3Storage) IsRangeSupported() bool { return true }

// getAwsConfig builds an aws.Config that authenticates with the given static credentials
func getAwsConfig(ctx context.Context, accessKey, secretKey string) (aws.Config, error) {
	return config.LoadDefaultConfig(ctx,
		config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
			Value: aws.Credentials{
				AccessKeyID:     accessKey,
				SecretAccessKey: secretKey,
				SessionToken:    "",
			},
		}),
	)
}
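
// A sketch of the equivalent client construction for an S3-compatible backend
// such as MinIO (the endpoint and credentials are hypothetical):
//
//	cfg, _ := getAwsConfig(ctx, "minioadmin", "minioadmin")
//	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
//		o.UsePathStyle = true // MinIO is commonly served path-style
//		o.EndpointResolver = s3.EndpointResolverFromURL("http://localhost:9000")
//	})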