package storage

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
	"github.com/aws/aws-sdk-go-v2/service/s3"
	"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

// S3Storage is a storage backed by AWS S3
type S3Storage struct {
	Storage
	bucket      string
	s3          *s3.Client
	logger      *log.Logger
	purgeDays   time.Duration
	noMultipart bool
}

// NewS3Storage is the factory for S3Storage
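//
// A minimal usage sketch; the bucket, region, and retention values below are
// illustrative assumptions, not defaults of this package:
//
//	store, err := NewS3Storage(ctx, accessKey, secretKey, "my-bucket", 7, "eu-west-1", "", false, false, logger)
//	if err != nil {
//		log.Fatal(err)
//	}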
func NewS3Storage(ctx context.Context, accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
	cfg, err := getAwsConfig(ctx, accessKey, secretKey)
	if err != nil {
		return nil, err
	}

	client := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.Region = region
		o.UsePathStyle = forcePathStyle
		if len(endpoint) > 0 {
			o.EndpointResolver = s3.EndpointResolverFromURL(endpoint)
		}
	})

	return &S3Storage{
		bucket:      bucketName,
		s3:          client,
		logger:      logger,
		noMultipart: disableMultipart,
		purgeDays:   time.Duration(purgeDays*24) * time.Hour,
	}, nil
}

// Type returns the storage type
func (s *S3Storage) Type() string {
	return "s3"
}

// Head retrieves content length of a file from storage
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	headRequest := &s3.HeadObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	// fetch the object's metadata (content type, content length) without downloading the body
	response, err := s.s3.HeadObject(ctx, headRequest)
	if err != nil {
		return
	}

	contentLength = uint64(response.ContentLength)

	return
}

// Purge cleans up the storage
func (s *S3Storage) Purge(context.Context, time.Duration) (err error) {
	// NOOP: expiration is set at upload time
	return nil
}

// IsNotExist indicates if a file doesn't exist on storage
func (s *S3Storage) IsNotExist(err error) bool {
	if err == nil {
		return false
	}

	var nkerr *types.NoSuchKey
	return errors.As(err, &nkerr)
}

// Get retrieves a file from storage, optionally limited to the requested byte range
func (s *S3Storage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	getRequest := &s3.GetObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	if rng != nil {
		getRequest.Range = aws.String(rng.Range())
	}

	response, err := s.s3.GetObject(ctx, getRequest)
	if err != nil {
		return
	}

	contentLength = uint64(response.ContentLength)
	if rng != nil && response.ContentRange != nil {
		rng.SetContentRange(*response.ContentRange)
	}

	reader = response.Body
	return
}

// Delete removes a file and its metadata object from storage
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
	metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
	deleteRequest := &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(metadata),
	}

	_, err = s.s3.DeleteObject(ctx, deleteRequest)
	if err != nil {
		return
	}

	key := fmt.Sprintf("%s/%s", token, filename)
	deleteRequest = &s3.DeleteObjectInput{
		Bucket: aws.String(s.bucket),
		Key:    aws.String(key),
	}

	_, err = s.s3.DeleteObject(ctx, deleteRequest)

	return
}

// Put saves a file on storage
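// When purge days are configured, the object's Expires attribute is set to
// the upload time plus the purge duration.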
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, _ uint64) (err error) {
	key := fmt.Sprintf("%s/%s", token, filename)

	s.logger.Printf("Uploading file %s to S3 Bucket", filename)

	// upload parts with 20 concurrent goroutines unless multipart has been disabled
	var concurrency int
	if !s.noMultipart {
		concurrency = 20
	} else {
		concurrency = 1
	}

	// Create an uploader with the S3 client and custom options
	uploader := manager.NewUploader(s.s3, func(u *manager.Uploader) {
		u.Concurrency = concurrency // default is 5
		u.LeavePartsOnError = false
	})

	var expire *time.Time
	if s.purgeDays.Hours() > 0 {
		expire = aws.Time(time.Now().Add(s.purgeDays))
	}

	_, err = uploader.Upload(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(s.bucket),
		Key:         aws.String(key),
		Body:        reader,
		Expires:     expire,
		ContentType: aws.String(contentType),
	})

	return
}
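
// IsRangeSupported indicates whether the storage supports ranged downloads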
func (s *S3Storage) IsRangeSupported() bool { return true }
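
// getAwsConfig builds an aws.Config that authenticates with the given static credentials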
func getAwsConfig(ctx context.Context, accessKey, secretKey string) (aws.Config, error) {
	return config.LoadDefaultConfig(ctx,
		config.WithCredentialsProvider(credentials.StaticCredentialsProvider{
			Value: aws.Credentials{
				AccessKeyID:     accessKey,
				SecretAccessKey: secretKey,
				SessionToken:    "",
			},
		}),
	)
}