transfer.sh/server/storage.go
2021-01-11 16:02:00 +01:00

740 lines
17 KiB
Go

package server
import (
"encoding/json"
"errors"
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"storj.io/common/storj"
"storj.io/uplink"
)
type Storage interface {
Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error)
Head(token string, filename string) (contentLength uint64, err error)
Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
Delete(token string, filename string) error
IsNotExist(err error) bool
Purge(days time.Duration) error
Type() string
}
type LocalStorage struct {
Storage
basedir string
logger *log.Logger
}
func NewLocalStorage(basedir string, logger *log.Logger) (*LocalStorage, error) {
return &LocalStorage{basedir: basedir, logger: logger}, nil
}
func (s *LocalStorage) Type() string {
return "local"
}
func (s *LocalStorage) Head(token string, filename string) (contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
var fi os.FileInfo
if fi, err = os.Lstat(path); err != nil {
return
}
contentLength = uint64(fi.Size())
return
}
func (s *LocalStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
path := filepath.Join(s.basedir, token, filename)
// content type , content length
if reader, err = os.Open(path); err != nil {
return
}
var fi os.FileInfo
if fi, err = os.Lstat(path); err != nil {
return
}
contentLength = uint64(fi.Size())
return
}
func (s *LocalStorage) Delete(token string, filename string) (err error) {
metadata := filepath.Join(s.basedir, token, fmt.Sprintf("%s.metadata", filename))
os.Remove(metadata)
path := filepath.Join(s.basedir, token, filename)
err = os.Remove(path)
return
}
func (s *LocalStorage) Purge(days time.Duration) (err error) {
err = filepath.Walk(s.basedir,
func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
if info.IsDir() {
return nil
}
if info.ModTime().Before(time.Now().Add(-1 * days)) {
err = os.Remove(path)
return err
}
return nil
})
return
}
func (s *LocalStorage) IsNotExist(err error) bool {
if err == nil {
return false
}
return os.IsNotExist(err)
}
func (s *LocalStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
var f io.WriteCloser
var err error
path := filepath.Join(s.basedir, token)
if err = os.MkdirAll(path, 0700); err != nil && !os.IsExist(err) {
return err
}
if f, err = os.OpenFile(filepath.Join(path, filename), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600); err != nil {
return err
}
defer f.Close()
if _, err = io.Copy(f, reader); err != nil {
return err
}
return nil
}
type S3Storage struct {
Storage
bucket string
session *session.Session
s3 *s3.S3
logger *log.Logger
purgeDays time.Duration
noMultipart bool
}
func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
return &S3Storage{
bucket: bucketName,
s3: s3.New(sess),
session: sess,
logger: logger,
noMultipart: disableMultipart,
purgeDays: time.Duration(purgeDays*24) * time.Hour,
}, nil
}
func (s *S3Storage) Type() string {
return "s3"
}
func (s *S3Storage) Head(token string, filename string) (contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
headRequest := &s3.HeadObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
// content type , content length
response, err := s.s3.HeadObject(headRequest)
if err != nil {
return
}
if response.ContentLength != nil {
contentLength = uint64(*response.ContentLength)
}
return
}
func (s *S3Storage) Purge(days time.Duration) (err error) {
// NOOP expiration is set at upload time
return nil
}
func (s *S3Storage) IsNotExist(err error) bool {
if err == nil {
return false
}
if aerr, ok := err.(awserr.Error); ok {
switch aerr.Code() {
case s3.ErrCodeNoSuchKey:
return true
}
}
return false
}
func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
key := fmt.Sprintf("%s/%s", token, filename)
getRequest := &s3.GetObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
response, err := s.s3.GetObject(getRequest)
if err != nil {
return
}
if response.ContentLength != nil {
contentLength = uint64(*response.ContentLength)
}
reader = response.Body
return
}
func (s *S3Storage) Delete(token string, filename string) (err error) {
metadata := fmt.Sprintf("%s/%s.metadata", token, filename)
deleteRequest := &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(metadata),
}
_, err = s.s3.DeleteObject(deleteRequest)
if err != nil {
return
}
key := fmt.Sprintf("%s/%s", token, filename)
deleteRequest = &s3.DeleteObjectInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
}
_, err = s.s3.DeleteObject(deleteRequest)
return
}
func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := fmt.Sprintf("%s/%s", token, filename)
s.logger.Printf("Uploading file %s to S3 Bucket", filename)
var concurrency int
if !s.noMultipart {
concurrency = 20
} else {
concurrency = 1
}
// Create an uploader with the session and custom options
uploader := s3manager.NewUploader(s.session, func(u *s3manager.Uploader) {
u.Concurrency = concurrency // default is 5
u.LeavePartsOnError = false
})
_, err = uploader.Upload(&s3manager.UploadInput{
Bucket: aws.String(s.bucket),
Key: aws.String(key),
Body: reader,
Expires: aws.Time(time.Now().Add(s.purgeDays)),
})
return
}
type GDrive struct {
service *drive.Service
rootId string
basedir string
localConfigPath string
chunkSize int
logger *log.Logger
}
func NewGDriveStorage(clientJsonFilepath string, localConfigPath string, basedir string, chunkSize int, logger *log.Logger) (*GDrive, error) {
b, err := ioutil.ReadFile(clientJsonFilepath)
if err != nil {
return nil, err
}
// If modifying these scopes, delete your previously saved client_secret.json.
config, err := google.ConfigFromJSON(b, drive.DriveScope, drive.DriveMetadataScope)
if err != nil {
return nil, err
}
srv, err := drive.New(getGDriveClient(config, localConfigPath, logger))
if err != nil {
return nil, err
}
chunkSize = chunkSize * 1024 * 1024
storage := &GDrive{service: srv, basedir: basedir, rootId: "", localConfigPath: localConfigPath, chunkSize: chunkSize, logger: logger}
err = storage.setupRoot()
if err != nil {
return nil, err
}
return storage, nil
}
const GDriveRootConfigFile = "root_id.conf"
const GDriveTokenJsonFile = "token.json"
const GDriveDirectoryMimeType = "application/vnd.google-apps.folder"
func (s *GDrive) setupRoot() error {
rootFileConfig := filepath.Join(s.localConfigPath, GDriveRootConfigFile)
rootId, err := ioutil.ReadFile(rootFileConfig)
if err != nil && !os.IsNotExist(err) {
return err
}
if string(rootId) != "" {
s.rootId = string(rootId)
return nil
}
dir := &drive.File{
Name: s.basedir,
MimeType: GDriveDirectoryMimeType,
}
di, err := s.service.Files.Create(dir).Fields("id").Do()
if err != nil {
return err
}
s.rootId = di.Id
err = ioutil.WriteFile(rootFileConfig, []byte(s.rootId), os.FileMode(0600))
if err != nil {
return err
}
return nil
}
func (s *GDrive) hasChecksum(f *drive.File) bool {
return f.Md5Checksum != ""
}
func (s *GDrive) list(nextPageToken string, q string) (*drive.FileList, error) {
return s.service.Files.List().Fields("nextPageToken, files(id, name, mimeType)").Q(q).PageToken(nextPageToken).Do()
}
func (s *GDrive) findId(filename string, token string) (string, error) {
filename = strings.Replace(filename, `'`, `\'`, -1)
filename = strings.Replace(filename, `"`, `\"`, -1)
fileId, tokenId, nextPageToken := "", "", ""
q := fmt.Sprintf("'%s' in parents and name='%s' and mimeType='%s' and trashed=false", s.rootId, token, GDriveDirectoryMimeType)
l, err := s.list(nextPageToken, q)
if err != nil {
return "", err
}
for 0 < len(l.Files) {
for _, fi := range l.Files {
tokenId = fi.Id
break
}
if l.NextPageToken == "" {
break
}
l, err = s.list(l.NextPageToken, q)
}
if filename == "" {
return tokenId, nil
} else if tokenId == "" {
return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
}
q = fmt.Sprintf("'%s' in parents and name='%s' and mimeType!='%s' and trashed=false", tokenId, filename, GDriveDirectoryMimeType)
l, err = s.list(nextPageToken, q)
if err != nil {
return "", err
}
for 0 < len(l.Files) {
for _, fi := range l.Files {
fileId = fi.Id
break
}
if l.NextPageToken == "" {
break
}
l, err = s.list(l.NextPageToken, q)
}
if fileId == "" {
return "", fmt.Errorf("Cannot find file %s/%s", token, filename)
}
return fileId, nil
}
func (s *GDrive) Type() string {
return "gdrive"
}
func (s *GDrive) Head(token string, filename string) (contentLength uint64, err error) {
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
var fi *drive.File
if fi, err = s.service.Files.Get(fileId).Fields("size").Do(); err != nil {
return
}
contentLength = uint64(fi.Size)
return
}
func (s *GDrive) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
var fi *drive.File
fi, err = s.service.Files.Get(fileId).Fields("size", "md5Checksum").Do()
if !s.hasChecksum(fi) {
err = fmt.Errorf("Cannot find file %s/%s", token, filename)
return
}
contentLength = uint64(fi.Size)
ctx := context.Background()
var res *http.Response
res, err = s.service.Files.Get(fileId).Context(ctx).Download()
if err != nil {
return
}
reader = res.Body
return
}
func (s *GDrive) Delete(token string, filename string) (err error) {
metadata, _ := s.findId(fmt.Sprintf("%s.metadata", filename), token)
s.service.Files.Delete(metadata).Do()
var fileId string
fileId, err = s.findId(filename, token)
if err != nil {
return
}
err = s.service.Files.Delete(fileId).Do()
return
}
func (s *GDrive) Purge(days time.Duration) (err error) {
nextPageToken := ""
expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootId, expirationDate, GDriveDirectoryMimeType)
l, err := s.list(nextPageToken, q)
if err != nil {
return err
}
for 0 < len(l.Files) {
for _, fi := range l.Files {
err = s.service.Files.Delete(fi.Id).Do()
if err != nil {
return
}
}
if l.NextPageToken == "" {
break
}
l, err = s.list(l.NextPageToken, q)
}
return
}
func (s *GDrive) IsNotExist(err error) bool {
if err != nil {
if e, ok := err.(*googleapi.Error); ok {
return e.Code == http.StatusNotFound
}
}
return false
}
func (s *GDrive) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error {
dirId, err := s.findId("", token)
if err != nil {
return err
}
if dirId == "" {
dir := &drive.File{
Name: token,
Parents: []string{s.rootId},
MimeType: GDriveDirectoryMimeType,
}
di, err := s.service.Files.Create(dir).Fields("id").Do()
if err != nil {
return err
}
dirId = di.Id
}
// Instantiate empty drive file
dst := &drive.File{
Name: filename,
Parents: []string{dirId},
MimeType: contentType,
}
ctx := context.Background()
_, err = s.service.Files.Create(dst).Context(ctx).Media(reader, googleapi.ChunkSize(s.chunkSize)).Do()
if err != nil {
return err
}
return nil
}
// Retrieve a token, saves the token, then returns the generated client.
func getGDriveClient(config *oauth2.Config, localConfigPath string, logger *log.Logger) *http.Client {
tokenFile := filepath.Join(localConfigPath, GDriveTokenJsonFile)
tok, err := gDriveTokenFromFile(tokenFile)
if err != nil {
tok = getGDriveTokenFromWeb(config, logger)
saveGDriveToken(tokenFile, tok, logger)
}
return config.Client(context.Background(), tok)
}
// Request a token from the web, then returns the retrieved token.
func getGDriveTokenFromWeb(config *oauth2.Config, logger *log.Logger) *oauth2.Token {
authURL := config.AuthCodeURL("state-token", oauth2.AccessTypeOffline)
fmt.Printf("Go to the following link in your browser then type the "+
"authorization code: \n%v\n", authURL)
var authCode string
if _, err := fmt.Scan(&authCode); err != nil {
logger.Fatalf("Unable to read authorization code %v", err)
}
tok, err := config.Exchange(context.TODO(), authCode)
if err != nil {
logger.Fatalf("Unable to retrieve token from web %v", err)
}
return tok
}
// Retrieves a token from a local file.
func gDriveTokenFromFile(file string) (*oauth2.Token, error) {
f, err := os.Open(file)
defer f.Close()
if err != nil {
return nil, err
}
tok := &oauth2.Token{}
err = json.NewDecoder(f).Decode(tok)
return tok, err
}
// Saves a token to a file path.
func saveGDriveToken(path string, token *oauth2.Token, logger *log.Logger) {
logger.Printf("Saving credential file to: %s\n", path)
f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
defer f.Close()
if err != nil {
logger.Fatalf("Unable to cache oauth token: %v", err)
}
json.NewEncoder(f).Encode(token)
}
type StorjStorage struct {
Storage
project *uplink.Project
bucket *uplink.Bucket
purgeDays time.Duration
logger *log.Logger
}
func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
var instance StorjStorage
var err error
ctx := context.TODO()
parsedAccess, err := uplink.ParseAccess(access)
if err != nil {
return nil, err
}
instance.project, err = uplink.OpenProject(ctx, parsedAccess)
if err != nil {
return nil, err
}
instance.bucket, err = instance.project.EnsureBucket(ctx, bucket)
if err != nil {
//Ignoring the error to return the one that occurred first, but try to clean up.
_ = instance.project.Close()
return nil, err
}
instance.purgeDays = time.Duration(purgeDays*24) * time.Hour
instance.logger = logger
return &instance, nil
}
func (s *StorjStorage) Type() string {
return "storj"
}
func (s *StorjStorage) Head(token string, filename string) (contentLength uint64, err error) {
key := storj.JoinPaths(token, filename)
ctx := context.TODO()
obj, err := s.project.StatObject(ctx, s.bucket.Name, key)
if err != nil {
return 0, err
}
contentLength = uint64(obj.System.ContentLength)
return
}
func (s *StorjStorage) Get(token string, filename string) (reader io.ReadCloser, contentLength uint64, err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Getting file %s from Storj Bucket", filename)
ctx := context.TODO()
download, err := s.project.DownloadObject(ctx, s.bucket.Name, key, nil)
if err != nil {
return nil, 0, err
}
contentLength = uint64(download.Info().System.ContentLength)
reader = download
return
}
func (s *StorjStorage) Delete(token string, filename string) (err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Deleting file %s from Storj Bucket", filename)
ctx := context.TODO()
_, err = s.project.DeleteObject(ctx, s.bucket.Name, key)
return
}
func (s *StorjStorage) Purge(days time.Duration) (err error) {
// NOOP expiration is set at upload time
return nil
}
func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := storj.JoinPaths(token, filename)
s.logger.Printf("Uploading file %s to Storj Bucket", filename)
ctx := context.TODO()
writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)})
if err != nil {
return err
}
n, err := io.Copy(writer, reader)
if err != nil || uint64(n) != contentLength {
//Ignoring the error to return the one that occurred first, but try to clean up.
_ = writer.Abort()
return err
}
err = writer.SetCustomMetadata(ctx, uplink.CustomMetadata{"content-type": contentType})
if err != nil {
//Ignoring the error to return the one that occurred first, but try to clean up.
_ = writer.Abort()
return err
}
err = writer.Commit()
return err
}
func (s *StorjStorage) IsNotExist(err error) bool {
return errors.Is(err, uplink.ErrObjectNotFound)
}