mirror of
https://github.com/dutchcoders/transfer.sh.git
synced 2024-11-26 14:10:18 +01:00
server: reorganize storage layer into more clear subfolder (#496)
* server: reorganize storage layer into more clear subfolder * cmd: fix version command
This commit is contained in:
parent
35e794220b
commit
21812d3efc
10 changed files with 892 additions and 854 deletions
25
cmd/cmd.go
25
cmd/cmd.go
|
@ -2,6 +2,7 @@ package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/dutchcoders/transfer.sh/server/storage"
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -299,8 +300,8 @@ type Cmd struct {
|
||||||
*cli.App
|
*cli.App
|
||||||
}
|
}
|
||||||
|
|
||||||
func versionAction(c *cli.Context) {
|
func versionCommand(ctx *cli.Context) {
|
||||||
fmt.Println(color.YellowString(fmt.Sprintf("transfer.sh %s: Easy file sharing from the command line", Version)))
|
fmt.Println(color.YellowString("transfer.sh %s: Easy file sharing from the command line", Version))
|
||||||
}
|
}
|
||||||
|
|
||||||
// New is the factory for transfer.sh
|
// New is the factory for transfer.sh
|
||||||
|
@ -318,7 +319,7 @@ func New() *Cmd {
|
||||||
app.Commands = []cli.Command{
|
app.Commands = []cli.Command{
|
||||||
{
|
{
|
||||||
Name: "version",
|
Name: "version",
|
||||||
Action: versionAction,
|
Action: versionCommand,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -327,7 +328,7 @@ func New() *Cmd {
|
||||||
}
|
}
|
||||||
|
|
||||||
app.Action = func(c *cli.Context) {
|
app.Action = func(c *cli.Context) {
|
||||||
options := []server.OptionFn{}
|
var options []server.OptionFn
|
||||||
if v := c.String("listener"); v != "" {
|
if v := c.String("listener"); v != "" {
|
||||||
options = append(options, server.Listener(v))
|
options = append(options, server.Listener(v))
|
||||||
}
|
}
|
||||||
|
@ -463,10 +464,10 @@ func New() *Cmd {
|
||||||
panic("secret-key not set.")
|
panic("secret-key not set.")
|
||||||
} else if bucket := c.String("bucket"); bucket == "" {
|
} else if bucket := c.String("bucket"); bucket == "" {
|
||||||
panic("bucket not set.")
|
panic("bucket not set.")
|
||||||
} else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, purgeDays, c.String("s3-region"), c.String("s3-endpoint"), c.Bool("s3-no-multipart"), c.Bool("s3-path-style"), logger); err != nil {
|
} else if store, err := storage.NewS3Storage(accessKey, secretKey, bucket, purgeDays, c.String("s3-region"), c.String("s3-endpoint"), c.Bool("s3-no-multipart"), c.Bool("s3-path-style"), logger); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
} else {
|
} else {
|
||||||
options = append(options, server.UseStorage(storage))
|
options = append(options, server.UseStorage(store))
|
||||||
}
|
}
|
||||||
case "gdrive":
|
case "gdrive":
|
||||||
chunkSize := c.Int("gdrive-chunk-size")
|
chunkSize := c.Int("gdrive-chunk-size")
|
||||||
|
@ -477,28 +478,28 @@ func New() *Cmd {
|
||||||
panic("local-config-path not set.")
|
panic("local-config-path not set.")
|
||||||
} else if basedir := c.String("basedir"); basedir == "" {
|
} else if basedir := c.String("basedir"); basedir == "" {
|
||||||
panic("basedir not set.")
|
panic("basedir not set.")
|
||||||
} else if storage, err := server.NewGDriveStorage(clientJSONFilepath, localConfigPath, basedir, chunkSize, logger); err != nil {
|
} else if store, err := storage.NewGDriveStorage(clientJSONFilepath, localConfigPath, basedir, chunkSize, logger); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
} else {
|
} else {
|
||||||
options = append(options, server.UseStorage(storage))
|
options = append(options, server.UseStorage(store))
|
||||||
}
|
}
|
||||||
case "storj":
|
case "storj":
|
||||||
if access := c.String("storj-access"); access == "" {
|
if access := c.String("storj-access"); access == "" {
|
||||||
panic("storj-access not set.")
|
panic("storj-access not set.")
|
||||||
} else if bucket := c.String("storj-bucket"); bucket == "" {
|
} else if bucket := c.String("storj-bucket"); bucket == "" {
|
||||||
panic("storj-bucket not set.")
|
panic("storj-bucket not set.")
|
||||||
} else if storage, err := server.NewStorjStorage(access, bucket, purgeDays, logger); err != nil {
|
} else if store, err := storage.NewStorjStorage(access, bucket, purgeDays, logger); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
} else {
|
} else {
|
||||||
options = append(options, server.UseStorage(storage))
|
options = append(options, server.UseStorage(store))
|
||||||
}
|
}
|
||||||
case "local":
|
case "local":
|
||||||
if v := c.String("basedir"); v == "" {
|
if v := c.String("basedir"); v == "" {
|
||||||
panic("basedir not set.")
|
panic("basedir not set.")
|
||||||
} else if storage, err := server.NewLocalStorage(v, logger); err != nil {
|
} else if store, err := storage.NewLocalStorage(v, logger); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
} else {
|
} else {
|
||||||
options = append(options, server.UseStorage(storage))
|
options = append(options, server.UseStorage(store))
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
panic("Provider not set or invalid.")
|
panic("Provider not set or invalid.")
|
||||||
|
|
|
@ -36,6 +36,7 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"github.com/dutchcoders/transfer.sh/server/storage"
|
||||||
"html"
|
"html"
|
||||||
html_template "html/template"
|
html_template "html/template"
|
||||||
"io"
|
"io"
|
||||||
|
@ -459,7 +460,7 @@ func (s *Server) putHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
contentLength := r.ContentLength
|
contentLength := r.ContentLength
|
||||||
|
|
||||||
defer CloseCheck(r.Body.Close)
|
defer storage.CloseCheck(r.Body.Close)
|
||||||
|
|
||||||
file, err := ioutil.TempFile(s.tempPath, "transfer-")
|
file, err := ioutil.TempFile(s.tempPath, "transfer-")
|
||||||
defer s.cleanTmpFile(file)
|
defer s.cleanTmpFile(file)
|
||||||
|
@ -692,7 +693,7 @@ func (s *Server) checkMetadata(ctx context.Context, token, filename string, incr
|
||||||
var metadata metadata
|
var metadata metadata
|
||||||
|
|
||||||
r, _, err := s.storage.Get(ctx, token, fmt.Sprintf("%s.metadata", filename))
|
r, _, err := s.storage.Get(ctx, token, fmt.Sprintf("%s.metadata", filename))
|
||||||
defer CloseCheck(r.Close)
|
defer storage.CloseCheck(r.Close)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return metadata, err
|
return metadata, err
|
||||||
|
@ -728,7 +729,7 @@ func (s *Server) checkDeletionToken(ctx context.Context, deletionToken, token, f
|
||||||
var metadata metadata
|
var metadata metadata
|
||||||
|
|
||||||
r, _, err := s.storage.Get(ctx, token, fmt.Sprintf("%s.metadata", filename))
|
r, _, err := s.storage.Get(ctx, token, fmt.Sprintf("%s.metadata", filename))
|
||||||
defer CloseCheck(r.Close)
|
defer storage.CloseCheck(r.Close)
|
||||||
|
|
||||||
if s.storage.IsNotExist(err) {
|
if s.storage.IsNotExist(err) {
|
||||||
return errors.New("metadata doesn't exist")
|
return errors.New("metadata doesn't exist")
|
||||||
|
@ -806,7 +807,7 @@ func (s *Server) zipHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, _, err := s.storage.Get(r.Context(), token, filename)
|
reader, _, err := s.storage.Get(r.Context(), token, filename)
|
||||||
defer CloseCheck(reader.Close)
|
defer storage.CloseCheck(reader.Close)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.storage.IsNotExist(err) {
|
if s.storage.IsNotExist(err) {
|
||||||
|
@ -859,10 +860,10 @@ func (s *Server) tarGzHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
commonHeader(w, tarfilename)
|
commonHeader(w, tarfilename)
|
||||||
|
|
||||||
gw := gzip.NewWriter(w)
|
gw := gzip.NewWriter(w)
|
||||||
defer CloseCheck(gw.Close)
|
defer storage.CloseCheck(gw.Close)
|
||||||
|
|
||||||
zw := tar.NewWriter(gw)
|
zw := tar.NewWriter(gw)
|
||||||
defer CloseCheck(zw.Close)
|
defer storage.CloseCheck(zw.Close)
|
||||||
|
|
||||||
for _, key := range strings.Split(files, ",") {
|
for _, key := range strings.Split(files, ",") {
|
||||||
key = resolveKey(key, s.proxyPath)
|
key = resolveKey(key, s.proxyPath)
|
||||||
|
@ -876,7 +877,7 @@ func (s *Server) tarGzHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
||||||
defer CloseCheck(reader.Close)
|
defer storage.CloseCheck(reader.Close)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.storage.IsNotExist(err) {
|
if s.storage.IsNotExist(err) {
|
||||||
|
@ -920,7 +921,7 @@ func (s *Server) tarHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
commonHeader(w, tarfilename)
|
commonHeader(w, tarfilename)
|
||||||
|
|
||||||
zw := tar.NewWriter(w)
|
zw := tar.NewWriter(w)
|
||||||
defer CloseCheck(zw.Close)
|
defer storage.CloseCheck(zw.Close)
|
||||||
|
|
||||||
for _, key := range strings.Split(files, ",") {
|
for _, key := range strings.Split(files, ",") {
|
||||||
key = resolveKey(key, s.proxyPath)
|
key = resolveKey(key, s.proxyPath)
|
||||||
|
@ -934,7 +935,7 @@ func (s *Server) tarHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
||||||
defer CloseCheck(reader.Close)
|
defer storage.CloseCheck(reader.Close)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if s.storage.IsNotExist(err) {
|
if s.storage.IsNotExist(err) {
|
||||||
|
@ -1018,7 +1019,7 @@ func (s *Server) getHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
contentType := metadata.ContentType
|
contentType := metadata.ContentType
|
||||||
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
reader, contentLength, err := s.storage.Get(r.Context(), token, filename)
|
||||||
defer CloseCheck(reader.Close)
|
defer storage.CloseCheck(reader.Close)
|
||||||
|
|
||||||
if s.storage.IsNotExist(err) {
|
if s.storage.IsNotExist(err) {
|
||||||
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
|
||||||
|
|
|
@ -52,6 +52,7 @@ import (
|
||||||
"golang.org/x/crypto/acme/autocert"
|
"golang.org/x/crypto/acme/autocert"
|
||||||
|
|
||||||
web "github.com/dutchcoders/transfer.sh-web"
|
web "github.com/dutchcoders/transfer.sh-web"
|
||||||
|
"github.com/dutchcoders/transfer.sh/server/storage"
|
||||||
assetfs "github.com/elazarl/go-bindata-assetfs"
|
assetfs "github.com/elazarl/go-bindata-assetfs"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -243,7 +244,7 @@ func EnableProfiler() OptionFn {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UseStorage set storage to use
|
// UseStorage set storage to use
|
||||||
func UseStorage(s Storage) OptionFn {
|
func UseStorage(s storage.Storage) OptionFn {
|
||||||
return func(srvr *Server) {
|
return func(srvr *Server) {
|
||||||
srvr.storage = s
|
srvr.storage = s
|
||||||
}
|
}
|
||||||
|
@ -332,7 +333,7 @@ type Server struct {
|
||||||
purgeDays time.Duration
|
purgeDays time.Duration
|
||||||
purgeInterval time.Duration
|
purgeInterval time.Duration
|
||||||
|
|
||||||
storage Storage
|
storage storage.Storage
|
||||||
|
|
||||||
forceHTTPS bool
|
forceHTTPS bool
|
||||||
|
|
||||||
|
|
|
@ -1,812 +0,0 @@
|
||||||
package server
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
|
||||||
"log"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"path/filepath"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
|
||||||
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
|
||||||
"github.com/aws/aws-sdk-go/service/s3"
|
|
||||||
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
|
||||||
"golang.org/x/oauth2"
|
|
||||||
"golang.org/x/oauth2/google"
|
|
||||||
drive "google.golang.org/api/drive/v3"
|
|
||||||
"google.golang.org/api/googleapi"
|
|
||||||
|
|
||||||
"storj.io/common/fpath"
|
|
||||||
"storj.io/common/storj"
|
|
||||||
"storj.io/uplink"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Storage is the interface for storage operation
|
|
||||||
type Storage interface {
|
|
||||||
// Get retrieves a file from storage
|
|
||||||
Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
|
|
||||||
// Head retrieves content length of a file from storage
|
|
||||||
Head(ctx context.Context, token string, filename string) (contentLength uint64, err error)
|
|
||||||
// Put saves a file on storage
|
|
||||||
Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
|
|
||||||
// Delete removes a file from storage
|
|
||||||
Delete(ctx context.Context, token string, filename string) error
|
|
||||||
// IsNotExist indicates if a file doesn't exist on storage
|
|
||||||
IsNotExist(err error) bool
|
|
||||||
// Purge cleans up the storage
|
|
||||||
Purge(ctx context.Context, days time.Duration) error
|
|
||||||
|
|
||||||
// Type returns the storage type
|
|
||||||
Type() string
|
|
||||||
}
|
|
||||||
|
|
||||||
// LocalStorage is a local storage
|
|
||||||
type LocalStorage struct {
|
|
||||||
Storage
|
|
||||||
basedir string
|
|
||||||
logger *log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewLocalStorage is the factory for LocalStorage
|
|
||||||
func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
|
|
||||||
return &LocalStorage{basedir: basedir, logger: logger}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type returns the storage type
|
|
||||||
func (s *LocalStorage) Type() string {
|
|
||||||
return "local"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head retrieves content length of a file from storage
|
|
||||||
func (s *LocalStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
||||||
path := filepath.Join(s.basedir, token, filename)
|
|
||||||
|
|
||||||
var fi os.FileInfo
|
|
||||||
if fi, err = os.Lstat(path); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(fi.Size())
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a file from storage
|
|
||||||
func (s *LocalStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
|
||||||
path := filepath.Join(s.basedir, token, filename)
|
|
||||||
|
|
||||||
// content type , content length
|
|
||||||
if reader, err = os.Open(path); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var fi os.FileInfo
|
|
||||||
if fi, err = os.Lstat(path); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(fi.Size())
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes a file from storage
|
|
||||||
func (s *LocalStorage) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
||||||
metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
|
|
||||||
_ = os.Remove(metadata)
|
|
||||||
|
|
||||||
path := filepath.Join(s.basedir, token, filename)
|
|
||||||
err = os.Remove(path)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Purge cleans up the storage
|
|
||||||
func (s *LocalStorage) Purge(ctx context.Context, days time.Duration) (err error) {
|
|
||||||
err = filepath.Walk(s.basedir,
|
|
||||||
func(path string, info os.FileInfo, err error) error {
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if info.IsDir() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.ModTime().Before(time.Now().Add(-1 * days)) {
|
|
||||||
err = os.Remove(path)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist indicates if a file doesn't exist on storage
|
|
||||||
func (s *LocalStorage) IsNotExist(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
return os.IsNotExist(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put saves a file on storage
|
|
||||||
func (s *LocalStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
|
|
||||||
var f io.WriteCloser
|
|
||||||
var err error
|
|
||||||
|
|
||||||
path := filepath.Join(s.basedir, token)
|
|
||||||
|
|
||||||
if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
|
||||||
defer CloseCheck(f.Close)
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = io.Copy(f, reader); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// S3Storage is a storage backed by AWS S3
|
|
||||||
type S3Storage struct {
|
|
||||||
Storage
|
|
||||||
bucket string
|
|
||||||
session *session.Session
|
|
||||||
s3 *s3.S3
|
|
||||||
logger *log.Logger
|
|
||||||
purgeDays time.Duration
|
|
||||||
noMultipart bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewS3Storage is the factory for S3Storage
|
|
||||||
func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
|
|
||||||
sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
|
|
||||||
|
|
||||||
return &S3Storage{
|
|
||||||
bucket: bucketName,
|
|
||||||
s3: s3.New(sess),
|
|
||||||
session: sess,
|
|
||||||
logger: logger,
|
|
||||||
noMultipart: disableMultipart,
|
|
||||||
purgeDays: time.Duration(purgeDays*24) * time.Hour,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type returns the storage type
|
|
||||||
func (s *S3Storage) Type() string {
|
|
||||||
return "s3"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head retrieves content length of a file from storage
|
|
||||||
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
||||||
key := fmt.Sprintf("%s/%s", token, filename)
|
|
||||||
|
|
||||||
headRequest := &s3.HeadObjectInput{
|
|
||||||
Bucket: aws.String(s.bucket),
|
|
||||||
Key: aws.String(key),
|
|
||||||
}
|
|
||||||
|
|
||||||
// content type , content length
|
|
||||||
response, err := s.s3.HeadObjectWithContext(ctx, headRequest)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.ContentLength != nil {
|
|
||||||
contentLength = uint64(*response.ContentLength)
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Purge cleans up the storage
|
|
||||||
func (s *S3Storage) Purge(ctx context.Context, days time.Duration) (err error) {
|
|
||||||
// NOOP expiration is set at upload time
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist indicates if a file doesn't exist on storage
|
|
||||||
func (s *S3Storage) IsNotExist(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if aerr, ok := err.(awserr.Error); ok {
|
|
||||||
switch aerr.Code() {
|
|
||||||
case s3.ErrCodeNoSuchKey:
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a file from storage
|
|
||||||
func (s *S3Storage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
|
||||||
key := fmt.Sprintf("%s/%s", token, filename)
|
|
||||||
|
|
||||||
getRequest := &s3.GetObjectInput{
|
|
||||||
Bucket: aws.String(s.bucket),
|
|
||||||
Key: aws.String(key),
|
|
||||||
}
|
|
||||||
|
|
||||||
response, err := s.s3.GetObjectWithContext(ctx, getRequest)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if response.ContentLength != nil {
|
|
||||||
contentLength = uint64(*response.ContentLength)
|
|
||||||
}
|
|
||||||
|
|
||||||
reader = response.Body
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes a file from storage
|
|
||||||
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
||||||
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
|
|
||||||
deleteRequest := &s3.DeleteObjectInput{
|
|
||||||
Bucket: aws.String(s.bucket),
|
|
||||||
Key: aws.String(metadata),
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
key := fmt.Sprintf("%s/%s", token, filename)
|
|
||||||
deleteRequest = &s3.DeleteObjectInput{
|
|
||||||
Bucket: aws.String(s.bucket),
|
|
||||||
Key: aws.String(key),
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put saves a file on storage
|
|
||||||
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
|
||||||
key := fmt.Sprintf("%s/%s", token, filename)
|
|
||||||
|
|
||||||
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
|
|
||||||
var concurrency int
|
|
||||||
if !s.noMultipart {
|
|
||||||
concurrency = 20
|
|
||||||
} else {
|
|
||||||
concurrency = 1
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create an uploader with the session and custom options
|
|
||||||
uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
|
|
||||||
u.Concurrency = concurrency // default is 5
|
|
||||||
u.LeavePartsOnError = false
|
|
||||||
})
|
|
||||||
|
|
||||||
var expire *time.Time
|
|
||||||
if s.purgeDays.Hours() > 0 {
|
|
||||||
expire = aws.Time(time.Now().Add(s.purgeDays))
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
|
||||||
Bucket: aws.String(s.bucket),
|
|
||||||
Key: aws.String(key),
|
|
||||||
Body: reader,
|
|
||||||
Expires: expire,
|
|
||||||
})
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// GDrive is a storage backed by GDrive
|
|
||||||
type GDrive struct {
|
|
||||||
service *drive.Service
|
|
||||||
rootID string
|
|
||||||
basedir string
|
|
||||||
localConfigPath string
|
|
||||||
chunkSize int
|
|
||||||
logger *log.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewGDriveStorage is the factory for GDrive
|
|
||||||
func NewGDriveStorage(clientJSONFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
|
|
||||||
b, err := ioutil.ReadFile(clientJSONFilepath)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If modifying these scopes, delete your previously saved client_secret.json.
|
|
||||||
config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// ToDo: Upgrade deprecated version
|
|
||||||
srv, err := drive.New(getGDriveClient(context.TODO(), config, localConfigPath, logger)) // nolint: staticcheck
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
chunkSize = chunkSize * 1024 * 1024
|
|
||||||
storage := &GDrive{service: srv, basedir: basedir, rootID: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
|
|
||||||
err = storage.setupRoot()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return storage, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
const gdriveRootConfigFile = "root_id.conf"
|
|
||||||
const gdriveTokenJSONFile = "token.json"
|
|
||||||
const gdriveDirectoryMimeType = "application/vnd.google-apps.folder"
|
|
||||||
|
|
||||||
func (s *GDrive) setupRoot() error {
|
|
||||||
rootFileConfig := filepath.Join(s.localConfigPath, gdriveRootConfigFile)
|
|
||||||
|
|
||||||
rootID, err := ioutil.ReadFile(rootFileConfig)
|
|
||||||
if err != nil && !os.IsNotExist(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if string(rootID) != "" {
|
|
||||||
s.rootID = string(rootID)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
dir := &drive.File{
|
|
||||||
Name: s.basedir,
|
|
||||||
MimeType: gdriveDirectoryMimeType,
|
|
||||||
}
|
|
||||||
|
|
||||||
di, err := s.service.Files.Create(dir).Fields("id").Do()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
s.rootID = di.Id
|
|
||||||
err = ioutil.WriteFile(rootFileConfig, []byte(s.rootID), os.FileMode(0600))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *GDrive) hasChecksum(f *drive.File) bool {
|
|
||||||
return f.Md5Checksum != ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
|
|
||||||
return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *GDrive) findID(filename string, token string) (string, error) {
|
|
||||||
filename = strings.Replace(filename, `'`, `\'`, -1)
|
|
||||||
filename = strings.Replace(filename, `"`, `\"`, -1)
|
|
||||||
|
|
||||||
fileID, tokenID, nextPageToken := "", "", ""
|
|
||||||
|
|
||||||
q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootID, token, gdriveDirectoryMimeType)
|
|
||||||
l, err := s.list(nextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
for 0 < len(l.Files) {
|
|
||||||
for _, fi := range l.Files {
|
|
||||||
tokenID = fi.Id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if l.NextPageToken == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
l, err = s.list(l.NextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if filename == "" {
|
|
||||||
return tokenID, nil
|
|
||||||
} else if tokenID == "" {
|
|
||||||
return "", fmt.Errorf("cannot find file %s/%s", token, filename)
|
|
||||||
}
|
|
||||||
|
|
||||||
q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenID, filename, gdriveDirectoryMimeType)
|
|
||||||
l, err = s.list(nextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
for 0 < len(l.Files) {
|
|
||||||
for _, fi := range l.Files {
|
|
||||||
|
|
||||||
fileID = fi.Id
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if l.NextPageToken == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
l, err = s.list(l.NextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if fileID == "" {
|
|
||||||
return "", fmt.Errorf("cannot find file %s/%s", token, filename)
|
|
||||||
}
|
|
||||||
|
|
||||||
return fileID, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type returns the storage type
|
|
||||||
func (s *GDrive) Type() string {
|
|
||||||
return "gdrive"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head retrieves content length of a file from storage
|
|
||||||
func (s *GDrive) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
||||||
var fileID string
|
|
||||||
fileID, err = s.findID(filename, token)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var fi *drive.File
|
|
||||||
if fi, err = s.service.Files.Get(fileID).Fields("size").Do(); err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(fi.Size)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a file from storage
|
|
||||||
func (s *GDrive) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
|
||||||
var fileID string
|
|
||||||
fileID, err = s.findID(filename, token)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var fi *drive.File
|
|
||||||
fi, err = s.service.Files.Get(fileID).Fields("size", "md5Checksum").Do()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if !s.hasChecksum(fi) {
|
|
||||||
err = fmt.Errorf("cannot find file %s/%s", token, filename)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(fi.Size)
|
|
||||||
|
|
||||||
var res *http.Response
|
|
||||||
res, err = s.service.Files.Get(fileID).Context(ctx).Download()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
reader = res.Body
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes a file from storage
|
|
||||||
func (s *GDrive) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
||||||
metadata, _ := s.findID(fmt.Sprintf("%s.metadata", filename), token)
|
|
||||||
_ = s.service.Files.Delete(metadata).Do()
|
|
||||||
|
|
||||||
var fileID string
|
|
||||||
fileID, err = s.findID(filename, token)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s.service.Files.Delete(fileID).Do()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Purge cleans up the storage
|
|
||||||
func (s *GDrive) Purge(ctx context.Context, days time.Duration) (err error) {
|
|
||||||
nextPageToken := ""
|
|
||||||
|
|
||||||
expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
|
|
||||||
q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootID, expirationDate, gdriveDirectoryMimeType)
|
|
||||||
l, err := s.list(nextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
for 0 < len(l.Files) {
|
|
||||||
for _, fi := range l.Files {
|
|
||||||
err = s.service.Files.Delete(fi.Id).Do()
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if l.NextPageToken == "" {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
l, err = s.list(l.NextPageToken, q)
|
|
||||||
if err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist indicates if a file doesn't exist on storage
|
|
||||||
func (s *GDrive) IsNotExist(err error) bool {
|
|
||||||
if err == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
if e, ok := err.(*googleapi.Error); ok {
|
|
||||||
return e.Code == http.StatusNotFound
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put saves a file on storage
|
|
||||||
func (s *GDrive) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
|
|
||||||
dirID, err := s.findID("", token)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if dirID == "" {
|
|
||||||
dir := &drive.File{
|
|
||||||
Name: token,
|
|
||||||
Parents: []string{s.rootID},
|
|
||||||
MimeType: gdriveDirectoryMimeType,
|
|
||||||
Size: int64(contentLength),
|
|
||||||
}
|
|
||||||
|
|
||||||
di, err := s.service.Files.Create(dir).Fields("id").Do()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
dirID = di.Id
|
|
||||||
}
|
|
||||||
|
|
||||||
// Instantiate empty drive file
|
|
||||||
dst := &drive.File{
|
|
||||||
Name: filename,
|
|
||||||
Parents: []string{dirID},
|
|
||||||
MimeType: contentType,
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieve a token, saves the token, then returns the generated client.
|
|
||||||
func getGDriveClient(ctx context.Context, config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
|
|
||||||
tokenFile := filepath.Join(localConfigPath, gdriveTokenJSONFile)
|
|
||||||
tok, err := gDriveTokenFromFile(tokenFile)
|
|
||||||
if err != nil {
|
|
||||||
tok = getGDriveTokenFromWeb(ctx, config, logger)
|
|
||||||
saveGDriveToken(tokenFile, tok, logger)
|
|
||||||
}
|
|
||||||
|
|
||||||
return config.Client(ctx, tok)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Request a token from the web, then returns the retrieved token.
|
|
||||||
func getGDriveTokenFromWeb(ctx context.Context, config *oauth2.Config, logger *log.Logger) *oauth2.Token {
|
|
||||||
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
|
|
||||||
fmt.Printf("Go to the following link in your browser then type the "+
|
|
||||||
"authorization code: \n%v\n", authURL)
|
|
||||||
|
|
||||||
var authCode string
|
|
||||||
if _, err := fmt.Scan(&authCode); err != nil {
|
|
||||||
logger.Fatalf("Unable to read authorization code %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
tok, err := config.Exchange(ctx, authCode)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalf("Unable to retrieve token from web %v", err)
|
|
||||||
}
|
|
||||||
return tok
|
|
||||||
}
|
|
||||||
|
|
||||||
// Retrieves a token from a local file.
|
|
||||||
func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
|
|
||||||
f, err := os.Open(file)
|
|
||||||
defer CloseCheck(f.Close)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
tok := &oauth2.Token{}
|
|
||||||
err = json.NewDecoder(f).Decode(tok)
|
|
||||||
return tok, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Saves a token to a file path.
|
|
||||||
func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
|
|
||||||
logger.Printf("Saving credential file to: %s\n", path)
|
|
||||||
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
|
||||||
defer CloseCheck(f.Close)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalf("Unable to cache oauth token: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = json.NewEncoder(f).Encode(token)
|
|
||||||
if err != nil {
|
|
||||||
logger.Fatalf("Unable to encode oauth token: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// StorjStorage is a storage backed by Storj
type StorjStorage struct {
	Storage
	// project is the open uplink project handle used for all object operations.
	project *uplink.Project
	// bucket is the bucket (ensured at startup) that holds every upload.
	bucket *uplink.Bucket
	// purgeDays is the object lifetime applied at upload time; zero disables expiry.
	purgeDays time.Duration
	logger    *log.Logger
}
|
|
||||||
|
|
||||||
// NewStorjStorage is the factory for StorjStorage
|
|
||||||
func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
|
|
||||||
var instance StorjStorage
|
|
||||||
var err error
|
|
||||||
|
|
||||||
pCtx := context.TODO()
|
|
||||||
|
|
||||||
ctx := fpath.WithTempData(pCtx, "", true)
|
|
||||||
|
|
||||||
uplConf := &uplink.Config{
|
|
||||||
UserAgent: "transfer-sh",
|
|
||||||
}
|
|
||||||
|
|
||||||
parsedAccess, err := uplink.ParseAccess(access)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
instance.project, err = uplConf.OpenProject(ctx, parsedAccess)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
|
|
||||||
if err != nil {
|
|
||||||
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
||||||
_ = instance.project.Close()
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
|
|
||||||
|
|
||||||
instance.logger = logger
|
|
||||||
|
|
||||||
return &instance, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Type returns the storage type
|
|
||||||
func (s *StorjStorage) Type() string {
|
|
||||||
return "storj"
|
|
||||||
}
|
|
||||||
|
|
||||||
// Head retrieves content length of a file from storage
|
|
||||||
func (s *StorjStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
|
||||||
key := storj.JoinPaths(token, filename)
|
|
||||||
|
|
||||||
obj, err := s.project.StatObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
|
||||||
if err != nil {
|
|
||||||
return 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(obj.System.ContentLength)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a file from storage
|
|
||||||
func (s *StorjStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
|
||||||
key := storj.JoinPaths(token, filename)
|
|
||||||
|
|
||||||
s.logger.Printf("Getting file %s from Storj Bucket", filename)
|
|
||||||
|
|
||||||
download, err := s.project.DownloadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
contentLength = uint64(download.Info().System.ContentLength)
|
|
||||||
|
|
||||||
reader = download
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete removes a file from storage
|
|
||||||
func (s *StorjStorage) Delete(ctx context.Context, token string, filename string) (err error) {
|
|
||||||
key := storj.JoinPaths(token, filename)
|
|
||||||
|
|
||||||
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
|
|
||||||
|
|
||||||
_, err = s.project.DeleteObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Purge cleans up the storage
|
|
||||||
func (s *StorjStorage) Purge(ctx context.Context, days time.Duration) (err error) {
|
|
||||||
// NOOP expiration is set at upload time
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Put saves a file on storage
|
|
||||||
func (s *StorjStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
|
||||||
key := storj.JoinPaths(token, filename)
|
|
||||||
|
|
||||||
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
|
|
||||||
|
|
||||||
var uploadOptions *uplink.UploadOptions
|
|
||||||
if s.purgeDays.Hours() > 0 {
|
|
||||||
uploadOptions = &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}
|
|
||||||
}
|
|
||||||
|
|
||||||
writer, err := s.project.UploadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, uploadOptions)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := io.Copy(writer, reader)
|
|
||||||
if err != nil || uint64(n) != contentLength {
|
|
||||||
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
||||||
_ = writer.Abort()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
|
|
||||||
if err != nil {
|
|
||||||
//Ignoring the error to return the one that occurred first, but try to clean up.
|
|
||||||
_ = writer.Abort()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = writer.Commit()
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// IsNotExist indicates if a file doesn't exist on storage
|
|
||||||
func (s *StorjStorage) IsNotExist(err error) bool {
|
|
||||||
return errors.Is(err, uplink.ErrObjectNotFound)
|
|
||||||
}
|
|
33
server/storage/common.go
Normal file
33
server/storage/common.go
Normal file
|
@ -0,0 +1,33 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Storage is the interface for storage operation, implemented by the local
// disk, S3, Google Drive and Storj backends. A file is addressed by an
// upload token plus its filename; implementations store it under
// <token>/<filename>.
type Storage interface {
	// Get retrieves a file from storage; the caller must close reader.
	Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
	// Head retrieves content length of a file from storage
	Head(ctx context.Context, token string, filename string) (contentLength uint64, err error)
	// Put saves a file on storage
	Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
	// Delete removes a file from storage
	Delete(ctx context.Context, token string, filename string) error
	// IsNotExist indicates if a file doesn't exist on storage
	IsNotExist(err error) bool
	// Purge cleans up the storage; days is the retention window already
	// converted to a Duration. Backends with server-side expiry (S3, Storj)
	// implement this as a no-op.
	Purge(ctx context.Context, days time.Duration) error

	// Type returns the storage type
	Type() string
}
|
||||||
|
|
||||||
|
// CloseCheck invokes a close function (typically passed as `f.Close` in a
// defer) and reports any error it returns instead of silently discarding it.
// It was missing the doc comment required for an exported identifier.
func CloseCheck(f func() error) {
	if err := f(); err != nil {
		fmt.Println("Received close error:", err)
	}
}
|
377
server/storage/gdrive.go
Normal file
377
server/storage/gdrive.go
Normal file
|
@ -0,0 +1,377 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"io/ioutil"
|
||||||
|
"log"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/oauth2"
|
||||||
|
"golang.org/x/oauth2/google"
|
||||||
|
drive "google.golang.org/api/drive/v3"
|
||||||
|
"google.golang.org/api/googleapi"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GDrive is a storage backed by GDrive
type GDrive struct {
	// service is the authenticated Drive API client.
	service *drive.Service
	// rootID is the Drive file ID of the base folder; cached on disk by setupRoot.
	rootID string
	// basedir is the name of the base folder created under the Drive root.
	basedir string
	// localConfigPath holds the cached OAuth token and root-ID files.
	localConfigPath string
	// chunkSize is the upload chunk size in bytes (converted from MB by the factory).
	chunkSize int
	logger    *log.Logger
}
|
||||||
|
|
||||||
|
// NewGDriveStorage is the factory for GDrive
|
||||||
|
func NewGDriveStorage(clientJSONFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
|
||||||
|
b, err := ioutil.ReadFile(clientJSONFilepath)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// If modifying these scopes, delete your previously saved client_secret.json.
|
||||||
|
config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ToDo: Upgrade deprecated version
|
||||||
|
srv, err := drive.New(getGDriveClient(context.TODO(), config, localConfigPath, logger)) // nolint: staticcheck
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkSize = chunkSize * 1024 * 1024
|
||||||
|
storage := &GDrive{service: srv, basedir: basedir, rootID: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
|
||||||
|
err = storage.setupRoot()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return storage, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// gdriveRootConfigFile caches the Drive file ID of the base folder between runs.
const gdriveRootConfigFile = "root_id.conf"

// gdriveTokenJSONFile caches the OAuth2 token between runs.
const gdriveTokenJSONFile = "token.json"

// gdriveDirectoryMimeType is the MIME type Drive uses to mark folders.
const gdriveDirectoryMimeType = "application/vnd.google-apps.folder"
|
||||||
|
|
||||||
|
func (s *GDrive) setupRoot() error {
|
||||||
|
rootFileConfig := filepath.Join(s.localConfigPath, gdriveRootConfigFile)
|
||||||
|
|
||||||
|
rootID, err := ioutil.ReadFile(rootFileConfig)
|
||||||
|
if err != nil && !os.IsNotExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if string(rootID) != "" {
|
||||||
|
s.rootID = string(rootID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
dir := &drive.File{
|
||||||
|
Name: s.basedir,
|
||||||
|
MimeType: gdriveDirectoryMimeType,
|
||||||
|
}
|
||||||
|
|
||||||
|
di, err := s.service.Files.Create(dir).Fields("id").Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.rootID = di.Id
|
||||||
|
err = ioutil.WriteFile(rootFileConfig, []byte(s.rootID), os.FileMode(0600))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *GDrive) hasChecksum(f *drive.File) bool {
|
||||||
|
return f.Md5Checksum != ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// list runs one page of a Drive files query, returning id, name and mimeType
// for each match plus the token for the next page (if any).
func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
	return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
}
|
||||||
|
|
||||||
|
// findID resolves Drive file IDs. With an empty filename it returns the ID
// of the folder named token under the root ("" when absent, without error).
// With a filename it returns the ID of that file inside the token folder,
// or an error when either the folder or the file cannot be found.
func (s *GDrive) findID(filename string, token string) (string, error) {
	// Escape quotes so user-supplied names cannot break out of the Drive query.
	filename = strings.Replace(filename, `'`, `\'`, -1)
	filename = strings.Replace(filename, `"`, `\"`, -1)

	fileID, tokenID, nextPageToken := "", "", ""

	// First locate the per-token folder under the root.
	q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootID, token, gdriveDirectoryMimeType)
	l, err := s.list(nextPageToken, q)
	if err != nil {
		return "", err
	}

	for 0 < len(l.Files) {
		for _, fi := range l.Files {
			// Only the first match on each page is considered.
			tokenID = fi.Id
			break
		}

		if l.NextPageToken == "" {
			break
		}

		l, err = s.list(l.NextPageToken, q)
		if err != nil {
			return "", err
		}
	}

	if filename == "" {
		// Caller only wanted the folder; "" means it does not exist yet.
		return tokenID, nil
	} else if tokenID == "" {
		return "", fmt.Errorf("cannot find file %s/%s", token, filename)
	}

	// Then locate the (non-folder) file inside that folder.
	q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenID, filename, gdriveDirectoryMimeType)
	l, err = s.list(nextPageToken, q)
	if err != nil {
		return "", err
	}

	for 0 < len(l.Files) {
		for _, fi := range l.Files {

			fileID = fi.Id
			break
		}

		if l.NextPageToken == "" {
			break
		}

		l, err = s.list(l.NextPageToken, q)
		if err != nil {
			return "", err
		}
	}

	if fileID == "" {
		return "", fmt.Errorf("cannot find file %s/%s", token, filename)
	}

	return fileID, nil
}
|
||||||
|
|
||||||
|
// Type returns the storage type
|
||||||
|
func (s *GDrive) Type() string {
|
||||||
|
return "gdrive"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head retrieves content length of a file from storage
|
||||||
|
func (s *GDrive) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
||||||
|
var fileID string
|
||||||
|
fileID, err = s.findID(filename, token)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var fi *drive.File
|
||||||
|
if fi, err = s.service.Files.Get(fileID).Fields("size").Do(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(fi.Size)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves a file from storage
|
||||||
|
func (s *GDrive) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
||||||
|
var fileID string
|
||||||
|
fileID, err = s.findID(filename, token)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var fi *drive.File
|
||||||
|
fi, err = s.service.Files.Get(fileID).Fields("size", "md5Checksum").Do()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !s.hasChecksum(fi) {
|
||||||
|
err = fmt.Errorf("cannot find file %s/%s", token, filename)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(fi.Size)
|
||||||
|
|
||||||
|
var res *http.Response
|
||||||
|
res, err = s.service.Files.Get(fileID).Context(ctx).Download()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
reader = res.Body
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes a file from storage
|
||||||
|
func (s *GDrive) Delete(ctx context.Context, token string, filename string) (err error) {
|
||||||
|
metadata, _ := s.findID(fmt.Sprintf("%s.metadata", filename), token)
|
||||||
|
_ = s.service.Files.Delete(metadata).Do()
|
||||||
|
|
||||||
|
var fileID string
|
||||||
|
fileID, err = s.findID(filename, token)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
err = s.service.Files.Delete(fileID).Do()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge cleans up the storage
|
||||||
|
func (s *GDrive) Purge(ctx context.Context, days time.Duration) (err error) {
|
||||||
|
nextPageToken := ""
|
||||||
|
|
||||||
|
expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
|
||||||
|
q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootID, expirationDate, gdriveDirectoryMimeType)
|
||||||
|
l, err := s.list(nextPageToken, q)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for 0 < len(l.Files) {
|
||||||
|
for _, fi := range l.Files {
|
||||||
|
err = s.service.Files.Delete(fi.Id).Do()
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.NextPageToken == "" {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
l, err = s.list(l.NextPageToken, q)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNotExist indicates if a file doesn't exist on storage
|
||||||
|
func (s *GDrive) IsNotExist(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if e, ok := err.(*googleapi.Error); ok {
|
||||||
|
return e.Code == http.StatusNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put saves a file on storage
|
||||||
|
func (s *GDrive) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
|
||||||
|
dirID, err := s.findID("", token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if dirID == "" {
|
||||||
|
dir := &drive.File{
|
||||||
|
Name: token,
|
||||||
|
Parents: []string{s.rootID},
|
||||||
|
MimeType: gdriveDirectoryMimeType,
|
||||||
|
Size: int64(contentLength),
|
||||||
|
}
|
||||||
|
|
||||||
|
di, err := s.service.Files.Create(dir).Fields("id").Do()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
dirID = di.Id
|
||||||
|
}
|
||||||
|
|
||||||
|
// Instantiate empty drive file
|
||||||
|
dst := &drive.File{
|
||||||
|
Name: filename,
|
||||||
|
Parents: []string{dirID},
|
||||||
|
MimeType: contentType,
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieve a token, saves the token, then returns the generated client.
|
||||||
|
func getGDriveClient(ctx context.Context, config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
|
||||||
|
tokenFile := filepath.Join(localConfigPath, gdriveTokenJSONFile)
|
||||||
|
tok, err := gDriveTokenFromFile(tokenFile)
|
||||||
|
if err != nil {
|
||||||
|
tok = getGDriveTokenFromWeb(ctx, config, logger)
|
||||||
|
saveGDriveToken(tokenFile, tok, logger)
|
||||||
|
}
|
||||||
|
|
||||||
|
return config.Client(ctx, tok)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Request a token from the web, then returns the retrieved token.
|
||||||
|
func getGDriveTokenFromWeb(ctx context.Context, config *oauth2.Config, logger *log.Logger) *oauth2.Token {
|
||||||
|
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
|
||||||
|
fmt.Printf("Go to the following link in your browser then type the "+
|
||||||
|
"authorization code: \n%v\n", authURL)
|
||||||
|
|
||||||
|
var authCode string
|
||||||
|
if _, err := fmt.Scan(&authCode); err != nil {
|
||||||
|
logger.Fatalf("Unable to read authorization code %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tok, err := config.Exchange(ctx, authCode)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("Unable to retrieve token from web %v", err)
|
||||||
|
}
|
||||||
|
return tok
|
||||||
|
}
|
||||||
|
|
||||||
|
// Retrieves a token from a local file.
|
||||||
|
func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
|
||||||
|
f, err := os.Open(file)
|
||||||
|
defer CloseCheck(f.Close)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
tok := &oauth2.Token{}
|
||||||
|
err = json.NewDecoder(f).Decode(tok)
|
||||||
|
return tok, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Saves a token to a file path.
|
||||||
|
func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
|
||||||
|
logger.Printf("Saving credential file to: %s\n", path)
|
||||||
|
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
||||||
|
defer CloseCheck(f.Close)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("Unable to cache oauth token: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = json.NewEncoder(f).Encode(token)
|
||||||
|
if err != nil {
|
||||||
|
logger.Fatalf("Unable to encode oauth token: %v", err)
|
||||||
|
}
|
||||||
|
}
|
127
server/storage/local.go
Normal file
127
server/storage/local.go
Normal file
|
@ -0,0 +1,127 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LocalStorage is a local storage
type LocalStorage struct {
	Storage
	// basedir is the root directory; each upload lives at basedir/token/filename.
	basedir string
	logger  *log.Logger
}
|
||||||
|
|
||||||
|
// NewLocalStorage is the factory for LocalStorage
|
||||||
|
func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
|
||||||
|
return &LocalStorage{basedir: basedir, logger: logger}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns the storage type
|
||||||
|
func (s *LocalStorage) Type() string {
|
||||||
|
return "local"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head retrieves content length of a file from storage
|
||||||
|
func (s *LocalStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
||||||
|
path := filepath.Join(s.basedir, token, filename)
|
||||||
|
|
||||||
|
var fi os.FileInfo
|
||||||
|
if fi, err = os.Lstat(path); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(fi.Size())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves a file from storage
|
||||||
|
func (s *LocalStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
||||||
|
path := filepath.Join(s.basedir, token, filename)
|
||||||
|
|
||||||
|
// content type , content length
|
||||||
|
if reader, err = os.Open(path); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var fi os.FileInfo
|
||||||
|
if fi, err = os.Lstat(path); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(fi.Size())
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes a file from storage
|
||||||
|
func (s *LocalStorage) Delete(ctx context.Context, token string, filename string) (err error) {
|
||||||
|
metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
|
||||||
|
_ = os.Remove(metadata)
|
||||||
|
|
||||||
|
path := filepath.Join(s.basedir, token, filename)
|
||||||
|
err = os.Remove(path)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge cleans up the storage
|
||||||
|
func (s *LocalStorage) Purge(ctx context.Context, days time.Duration) (err error) {
|
||||||
|
err = filepath.Walk(s.basedir,
|
||||||
|
func(path string, info os.FileInfo, err error) error {
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if info.IsDir() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.ModTime().Before(time.Now().Add(-1 * days)) {
|
||||||
|
err = os.Remove(path)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNotExist indicates if a file doesn't exist on storage
|
||||||
|
func (s *LocalStorage) IsNotExist(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return os.IsNotExist(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put saves a file on storage
|
||||||
|
func (s *LocalStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
|
||||||
|
var f io.WriteCloser
|
||||||
|
var err error
|
||||||
|
|
||||||
|
path := filepath.Join(s.basedir, token)
|
||||||
|
|
||||||
|
if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
|
||||||
|
defer CloseCheck(f.Close)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.Copy(f, reader); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
178
server/storage/s3.go
Normal file
178
server/storage/s3.go
Normal file
|
@ -0,0 +1,178 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/aws/aws-sdk-go/aws"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||||
|
"github.com/aws/aws-sdk-go/aws/session"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3"
|
||||||
|
"github.com/aws/aws-sdk-go/service/s3/s3manager"
|
||||||
|
)
|
||||||
|
|
||||||
|
// S3Storage is a storage backed by AWS S3
type S3Storage struct {
	Storage
	bucket  string
	session *session.Session
	s3      *s3.S3
	logger  *log.Logger
	// purgeDays is the object lifetime applied as Expires at upload time;
	// zero disables expiry.
	purgeDays time.Duration
	// noMultipart forces single-part uploads (uploader concurrency 1) when true.
	noMultipart bool
}
|
||||||
|
|
||||||
|
// NewS3Storage is the factory for S3Storage
|
||||||
|
func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
|
||||||
|
sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
|
||||||
|
|
||||||
|
return &S3Storage{
|
||||||
|
bucket: bucketName,
|
||||||
|
s3: s3.New(sess),
|
||||||
|
session: sess,
|
||||||
|
logger: logger,
|
||||||
|
noMultipart: disableMultipart,
|
||||||
|
purgeDays: time.Duration(purgeDays*24) * time.Hour,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns the storage type
|
||||||
|
func (s *S3Storage) Type() string {
|
||||||
|
return "s3"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head retrieves content length of a file from storage
|
||||||
|
func (s *S3Storage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
||||||
|
key := fmt.Sprintf("%s/%s", token, filename)
|
||||||
|
|
||||||
|
headRequest := &s3.HeadObjectInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(key),
|
||||||
|
}
|
||||||
|
|
||||||
|
// content type , content length
|
||||||
|
response, err := s.s3.HeadObjectWithContext(ctx, headRequest)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.ContentLength != nil {
|
||||||
|
contentLength = uint64(*response.ContentLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge cleans up the storage
|
||||||
|
func (s *S3Storage) Purge(ctx context.Context, days time.Duration) (err error) {
|
||||||
|
// NOOP expiration is set at upload time
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNotExist indicates if a file doesn't exist on storage
|
||||||
|
func (s *S3Storage) IsNotExist(err error) bool {
|
||||||
|
if err == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if aerr, ok := err.(awserr.Error); ok {
|
||||||
|
switch aerr.Code() {
|
||||||
|
case s3.ErrCodeNoSuchKey:
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves a file from storage
|
||||||
|
func (s *S3Storage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
||||||
|
key := fmt.Sprintf("%s/%s", token, filename)
|
||||||
|
|
||||||
|
getRequest := &s3.GetObjectInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(key),
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := s.s3.GetObjectWithContext(ctx, getRequest)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if response.ContentLength != nil {
|
||||||
|
contentLength = uint64(*response.ContentLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
reader = response.Body
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes a file from storage
|
||||||
|
func (s *S3Storage) Delete(ctx context.Context, token string, filename string) (err error) {
|
||||||
|
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
|
||||||
|
deleteRequest := &s3.DeleteObjectInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(metadata),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
key := fmt.Sprintf("%s/%s", token, filename)
|
||||||
|
deleteRequest = &s3.DeleteObjectInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(key),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = s.s3.DeleteObjectWithContext(ctx, deleteRequest)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put saves a file on storage
|
||||||
|
func (s *S3Storage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
||||||
|
key := fmt.Sprintf("%s/%s", token, filename)
|
||||||
|
|
||||||
|
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
|
||||||
|
var concurrency int
|
||||||
|
if !s.noMultipart {
|
||||||
|
concurrency = 20
|
||||||
|
} else {
|
||||||
|
concurrency = 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an uploader with the session and custom options
|
||||||
|
uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
|
||||||
|
u.Concurrency = concurrency // default is 5
|
||||||
|
u.LeavePartsOnError = false
|
||||||
|
})
|
||||||
|
|
||||||
|
var expire *time.Time
|
||||||
|
if s.purgeDays.Hours() > 0 {
|
||||||
|
expire = aws.Time(time.Now().Add(s.purgeDays))
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = uploader.UploadWithContext(ctx, &s3manager.UploadInput{
|
||||||
|
Bucket: aws.String(s.bucket),
|
||||||
|
Key: aws.String(key),
|
||||||
|
Body: reader,
|
||||||
|
Expires: expire,
|
||||||
|
})
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func getAwsSession(accessKey, secretKey, region, endpoint string, forcePathStyle bool) *session.Session {
|
||||||
|
return session.Must(session.NewSession(&aws.Config{
|
||||||
|
Region: aws.String(region),
|
||||||
|
Endpoint: aws.String(endpoint),
|
||||||
|
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
|
||||||
|
S3ForcePathStyle: aws.Bool(forcePathStyle),
|
||||||
|
}))
|
||||||
|
}
|
150
server/storage/storj.go
Normal file
150
server/storage/storj.go
Normal file
|
@ -0,0 +1,150 @@
|
||||||
|
package storage
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"log"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"storj.io/common/fpath"
|
||||||
|
"storj.io/common/storj"
|
||||||
|
"storj.io/uplink"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StorjStorage is a storage backed by Storj
type StorjStorage struct {
	Storage
	project   *uplink.Project // open handle to the Storj project
	bucket    *uplink.Bucket  // bucket all objects are stored in
	purgeDays time.Duration   // per-object lifetime set at upload; zero disables expiry
	logger    *log.Logger
}
|
||||||
|
|
||||||
|
// NewStorjStorage is the factory for StorjStorage
|
||||||
|
func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
|
||||||
|
var instance StorjStorage
|
||||||
|
var err error
|
||||||
|
|
||||||
|
pCtx := context.TODO()
|
||||||
|
|
||||||
|
ctx := fpath.WithTempData(pCtx, "", true)
|
||||||
|
|
||||||
|
uplConf := &uplink.Config{
|
||||||
|
UserAgent: "transfer-sh",
|
||||||
|
}
|
||||||
|
|
||||||
|
parsedAccess, err := uplink.ParseAccess(access)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
instance.project, err = uplConf.OpenProject(ctx, parsedAccess)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
|
||||||
|
if err != nil {
|
||||||
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
||||||
|
_ = instance.project.Close()
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
|
||||||
|
|
||||||
|
instance.logger = logger
|
||||||
|
|
||||||
|
return &instance, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Type returns the storage type
|
||||||
|
func (s *StorjStorage) Type() string {
|
||||||
|
return "storj"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Head retrieves content length of a file from storage
|
||||||
|
func (s *StorjStorage) Head(ctx context.Context, token string, filename string) (contentLength uint64, err error) {
|
||||||
|
key := storj.JoinPaths(token, filename)
|
||||||
|
|
||||||
|
obj, err := s.project.StatObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
||||||
|
if err != nil {
|
||||||
|
return 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(obj.System.ContentLength)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get retrieves a file from storage
|
||||||
|
func (s *StorjStorage) Get(ctx context.Context, token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
|
||||||
|
key := storj.JoinPaths(token, filename)
|
||||||
|
|
||||||
|
s.logger.Printf("Getting file %s from Storj Bucket", filename)
|
||||||
|
|
||||||
|
download, err := s.project.DownloadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, err
|
||||||
|
}
|
||||||
|
|
||||||
|
contentLength = uint64(download.Info().System.ContentLength)
|
||||||
|
|
||||||
|
reader = download
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete removes a file from storage
|
||||||
|
func (s *StorjStorage) Delete(ctx context.Context, token string, filename string) (err error) {
|
||||||
|
key := storj.JoinPaths(token, filename)
|
||||||
|
|
||||||
|
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
|
||||||
|
|
||||||
|
_, err = s.project.DeleteObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Purge cleans up the storage
//
// For Storj this is intentionally a no-op: every object's expiration
// is set at upload time (see Put), so the network removes expired
// objects on its own. The days argument is therefore ignored.
func (s *StorjStorage) Purge(ctx context.Context, days time.Duration) (err error) {
	// NOOP expiration is set at upload time
	return nil
}
|
||||||
|
|
||||||
|
// Put saves a file on storage
|
||||||
|
func (s *StorjStorage) Put(ctx context.Context, token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
|
||||||
|
key := storj.JoinPaths(token, filename)
|
||||||
|
|
||||||
|
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
|
||||||
|
|
||||||
|
var uploadOptions *uplink.UploadOptions
|
||||||
|
if s.purgeDays.Hours() > 0 {
|
||||||
|
uploadOptions = &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)}
|
||||||
|
}
|
||||||
|
|
||||||
|
writer, err := s.project.UploadObject(fpath.WithTempData(ctx, "", true), s.bucket.Name, key, uploadOptions)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := io.Copy(writer, reader)
|
||||||
|
if err != nil || uint64(n) != contentLength {
|
||||||
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
||||||
|
_ = writer.Abort()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
|
||||||
|
if err != nil {
|
||||||
|
//Ignoring the error to return the one that occurred first, but try to clean up.
|
||||||
|
_ = writer.Abort()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = writer.Commit()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsNotExist indicates if a file doesn't exist on storage
|
||||||
|
func (s *StorjStorage) IsNotExist(err error) bool {
|
||||||
|
return errors.Is(err, uplink.ErrObjectNotFound)
|
||||||
|
}
|
|
@ -34,21 +34,9 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"github.com/aws/aws-sdk-go/aws"
|
|
||||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
|
||||||
"github.com/aws/aws-sdk-go/aws/session"
|
|
||||||
"github.com/golang/gddo/httputil/header"
|
"github.com/golang/gddo/httputil/header"
|
||||||
)
|
)
|
||||||
|
|
||||||
func getAwsSession(accessKey, secretKey, region, endpoint string, forcePathStyle bool) *session.Session {
|
|
||||||
return session.Must(session.NewSession(&aws.Config{
|
|
||||||
Region: aws.String(region),
|
|
||||||
Endpoint: aws.String(endpoint),
|
|
||||||
Credentials: credentials.NewStaticCredentials(accessKey, secretKey, ""),
|
|
||||||
S3ForcePathStyle: aws.Bool(forcePathStyle),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
|
|
||||||
func formatNumber(format string, s uint64) string {
|
func formatNumber(format string, s uint64) string {
|
||||||
return renderFloat(format, float64(s))
|
return renderFloat(format, float64(s))
|
||||||
}
|
}
|
||||||
|
@ -279,9 +267,3 @@ func formatSize(size int64) string {
|
||||||
getSuffix := suffixes[int(math.Floor(base))]
|
getSuffix := suffixes[int(math.Floor(base))]
|
||||||
return fmt.Sprintf("%s %s", strconv.FormatFloat(newVal, 'f', -1, 64), getSuffix)
|
return fmt.Sprintf("%s %s", strconv.FormatFloat(newVal, 'f', -1, 64), getSuffix)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseCheck invokes the given close function and, instead of
// propagating a failure, prints any returned error to stdout. It is
// intended for deferred Close calls whose error would otherwise be lost.
func CloseCheck(f func() error) {
	err := f()
	if err == nil {
		return
	}
	fmt.Println("Received close error:", err)
}
|
|
||||||
|
|
Loading…
Reference in a new issue