s3: Move from goamz to minio-go

goamz is not maintained anymore move to maintained
library like `minio-go`.

`minio-go` supports S3 and many other s3 compatible
vendors. `minio-go` also supports both Signature V2 and
Signature V4. This in-turn would enable `transfer.sh`
to be able to use many different S3 compatible storage
vendors.

Fixes #5
This commit is contained in:
Harshavardhana 2016-05-04 02:35:51 -07:00 committed by Harshavardhana
parent c945cf85a3
commit 3882888d9d
4 changed files with 229 additions and 174 deletions

View file

@ -216,13 +216,15 @@ func New() *Cmd {
switch provider := c.String("provider"); provider {
case "s3":
if accessKey := c.String("aws-access-key"); accessKey == "" {
if endpoint := c.String("s3-endpoint"); endpoint == "" {
endpoint = "s3.amazonaws.com"
} else if accessKey := c.String("aws-access-key"); accessKey == "" {
panic("access-key not set.")
} else if secretKey := c.String("aws-secret-key"); secretKey == "" {
panic("secret-key not set.")
} else if bucket := c.String("bucket"); bucket == "" {
panic("bucket not set.")
} else if storage, err := server.NewS3Storage(accessKey, secretKey, bucket); err != nil {
} else if storage, err := server.NewS3Storage(endpoint, accessKey, secretKey, bucket); err != nil {
panic(err)
} else {
options = append(options, server.UseStorage(storage))

View file

@ -1,17 +1,14 @@
package server
import (
"bytes"
"fmt"
"io"
"log"
"mime"
"os"
"path/filepath"
"strconv"
"sync"
"github.com/goamz/goamz/s3"
"github.com/minio/minio-go"
)
type Storage interface {
@ -105,16 +102,20 @@ func (s *LocalStorage) Put(token string, filename string, reader io.Reader, cont
type S3Storage struct {
Storage
bucket *s3.Bucket
bucket string
client *minio.Client
}
func NewS3Storage(accessKey, secretKey, bucketName string) (*S3Storage, error) {
bucket, err := getBucket(accessKey, secretKey, bucketName)
func NewS3Storage(endpoint, accessKey, secretKey, bucket string) (*S3Storage, error) {
s3Client, err := minio.NewV4(endpoint, accessKey, secretKey, true)
if err != nil {
return nil, err
}
return &S3Storage{bucket: bucket}, nil
return &S3Storage{
client: s3Client,
bucket: bucket,
}, nil
}
func (s *S3Storage) Type() string {
@ -125,18 +126,13 @@ func (s *S3Storage) Head(token string, filename string) (contentType string, con
key := fmt.Sprintf("%s/%s", token, filename)
// content type , content length
response, err := s.bucket.Head(key, map[string][]string{})
if err != nil {
return
}
contentType = response.Header.Get("Content-Type")
contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
objInfo, err := s.client.StatObject(s.bucket, key)
if err != nil {
return
}
contentType = objInfo.ContentType
contentLength = uint64(objInfo.Size)
return
}
@ -156,131 +152,34 @@ func (s *S3Storage) Get(token string, filename string) (reader io.ReadCloser, co
key := fmt.Sprintf("%s/%s", token, filename)
// content type , content length
response, err := s.bucket.GetResponse(key)
obj, err := s.client.GetObject(s.bucket, key)
if err != nil {
return
}
// obj is *minio.Object - implements io.ReadCloser.
reader = obj
objInfo, err := obj.Stat()
if err != nil {
return
}
contentType = response.Header.Get("Content-Type")
contentLength, err = strconv.ParseUint(response.Header.Get("Content-Length"), 10, 0)
if err != nil {
return
}
reader = response.Body
contentType = objInfo.ContentType
contentLength = uint64(objInfo.Size)
return
}
func (s *S3Storage) Put(token string, filename string, reader io.Reader, contentType string, contentLength uint64) (err error) {
key := fmt.Sprintf("%s/%s", token, filename)
var (
multi *s3.Multi
parts []s3.Part
)
if multi, err = s.bucket.InitMulti(key, contentType, s3.Private); err != nil {
log.Printf(err.Error())
n, err := s.client.PutObject(s.bucket, key, reader, contentType)
if err != nil {
return
}
// 20 mb parts
partsChan := make(chan interface{})
// partsChan := make(chan s3.Part)
go func() {
// maximize to 20 threads
sem := make(chan int, 20)
index := 1
var wg sync.WaitGroup
for {
// buffered in memory because goamz s3 multi needs seekable reader
var (
buffer []byte = make([]byte, (1<<20)*10)
count int
err error
)
// Amazon expects parts of at least 5MB, except for the last one
if count, err = io.ReadAtLeast(reader, buffer, (1<<20)*5); err != nil && err != io.ErrUnexpectedEOF && err != io.EOF {
log.Printf(err.Error())
return
}
// always send minimal 1 part
if err == io.EOF && index > 1 {
log.Printf("Waiting for all parts to finish uploading.")
// wait for all parts to be finished uploading
wg.Wait()
// and close the channel
close(partsChan)
return
}
wg.Add(1)
sem <- 1
// using goroutines because of retries when upload fails
go func(multi *s3.Multi, buffer []byte, index int) {
log.Printf("Uploading part %d %d", index, len(buffer))
defer func() {
log.Printf("Finished part %d %d", index, len(buffer))
wg.Done()
<-sem
}()
partReader := bytes.NewReader(buffer)
var part s3.Part
if part, err = multi.PutPart(index, partReader); err != nil {
log.Printf("Error while uploading part %d %d %s", index, len(buffer), err.Error())
partsChan <- err
return
}
log.Printf("Finished uploading part %d %d", index, len(buffer))
partsChan <- part
}(multi, buffer[:count], index)
index++
}
}()
// wait for all parts to be uploaded
for part := range partsChan {
switch part.(type) {
case s3.Part:
parts = append(parts, part.(s3.Part))
case error:
// abort multi upload
log.Printf("Error during upload, aborting %s.", part.(error).Error())
err = part.(error)
multi.Abort()
return
}
}
log.Printf("Completing upload %d parts", len(parts))
if err = multi.Complete(parts); err != nil {
log.Printf("Error during completing upload %d parts %s", len(parts), err.Error())
if uint64(n) != contentLength {
err = fmt.Errorf("Uploaded content %d is not equal to requested length %d", n, contentLength)
return
}
log.Printf("Completed uploading %d", len(parts))
log.Printf("Completed uploading %s", key)
return
}

View file

@ -30,55 +30,11 @@ import (
"net/mail"
"strconv"
"strings"
"time"
"github.com/goamz/goamz/aws"
"github.com/goamz/goamz/s3"
"github.com/golang/gddo/httputil/header"
)
func getBucket(accessKey, secretKey, bucket string) (*s3.Bucket, error) {
auth, err := aws.GetAuth(accessKey, secretKey, "", time.Time{})
if err != nil {
return nil, err
}
var EUWestWithoutHTTPS = aws.Region{
Name: "eu-west-1",
EC2Endpoint: "https://ec2.eu-west-1.amazonaws.com",
S3Endpoint: "http://s3-eu-west-1.amazonaws.com",
S3BucketEndpoint: "",
S3LocationConstraint: true,
S3LowercaseBucket: true,
SDBEndpoint: "https://sdb.eu-west-1.amazonaws.com",
SESEndpoint: "https://email.eu-west-1.amazonaws.com",
SNSEndpoint: "https://sns.eu-west-1.amazonaws.com",
SQSEndpoint: "https://sqs.eu-west-1.amazonaws.com",
IAMEndpoint: "https://iam.amazonaws.com",
ELBEndpoint: "https://elasticloadbalancing.eu-west-1.amazonaws.com",
DynamoDBEndpoint: "https://dynamodb.eu-west-1.amazonaws.com",
CloudWatchServicepoint: aws.ServiceInfo{
Endpoint: "https://monitoring.eu-west-1.amazonaws.com",
Signer: aws.V2Signature,
},
AutoScalingEndpoint: "https://autoscaling.eu-west-1.amazonaws.com",
RDSEndpoint: aws.ServiceInfo{
Endpoint: "https://rds.eu-west-1.amazonaws.com",
Signer: aws.V2Signature,
},
STSEndpoint: "https://sts.amazonaws.com",
CloudFormationEndpoint: "https://cloudformation.eu-west-1.amazonaws.com",
ECSEndpoint: "https://ecs.eu-west-1.amazonaws.com",
DynamoDBStreamsEndpoint: "https://streams.dynamodb.eu-west-1.amazonaws.com",
}
conn := s3.New(auth, EUWestWithoutHTTPS)
b := conn.Bucket(bucket)
return b, nil
}
func formatNumber(format string, s uint64) string {
return RenderFloat(format, float64(s))
}

198
transfersh-server/main.go Normal file
View file

@ -0,0 +1,198 @@
/*
The MIT License (MIT)
Copyright (c) 2014 DutchCoders [https://github.com/dutchcoders/]
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
package main
import (
// _ "transfer.sh/app/handlers"
// _ "transfer.sh/app/utils"
"flag"
"fmt"
"log"
"math/rand"
"mime"
"net/http"
"net/url"
"os"
"time"
"github.com/PuerkitoBio/ghost/handlers"
"github.com/gorilla/mux"
)
const SERVER_INFO = "transfer.sh"
// parse request with maximum memory of _24Kilobits
const _24K = (1 << 20) * 24
type s3Config struct {
Endpoint string
AwsAccessKey string
AwsSecretKey string
Bucket string
SignV2 bool
DisableSSL bool
}
var config struct {
s3Cfg s3Config
VIRUSTOTAL_KEY string
CLAMAV_DAEMON_HOST string "/tmp/clamd.socket"
Temp string
}
var storage Storage
func init() {
config.s3Cfg.Endpoint = func() string {
endpoint := os.Getenv("S3_ENDPOINT")
if endpoint == "" {
// Defaults to Amazon S3.
endpoint = "s3.amazonaws.com"
}
return endpoint
}()
config.s3Cfg.AwsAccessKey = os.Getenv("AWS_ACCESS_KEY_ID")
config.s3Cfg.AwsSecretKey = os.Getenv("AWS_SECRET_KEY")
config.s3Cfg.Bucket = os.Getenv("BUCKET")
// Enables AWS Signature v2 if set to 'v2', default is 'v4'.
config.s3Cfg.SignV2 = os.Getenv("SIGNATURE_VERSION") == "v2"
// Disables SSL for s3 connection if set to true.
config.s3Cfg.DisableSSL = os.Getenv("DISABLE_SSL") == "true"
config.VIRUSTOTAL_KEY = os.Getenv("VIRUSTOTAL_KEY")
if os.Getenv("CLAMAV_DAEMON_HOST") != "" {
config.CLAMAV_DAEMON_HOST = os.Getenv("CLAMAV_DAEMON_HOST")
}
config.Temp = os.TempDir()
}
func main() {
rand.Seed(time.Now().UTC().UnixNano())
r := mux.NewRouter()
r.PathPrefix("/scripts/").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/styles/").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/images/").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/fonts/").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/ico/").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/favicon.ico").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.PathPrefix("/robots.txt").Methods("GET").Handler(http.FileServer(http.Dir("./static/")))
r.HandleFunc("/({files:.*}).zip", zipHandler).Methods("GET")
r.HandleFunc("/({files:.*}).tar", tarHandler).Methods("GET")
r.HandleFunc("/({files:.*}).tar.gz", tarGzHandler).Methods("GET")
r.HandleFunc("/download/{token}/{filename}", getHandler).Methods("GET")
r.HandleFunc("/{token}/{filename}", previewHandler).MatcherFunc(func(r *http.Request, rm *mux.RouteMatch) (match bool) {
match = false
// The file will show a preview page when opening the link in browser directly or
// from external link. If the referer url path and current path are the same it will be
// downloaded.
if !acceptsHtml(r.Header) {
return false
}
match = (r.Referer() == "")
u, err := url.Parse(r.Referer())
if err != nil {
log.Fatal(err)
return
}
match = match || (u.Path != r.URL.Path)
return
}).Methods("GET")
r.HandleFunc("/{token}/{filename}", getHandler).Methods("GET")
r.HandleFunc("/get/{token}/{filename}", getHandler).Methods("GET")
r.HandleFunc("/{filename}/virustotal", virusTotalHandler).Methods("PUT")
r.HandleFunc("/{filename}/scan", scanHandler).Methods("PUT")
r.HandleFunc("/put/{filename}", putHandler).Methods("PUT")
r.HandleFunc("/upload/{filename}", putHandler).Methods("PUT")
r.HandleFunc("/{filename}", putHandler).Methods("PUT")
r.HandleFunc("/health.html", healthHandler).Methods("GET")
r.HandleFunc("/", postHandler).Methods("POST")
// r.HandleFunc("/{page}", viewHandler).Methods("GET")
r.HandleFunc("/", viewHandler).Methods("GET")
r.NotFoundHandler = http.HandlerFunc(notFoundHandler)
port := flag.String("port", "8080", "port number, default: 8080")
temp := flag.String("temp", config.Temp, "")
basedir := flag.String("basedir", "", "")
logpath := flag.String("log", "", "")
provider := flag.String("provider", "s3", "")
flag.Parse()
if *logpath != "" {
f, err := os.OpenFile(*logpath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
log.Fatalf("error opening file: %v", err)
}
defer f.Close()
log.SetOutput(f)
}
config.Temp = *temp
var err error
switch *provider {
case "s3":
storage, err = NewS3Storage(config.s3Cfg)
case "local":
if *basedir == "" {
log.Panic("basedir not set")
}
storage, err = NewLocalStorage(*basedir)
default:
log.Fatalf("Unknown provider \"%s\", currently supported are [s3,local]", *provider)
}
if err != nil {
log.Panic("Error while creating storage.", err)
}
mime.AddExtensionType(".md", "text/x-markdown")
log.Printf("Transfer.sh server started. :\nlistening on port: %v\nusing temp folder: %s\nusing storage provider: %s", *port, config.Temp, *provider)
log.Printf("---------------------------")
s := &http.Server{
Addr: fmt.Sprintf(":%s", *port),
Handler: handlers.PanicHandler(LoveHandler(RedirectHandler(handlers.LogHandler(r, handlers.NewLogOptions(log.Printf, "_default_")))), nil),
}
log.Panic(s.ListenAndServe())
log.Printf("Server stopped.")
}