PURGE FEATURE

This commit is contained in:
Andrea Spacca 2021-01-05 17:23:47 +01:00
parent 2721ece759
commit 6ac6c8fa99
5 changed files with 135 additions and 14 deletions

View file

@ -113,6 +113,8 @@ cors-domains | comma separated list of domains for CORS, setting it enable CORS
clamav-host | host for clamav feature | | CLAMAV_HOST | clamav-host | host for clamav feature | | CLAMAV_HOST |
rate-limit | request per minute | | RATE_LIMIT | rate-limit | request per minute | | RATE_LIMIT |
max-upload-size | max upload size in kilobytes | | MAX_UPLOAD_SIZE | max-upload-size | max upload size in kilobytes | | MAX_UPLOAD_SIZE |
purge-days | number of days after which uploads are purged automatically | | PURGE_DAYS |
purge-interval | interval in hours at which the automatic purge runs (not applicable to S3 and Storj) | | PURGE_INTERVAL |
If you want to use TLS using lets encrypt certificates, set lets-encrypt-hosts to your domain, set tls-listener to :443 and enable force-https. If you want to use TLS using lets encrypt certificates, set lets-encrypt-hosts to your domain, set tls-listener to :443 and enable force-https.

View file

@ -191,6 +191,18 @@ var globalFlags = []cli.Flag{
Value: 0, Value: 0,
EnvVar: "RATE_LIMIT", EnvVar: "RATE_LIMIT",
}, },
cli.IntFlag{
Name: "purge-days",
Usage: "number of days after uploads are purged automatically",
Value: 0,
EnvVar: "PURGE_DAYS",
},
cli.IntFlag{
Name: "purge-interval",
Usage: "interval in hours to run the automatic purge for",
Value: 0,
EnvVar: "PURGE_INTERVAL",
},
cli.Int64Flag{ cli.Int64Flag{
Name: "max-upload-size", Name: "max-upload-size",
Usage: "max limit for upload, in kilobytes", Usage: "max limit for upload, in kilobytes",
@ -365,6 +377,13 @@ func New() *Cmd {
options = append(options, server.RateLimit(v)) options = append(options, server.RateLimit(v))
} }
purgeDays := c.Int("purge-days")
purgeInterval := c.Int("purge-interval")
if purgeDays > 0 && purgeInterval > 0 {
options = append(options, server.Purge(purgeDays, purgeInterval))
}
if cert := c.String("tls-cert-file"); cert == "" { if cert := c.String("tls-cert-file"); cert == "" {
} else if pk := c.String("tls-private-key"); pk == "" { } else if pk := c.String("tls-private-key"); pk == "" {
} else { } else {
@ -410,7 +429,7 @@ func New() *Cmd {
panic("secret-key not set.") panic("secret-key not set.")
} else if bucket := c.String("bucket"); bucket == "" { } else if bucket := c.String("bucket"); bucket == "" {
panic("bucket not set.") panic("bucket not set.")
} else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, c.String("s3-region"), c.String("s3-endpoint"), logger, c.Bool("s3-no-multipart"), c.Bool("s3-path-style")); err != nil { } else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket, purgeDays, c.String("s3-region"), c.String("s3-endpoint"), c.Bool("s3-no-multipart"), c.Bool("s3-path-style"), logger); err != nil {
panic(err) panic(err)
} else { } else {
options = append(options, server.UseStorage(storage)) options = append(options, server.UseStorage(storage))
@ -434,7 +453,7 @@ func New() *Cmd {
panic("storj-access not set.") panic("storj-access not set.")
} else if bucket := c.String("storj-bucket"); bucket == "" { } else if bucket := c.String("storj-bucket"); bucket == "" {
panic("storj-bucket not set.") panic("storj-bucket not set.")
} else if storage, err := server.NewStorjStorage(access, bucket, logger); err != nil { } else if storage, err := server.NewStorjStorage(access, bucket, purgeDays, logger); err != nil {
panic(err) panic(err)
} else { } else {
options = append(options, server.UseStorage(storage)) options = append(options, server.UseStorage(storage))

View file

@ -690,6 +690,19 @@ func (s *Server) CheckDeletionToken(deletionToken, token, filename string) error
return nil return nil
} }
// purgeHandler starts a background goroutine that periodically deletes
// expired uploads: every s.purgeInterval it asks the storage backend to
// purge files older than s.purgeDays.
//
// NOTE(review): the goroutine runs for the lifetime of the process; the
// ticker is intentionally never stopped because no shutdown signal is
// wired in here.
func (s *Server) purgeHandler() {
	ticker := time.NewTicker(s.purgeInterval)
	go func() {
		// Ranging over the ticker channel replaces the original
		// single-case select, which added nothing.
		for range ticker.C {
			// Only log on actual failure; the original logged a nil
			// error after every successful purge run as well.
			if err := s.storage.Purge(s.purgeDays); err != nil {
				log.Printf("error cleaning up expired files: %v", err)
			}
		}
	}()
}
func (s *Server) deleteHandler(w http.ResponseWriter, r *http.Request) { func (s *Server) deleteHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r) vars := mux.Vars(r)

View file

@ -187,6 +187,14 @@ func RateLimit(requests int) OptionFn {
} }
} }
// Purge returns an option that enables automatic cleanup of uploads:
// files older than the given number of days are removed, with the purge
// pass scheduled every given number of hours.
func Purge(days, interval int) OptionFn {
	const day = 24 * time.Hour

	retention := time.Duration(days) * day
	frequency := time.Duration(interval) * time.Hour

	return func(srvr *Server) {
		srvr.purgeDays = retention
		srvr.purgeInterval = frequency
	}
}
func ForceHTTPs() OptionFn { func ForceHTTPs() OptionFn {
return func(srvr *Server) { return func(srvr *Server) {
srvr.forceHTTPs = true srvr.forceHTTPs = true
@ -280,6 +288,9 @@ type Server struct {
maxUploadSize int64 maxUploadSize int64
rateLimitRequests int rateLimitRequests int
purgeDays time.Duration
purgeInterval time.Duration
storage Storage storage Storage
forceHTTPs bool forceHTTPs bool
@ -500,6 +511,10 @@ func (s *Server) Run() {
s.logger.Printf("---------------------------") s.logger.Printf("---------------------------")
if s.purgeDays > 0 {
go s.purgeHandler()
}
term := make(chan os.Signal, 1) term := make(chan os.Signal, 1)
signal.Notify(term, os.Interrupt) signal.Notify(term, os.Interrupt)
signal.Notify(term, syscall.SIGTERM) signal.Notify(term, syscall.SIGTERM)

View file

@ -4,14 +4,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/aws/session"
@ -22,6 +14,14 @@ import (
"golang.org/x/oauth2/google" "golang.org/x/oauth2/google"
"google.golang.org/api/drive/v3" "google.golang.org/api/drive/v3"
"google.golang.org/api/googleapi" "google.golang.org/api/googleapi"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
"time"
"storj.io/common/storj" "storj.io/common/storj"
"storj.io/uplink" "storj.io/uplink"
@ -33,6 +33,7 @@ type Storage interface {
Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) error
Delete(token string, filename string) error Delete(token string, filename string) error
IsNotExist(err error) bool IsNotExist(err error) bool
Purge(days time.Duration) error
Type() string Type() string
} }
@ -91,6 +92,27 @@ func (s *LocalStorage) Delete(token string, filename string) (err error) {
return return
} }
// Purge walks the local storage directory and deletes every regular file
// whose last modification time is older than the given retention period
// (days is a time.Duration, already converted from whole days).
//
// Fix: the original condition was inverted — it removed files whose
// ModTime was *after* the cutoff, i.e. it deleted fresh uploads and kept
// the expired ones. Expired means modified *before* the cutoff.
func (s *LocalStorage) Purge(days time.Duration) (err error) {
	cutoff := time.Now().Add(-1 * days)

	err = filepath.Walk(s.basedir,
		func(path string, info os.FileInfo, err error) error {
			if err != nil {
				return err
			}

			// Directories are never removed; only uploaded files.
			if info.IsDir() {
				return nil
			}

			// Expired: last touched before the cutoff instant.
			if info.ModTime().Before(cutoff) {
				return os.Remove(path)
			}

			return nil
		})

	return
}
func (s *LocalStorage) IsNotExist(err error) bool { func (s *LocalStorage) IsNotExist(err error) bool {
if err == nil { if err == nil {
return false return false
@ -128,13 +150,21 @@ type S3Storage struct {
session *session.Session session *session.Session
s3 *s3.S3 s3 *s3.S3
logger *log.Logger logger *log.Logger
purgeDays time.Duration
noMultipart bool noMultipart bool
} }
func NewS3Storage(accessKey, secretKey, bucketName, region, endpoint string, logger *log.Logger, disableMultipart bool, forcePathStyle bool) (*S3Storage, error) { func NewS3Storage(accessKey, secretKey, bucketName string, purgeDays int, region, endpoint string, disableMultipart bool, forcePathStyle bool, logger *log.Logger) (*S3Storage, error) {
sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle) sess := getAwsSession(accessKey, secretKey, region, endpoint, forcePathStyle)
return &S3Storage{bucket: bucketName, s3: s3.New(sess), session: sess, logger: logger, noMultipart: disableMultipart}, nil return &S3Storage{
bucket: bucketName,
s3: s3.New(sess),
session: sess,
logger: logger,
noMultipart: disableMultipart,
purgeDays: time.Duration(purgeDays * 24) * time.Hour,
}, nil
} }
func (s *S3Storage) Type() string { func (s *S3Storage) Type() string {
@ -162,6 +192,11 @@ func (s *S3Storage) Head(token string, filename string) (contentLength uint64, e
return return
} }
// Purge satisfies the Storage interface but does nothing for S3:
// expiry is delegated to S3 itself via the Expires attribute that is
// set on each object at upload time.
func (s *S3Storage) Purge(days time.Duration) (err error) {
	return
}
func (s *S3Storage) IsNotExist(err error) bool { func (s *S3Storage) IsNotExist(err error) bool {
if err == nil { if err == nil {
return false return false
@ -242,6 +277,7 @@ func (s *S3Storage) Put(token string, filename string, reader io.Reader, content
Bucket: aws.String(s.bucket), Bucket: aws.String(s.bucket),
Key: aws.String(key), Key: aws.String(key),
Body: reader, Body: reader,
Expires: aws.Time(time.Now().Add(s.purgeDays)),
}) })
return return
@ -448,6 +484,34 @@ func (s *GDrive) Delete(token string, filename string) (err error) {
return return
} }
// Purge deletes uploads from Google Drive whose last modification is
// older than the given retention period (days is a time.Duration,
// already converted from whole days).
//
// Fixes: the Drive query compared modifiedTime with '>' which matched
// *recent* files instead of expired ones; and the pagination re-list
// error was silently ignored, which could loop on a stale result set.
func (s *GDrive) Purge(days time.Duration) (err error) {
	nextPageToken := ""

	// Anything modified before this instant is expired.
	expirationDate := time.Now().Add(-1 * days).Format(time.RFC3339)
	q := fmt.Sprintf("'%s' in parents and modifiedTime < '%s' and mimeType!='%s' and trashed=false", s.rootId, expirationDate, GDriveDirectoryMimeType)

	l, err := s.list(nextPageToken, q)
	if err != nil {
		return err
	}

	for len(l.Files) > 0 {
		for _, fi := range l.Files {
			if err = s.service.Files.Delete(fi.Id).Do(); err != nil {
				return
			}
		}

		if l.NextPageToken == "" {
			break
		}

		// Propagate pagination failures instead of ignoring them.
		if l, err = s.list(l.NextPageToken, q); err != nil {
			return
		}
	}

	return
}
func (s *GDrive) IsNotExist(err error) bool { func (s *GDrive) IsNotExist(err error) bool {
if err != nil { if err != nil {
if e, ok := err.(*googleapi.Error); ok { if e, ok := err.(*googleapi.Error); ok {
@ -554,10 +618,11 @@ type StorjStorage struct {
Storage Storage
project *uplink.Project project *uplink.Project
bucket *uplink.Bucket bucket *uplink.Bucket
purgeDays time.Duration
logger *log.Logger logger *log.Logger
} }
func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage, error) { func NewStorjStorage(access, bucket string, purgeDays int, logger *log.Logger) (*StorjStorage, error) {
var instance StorjStorage var instance StorjStorage
var err error var err error
@ -580,6 +645,8 @@ func NewStorjStorage(access, bucket string, logger *log.Logger) (*StorjStorage,
return nil, err return nil, err
} }
instance.purgeDays = time.Duration(purgeDays * 24) * time.Hour
instance.logger = logger instance.logger = logger
return &instance, nil return &instance, nil
@ -634,6 +701,11 @@ func (s *StorjStorage) Delete(token string, filename string) (err error) {
return return
} }
// Purge satisfies the Storage interface but does nothing for Storj:
// expiry is delegated to the network via the Expires upload option
// that is set when each object is written.
func (s *StorjStorage) Purge(days time.Duration) (err error) {
	return
}
func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) { func (s *StorjStorage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := storj.JoinPaths(token, filename) key := storj.JoinPaths(token, filename)
@ -641,7 +713,7 @@ func (s *StorjStorage) Put(token string, filename string, reader io.Reader, cont
ctx := context.TODO() ctx := context.TODO()
writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, nil) writer, err := s.project.UploadObject(ctx, s.bucket.Name, key, &uplink.UploadOptions{Expires: time.Now().Add(s.purgeDays)})
if err != nil { if err != nil {
return err return err
} }