mirror of
https://github.com/dutchcoders/transfer.sh.git
synced 2024-11-29 15:40:19 +01:00
fc844ac341
* Upgrade aws-sdk-go to v2 `aws-sdk-go-v2` is the newer SDK version, replacing the one being used at the moment by the project. This change maintains full compatibility with existing flags and configurations, and only replaces the underlying library. * Simplify and isolate AWS config logic
191 lines
4.6 KiB
Go
191 lines
4.6 KiB
Go
package storage
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/aws"
|
|
"github.com/aws/aws-sdk-go-v2/config"
|
|
"github.com/aws/aws-sdk-go-v2/credentials"
|
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
)
|
|
|
|
// S3Storage is a storage backed by AWS S3
|
|
type S3Storage struct {
|
|
Storage
|
|
bucket string
|
|
s3 *s3.Client
|
|
logger *log.Logger
|
|
purgeDays time.Duration
|
|
noMultipart bool
|
|
}
|
|
|
|
// NewS3Storage is the factory for S3Storage
|
|
func NewS3Storage(ctx context.Context, accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
|
|
cfg, err := getAwsConfig(ctx, accessKey, secretKey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := s3.NewFromConfig(cfg, func(o *s3.Options) {
|
|
o.Region = region
|
|
o.UsePathStyle = forcePathStyle
|
|
if len(endpoint) > 0 {
|
|
o.EndpointResolver = s3.EndpointResolverFromURL(endpoint)
|
|
}
|
|
})
|
|
|
|
return &S3Storage{
|
|
bucket: bucketName,
|
|
s3: client,
|
|
logger: logger,
|
|
noMultipart: disableMultipart,
|
|
purgeDays: time.Duration(purgeDays*24) * time.Hour,
|
|
}, nil
|
|
}
|
|
|
|
// Type returns the storage type
|
|
func (s *S3Storage) Type() string {
|
|
return "s3"
|
|
}
|
|
|
|
// Head retrieves content length of a file from storage
|
|
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
key := fmt.Sprintf("%s/%s", token, filename)
|
|
|
|
headRequest := &s3.HeadObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
}
|
|
|
|
// content type , content length
|
|
response, err := s.s3.HeadObject(ctx, headRequest)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
contentLength = uint64(response.ContentLength)
|
|
|
|
return
|
|
}
|
|
|
|
// Purge cleans up the storage
|
|
func (s *S3Storage) Purge(context.Context, time.Duration) (err error) {
|
|
// NOOP expiration is set at upload time
|
|
return nil
|
|
}
|
|
|
|
// IsNotExist indicates if a file doesn't exist on storage
|
|
func (s *S3Storage) IsNotExist(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
|
|
var nkerr *types.NoSuchKey
|
|
return errors.As(err, &nkerr)
|
|
}
|
|
|
|
// Get retrieves a file from storage
|
|
func (s *S3Storage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
|
|
key := fmt.Sprintf("%s/%s", token, filename)
|
|
|
|
getRequest := &s3.GetObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
}
|
|
|
|
if rng != nil {
|
|
getRequest.Range = aws.String(rng.Range())
|
|
}
|
|
|
|
response, err := s.s3.GetObject(ctx, getRequest)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
contentLength = uint64(response.ContentLength)
|
|
if rng != nil && response.ContentRange != nil {
|
|
rng.SetContentRange(*response.ContentRange)
|
|
}
|
|
|
|
reader = response.Body
|
|
return
|
|
}
|
|
|
|
// Delete removes a file from storage
|
|
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
|
|
deleteRequest := &s3.DeleteObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(metadata),
|
|
}
|
|
|
|
_, err = s.s3.DeleteObject(ctx, deleteRequest)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
key := fmt.Sprintf("%s/%s", token, filename)
|
|
deleteRequest = &s3.DeleteObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
}
|
|
|
|
_, err = s.s3.DeleteObject(ctx, deleteRequest)
|
|
|
|
return
|
|
}
|
|
|
|
// Put saves a file on storage
|
|
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, _ uint64) (err error) {
|
|
key := fmt.Sprintf("%s/%s", token, filename)
|
|
|
|
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
|
|
var concurrency int
|
|
if !s.noMultipart {
|
|
concurrency = 20
|
|
} else {
|
|
concurrency = 1
|
|
}
|
|
|
|
// Create an uploader with the session and custom options
|
|
uploader := manager.NewUploader(s.s3, func(u *manager.Uploader) {
|
|
u.Concurrency = concurrency // default is 5
|
|
u.LeavePartsOnError = false
|
|
})
|
|
|
|
var expire *time.Time
|
|
if s.purgeDays.Hours() > 0 {
|
|
expire = aws.Time(time.Now().Add(s.purgeDays))
|
|
}
|
|
|
|
_, err = uploader.Upload(ctx, &s3.PutObjectInput{
|
|
Bucket: aws.String(s.bucket),
|
|
Key: aws.String(key),
|
|
Body: reader,
|
|
Expires: expire,
|
|
ContentType: aws.String(contentType),
|
|
})
|
|
|
|
return
|
|
}
|
|
|
|
func (s *S3Storage) IsRangeSupported() bool { return true }
|
|
|
|
func getAwsConfig(ctx context.Context, accessKey, secretKey string) (aws.Config, error) {
|
|
return config.LoadDefaultConfig(ctx,
|
|
config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
|
|
Value: aws.Credentials{
|
|
AccessKeyID: accessKey,
|
|
SecretAccessKey: secretKey,
|
|
SessionToken: "",
|
|
},
|
|
}),
|
|
)
|
|
}
|