2022-07-13 06:26:42 +02:00
|
|
|
package storage
|
|
|
|
|
|
|
|
import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"time"

	"storj.io/common/fpath"
	"storj.io/common/storj"
	"storj.io/uplink"
)
|
|
|
|
|
|
|
|
// StorjStorage is a storage backed by Storj
|
|
|
|
type StorjStorage struct {
|
|
|
|
Storage
|
|
|
|
project *uplink.Project
|
|
|
|
bucket *uplink.Bucket
|
|
|
|
purgeDays time.Duration
|
|
|
|
logger *log.Logger
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewStorjStorage is the factory for StorjStorage
|
2023-05-19 07:51:40 +02:00
|
|
|
func NewStorjStorage(ctx context.Context, access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
|
2022-07-13 06:26:42 +02:00
|
|
|
var instance StorjStorage
|
|
|
|
var err error
|
|
|
|
|
2023-05-19 07:51:40 +02:00
|
|
|
ctx = fpath.WithTempData(ctx, "", true)
|
2022-07-13 06:26:42 +02:00
|
|
|
|
|
|
|
uplConf := &uplink.Config{
|
|
|
|
UserAgent: "transfer-sh",
|
|
|
|
}
|
|
|
|
|
|
|
|
parsedAccess, err := uplink.ParseAccess(access)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
instance.project, err = uplConf.OpenProject(ctx, parsedAccess)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
|
|
|
|
if err != nil {
|
|
|
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
|
|
_ = instance.project.Close()
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
|
|
|
|
|
|
|
|
instance.logger = logger
|
|
|
|
|
|
|
|
return &instance, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Type returns the storage type
|
|
|
|
func (s *StorjStorage) Type() string {
|
|
|
|
return "storj"
|
|
|
|
}
|
|
|
|
|
|
|
|
// Head retrieves content length of a file from storage
|
|
|
|
func (s *StorjStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
|
|
key := storj.JoinPaths(token, filename)
|
|
|
|
|
|
|
|
obj, err := s.project.StatObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
|
|
|
if err != nil {
|
|
|
|
return 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
contentLength = uint64(obj.System.ContentLength)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get retrieves a file from storage
|
2023-03-10 17:41:43 +01:00
|
|
|
func (s *StorjStorage) Get(ctx context.Context, token string, filename string, rng *Range) (reader io.ReadCloser, contentLength uint64, err error) {
|
2022-07-13 06:26:42 +02:00
|
|
|
key := storj.JoinPaths(token, filename)
|
|
|
|
|
|
|
|
s.logger.Printf("Getting file %s from Storj Bucket", filename)
|
|
|
|
|
2023-03-10 17:41:43 +01:00
|
|
|
var options *uplink.DownloadOptions
|
|
|
|
if rng != nil {
|
|
|
|
options = new(uplink.DownloadOptions)
|
|
|
|
options.Offset = int64(rng.Start)
|
|
|
|
if rng.Limit > 0 {
|
|
|
|
options.Length = int64(rng.Limit)
|
|
|
|
} else {
|
|
|
|
options.Length = -1
|
|
|
|
}
|
|
|
|
}
|
2022-07-14 18:02:18 +02:00
|
|
|
|
2023-03-10 17:41:43 +01:00
|
|
|
download, err := s.project.DownloadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, options)
|
2022-07-13 06:26:42 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, 0, err
|
|
|
|
}
|
|
|
|
|
|
|
|
contentLength = uint64(download.Info().System.ContentLength)
|
2023-03-10 17:41:43 +01:00
|
|
|
if rng != nil {
|
|
|
|
contentLength = rng.AcceptLength(contentLength)
|
|
|
|
}
|
2022-07-13 06:26:42 +02:00
|
|
|
|
|
|
|
reader = download
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Delete removes a file from storage
|
|
|
|
func (s *StorjStorage) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
|
|
key := storj.JoinPaths(token, filename)
|
|
|
|
|
|
|
|
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
|
|
|
|
|
|
|
|
_, err = s.project.DeleteObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
|
|
|
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Purge cleans up the storage
|
2022-07-14 18:02:18 +02:00
|
|
|
func (s *StorjStorage) Purge(context.Context, time.Duration) (err error) {
|
2022-07-13 06:26:42 +02:00
|
|
|
// NOOP expiration is set at upload time
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Put saves a file on storage
|
|
|
|
func (s *StorjStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
|
|
|
key := storj.JoinPaths(token, filename)
|
|
|
|
|
|
|
|
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
|
|
|
|
|
|
|
|
var uploadOptions *uplink.UploadOptions
|
|
|
|
if s.purgeDays.Hours() > 0 {
|
|
|
|
uploadOptions = &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}
|
|
|
|
}
|
|
|
|
|
|
|
|
writer, err := s.project.UploadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, uploadOptions)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
n, err := io.Copy(writer, reader)
|
|
|
|
if err != nil || uint64(n) != contentLength {
|
|
|
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
|
|
_ = writer.Abort()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
|
|
|
|
if err != nil {
|
|
|
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
|
|
_ = writer.Abort()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
err = writer.Commit()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-03-10 17:41:43 +01:00
|
|
|
// IsRangeSupported reports whether ranged reads are available from this
// backend; it always returns true.
func (s *StorjStorage) IsRangeSupported() bool {
	return true
}
|
|
|
|
|
2022-07-13 06:26:42 +02:00
|
|
|
// IsNotExist indicates if a file doesn't exist on storage
|
|
|
|
func (s *StorjStorage) IsNotExist(err error) bool {
|
|
|
|
return errors.Is(err, uplink.ErrObjectNotFound)
|
|
|
|
}
|