首先需要導入依賴
go get gopkg.in/amz.v1/aws
go get gopkg.in/amz.v1/s3
1. 初始化ceph連接
在初始化連接之前,我們需要創建一個用戶得到accessKey和secretKey,新增用戶的指令如下:
docker exec ceph-rgw radosgw-admin user create --uid="test" --display-name="test user"
下面就是初始化ceph客戶端的操作,和一些數據庫的連接一樣的,都是授權+地址:
func init() {
auth := aws.Auth{
AccessKey: accessKey,
SecretKey: secretKey,
}
region := aws.Region{
Name: "default",
EC2Endpoint: url, // "http://<ceph-rgw ip>:<ceph-rgw port>"
S3Endpoint: url,
S3BucketEndpoint: "", // Not needed by AWS S3
S3LocationConstraint: false, // true if this region requires a LocationConstraint declaration
S3LowercaseBucket: false, // true if the region requires bucket names to be lower case
Sign: aws.SignV2,
}
CephConn = s3.New(auth, region)
}
下面是針對ceph的一些簡單操作,因為項目原因只需要一些文件上傳下載的操作,其他操作可以參見amz.v1的doc
2. 獲取一個桶
func GetCephBucket(bucket string) *s3.Bucket {
return CephConn.Bucket(bucket)
}
3. 將本地文件上傳到ceph的一個bucket中
func put2Bucket(bucket *s3.Bucket, localPath, cephPath string) (*s3.Bucket, error) {
err := bucket.PutBucket(s3.PublicRead)
if err != nil {
log.Fatal(err.Error())
return nil, err
}
bytes, err := ioutil.ReadFile(localPath)
if err != nil {
log.Fatal(err.Error())
return nil, err
}
err = bucket.Put(cephPath, bytes, "octet-stream" , s3.PublicRead)
return bucket, err
}
4. 從ceph下載文件
func downloadFromCeph(bucket *s3.Bucket, localPath, cephPath string) error {
data, err := bucket.Get(cephPath)
if err != nil {
log.Fatal(err.Error())
return err
}
return ioutil.WriteFile(localPath, data, 0666)
}
5. 刪除指定的文件
func delCephData(bucket *s3.Bucket, cephPath string) error {
err := bucket.Del(cephPath)
if err != nil {
log.Fatal(err.Error())
}
return err
}
6. 刪除桶
刪除桶時要保證桶內文件已經被刪除
func delBucket(bucket *s3.Bucket) error {
err := bucket.DelBucket()
if err != nil {
log.Fatal(err.Error())
}
return err
}
7. 批量獲取文件信息
func getBatchFromCeph(bucket *s3.Bucket, prefixCephPath string) []string {
maxBatch := 100
// bucket.List() 返回桶內objects的信息,默認1000條
resultListResp, err := bucket.List(prefixCephPath, "" , "" , maxBatch)
if err != nil {
log.Fatal(err.Error())
return nil
}
keyList := make([]string, 0)
for _, key := range resultListResp.Contents {
keyList = append(keyList, key.Key)
}
return keyList
}
8. 測試
編寫一個main.go來測試這些接口,嘗試上傳和下載一個文件
func main() {
bucketName := "bucket_test"
filename := "C:\\Users\\dell\\Desktop\\ieee.jpg"
cephPath := "/static/default/bucket_test/V1/" + "ieee_ceph.jpg"
// 獲取指定桶
bucket := GetCephBucket(bucketName)
// 上傳
bucket, err := put2Bucket(bucket, filename, cephPath)
if err != nil {
return
}
// 下載
localPath := "C:\\Users\\dell\\Desktop\\download.jpg"
err = downloadFromCeph(bucket, localPath, cephPath)
if err != nil {
return
}
// 獲得url
url := bucket.SignedURL(cephPath, time.Now().Add(time.Hour))
fmt.Println(url)
// 批量查找
prefixCephpath := "static/default/bucket_test/V1"
lists := getBatchFromCeph(bucket, prefixCephpath)
for _, list := range lists {
fmt.Println(list)
}
// 刪除數據
delCephData(bucket, cephPath)
// 刪除桶
delBucket(bucket)
}
9. 上傳ceph的簡單優化
選擇將本地服務器文件上傳ceph應該是異步操作,無論是采用chan通知一個上傳的協程還是晚上的定時任務
異步任務的操作可以選擇簡單的chan,或者使用消息隊列,如果吞吐量大的情況下就要使用rabbitmq等消息中間件了