這一篇文章分析一下docker push的過程;docker push是將本地的鏡像上傳到registry service的過程;
根據前幾篇文章,可以知道客戶端的命令是在api/client/push.go中,CmdPush()函數:
基本思路就是將通過解析cmd.Arg(0)參數,提取去要push的鏡像的repository 和 tag,通過registry 和 repository獲得repostoryInfo;如果需要安全驗證,那么還要設置一下authConfig;接着通過POST:/images/xxxx/push? 請求調用server端的postImagesPush()函數;(在api/server/image.go)中,主要來分析一下這個函數:
func (s *Server) postImagesPush(version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if vars == nil {
return fmt.Errorf("Missing parameter")
}
metaHeaders := map[string][]string{}
for k, v := range r.Header {
if strings.HasPrefix(k, " ") {
metaHeaders[k] = v
}
}
if err := parseForm(r); err != nil {
return err
}
authConfig := &cliconfig.AuthConfig{}
authEncoded := r.Header.Get("X-Registry-Auth")
if authEncoded != "" {
// the new format is to handle the authConfig as a header
authJSON := base64.NewDecoder(base64.URLEncoding, strings.NewReader(authEncoded))
if err := json.NewDecoder(authJSON).Decode(authConfig); err != nil {
// to increase compatibility to existing api it is defaulting to be empty
authConfig = &cliconfig.AuthConfig{}
}
} else {
// the old format is supported for compatibility if there was no authConfig header
if err := json.NewDecoder(r.Body).Decode(authConfig); err != nil {
return fmt.Errorf("Bad parameters and missing X-Registry-Auth: %v", err)
}
}
name := vars["name"]
output := ioutils.NewWriteFlusher(w)
imagePushConfig := &graph.ImagePushConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
Tag: r.Form.Get("tag"),
OutStream: output,
}
w.Header().Set("Content-Type", "application/json")
if err := s.daemon.Repositories().Push(name, imagePushConfig); err != nil {
if !output.Flushed() {
return err
}
sf := streamformatter.NewJSONStreamFormatter()
output.Write(sf.FormatError(err))
}
return nil
}
最主要的流程是前面是封裝好 imagePushConfig 這個數據結構,imagePushConfig主要包括http request中的header信息、鏡像tag、認證信息以及一個回寫客戶端的output Writer(),然后調用s.daemon.Repositories().Push(name, imagePushConfig) 來進行鏡像文件的上傳;之前的文章中介紹過,deamon的Repositories()函數返回的其實是TagStore結構(/graph/tags.go 文件中);直接去看一下TagStore()的Push函數:
// Push initiates a push operation on the repository named localName.
func (s *TagStore) Push(localName string, imagePushConfig *ImagePushConfig) error {
// FIXME: Allow to interrupt current push when new push of same image is done.
var sf = streamformatter.NewJSONStreamFormatter()
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := s.registryService.ResolveRepository(localName)
if err != nil {
return err
}
endpoints, err := s.registryService.LookupPushEndpoints(repoInfo.CanonicalName)
if err != nil {
return err
}
reposLen := 1
if imagePushConfig.Tag == "" {
reposLen = len(s.Repositories[repoInfo.LocalName])
}
imagePushConfig.OutStream.Write(sf.FormatStatus("", "The push refers to a repository [%s] (len: %d)", repoInfo. CanonicalName, reposLen))
// If it fails, try to get the repository
localRepo, exists := s.Repositories[repoInfo.LocalName]
if !exists {
return fmt.Errorf("Repository does not exist: %s", repoInfo.LocalName)
}
var lastErr error
for _, endpoint := range endpoints {
logrus.Debugf("Trying to push %s to %s %s", repoInfo.CanonicalName, endpoint.URL, endpoint.Version)
pusher, err := s.NewPusher(endpoint, localRepo, repoInfo, imagePushConfig, sf)
if err != nil {
lastErr = err
continue
}
if fallback, err := pusher.Push(); err != nil {
if fallback {
lastErr = err
continue
}
logrus.Debugf("Not continuing with error: %v", err)
return err
}
s.eventsService.Log("push", repoInfo.LocalName, "")
return nil
}
if lastErr == nil {
lastErr = fmt.Errorf("no endpoints found for %s", repoInfo.CanonicalName)
}
return lastErr
}
首先通過localName獲取repositoryInfo,然后通過repositoryInfo獲取endpoints,然后針對每一個endpoint,實例化一個pusher,通過pusher.Push()來實現鏡像的上傳,程序的流程和結構很好理解;看一下NewPusher()的代碼:
func (s *TagStore) NewPusher(endpoint registry.APIEndpoint, localRepo Repository, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig, sf *streamformatter.StreamFormatter) (Pusher, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
case registry.APIVersion1:
return &v1Pusher{
TagStore: s,
endpoint: endpoint,
localRepo: localRepo,
repoInfo: repoInfo,
config: imagePushConfig,
sf: sf,
}, nil
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
主要是根據endpoint中version的不同,來決定實例化哪個pushser;暫且以v2 pusher()為例來進行分析;
type v2Pusher struct {
*TagStore
endpoint registry.APIEndpoint
localRepo Repository
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
sf *streamformatter.StreamFormatter
repo distribution.Repository
}
這個是v2Pusher的定義;接下來是v2Pusher的Push(/graph/push_v2.go文件)函數:
func (p *v2Pusher) Push() (fallback bool, err error) {
p.repo, err = NewV2Repository(p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig)
if err != nil {
logrus.Debugf("Error getting v2 registry: %v", err)
return true, err
}
return false, p.pushV2Repository(p.config.Tag)
}
首先實例化出來NewV2Repository實例(/graph/registry.go文件中)復制給pusher的repo實例,然后調用pusher的pushV2Repository函數進行鏡像的上傳;新的pusher.repo實例,也就是NewV2Repository返回的其實是一個新的repository(vendor/src/github.com/docker/distribution/registry/client/repository.go文件中),它主要新建了一個提供了超時設定、權限驗證和對遠程endpoint的api version進行驗證的http transport,負責鏡像的上傳;接着看一下最重要的pusher的pushV2Repository函數;
func (p *v2Pusher) pushV2Repository(tag string) error {
localName := p.repoInfo.LocalName
if _, err := p.poolAdd("push", localName); err != nil {
return err
}
defer p.poolRemove("push", localName)
tags, err := p.getImageTags(tag)
if err != nil {
return fmt.Errorf("error getting tags for %s: %s", localName, err)
}
if len(tags) == 0 {
return fmt.Errorf("no tags to push for %s", localName)
}
for _, tag := range tags {
if err := p.pushV2Tag(tag); err != nil {
return err
}
}
return nil
}
調用TagStore的poolAdd函數,將要下載的的名字加入的TagStore的pushingPool里面,保證同一時刻只有一個同名鏡像在下載;通過getImageTags(tag)獲取對應tag的要下載的鏡像,實際上使用pusher的localRepo中查找對應tag的鏡像,localRepo 是Repository類型,更確切說是map[string]string類型,key為tag,value為layerID;如果沒有傳過來的tag為空的話,則返回所有的鏡像;接着分析p.pushV2Tag()函數
func (p *v2Pusher) pushV2Tag(tag string) error {
logrus.Debugf("Pushing repository: %s:%s", p.repo.Name(), tag)
layerID, exists := p.localRepo[tag]
if !exists {
return fmt.Errorf("tag does not exist: %s", tag)
}
layersSeen := make(map[string]bool)
layer, err := p.graph.Get(layerID)
if err != nil {
return err
}
m := &manifest.Manifest{
Versioned: manifest.Versioned{
SchemaVersion: 1,
},
Name: p.repo.Name(),
Tag: tag,
Architecture: layer.Architecture,
FSLayers: []manifest.FSLayer{},
History: []manifest.History{},
}
var metadata runconfig.Config
if layer != nil && layer.Config != nil {
metadata = *layer.Config
}
out := p.config.OutStream
for ; layer != nil; layer, err = p.graph.GetParent(layer) {
if err != nil {
return err
}
if layersSeen[layer.ID] {
break
}
logrus.Debugf("Pushing layer: %s", layer.ID)
if layer.Config != nil && metadata.Image != layer.ID {
if err := runconfig.Merge(&metadata, layer.Config); err != nil {
return err
}
}
jsonData, err := p.graph.RawJSON(layer.ID)
if err != nil {
return fmt.Errorf("cannot retrieve the path for %s: %s", layer.ID, err)
}
var exists bool
dgst, err := p.graph.GetDigest(layer.ID)
switch err {
case nil:
_, err := p.repo.Blobs(context.Background()).Stat(context.Background(), dgst)
switch err {
case nil:
exists = true
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image already exists", nil))
case distribution.ErrBlobUnknown:
// nop
default:
out.Write(p.sf.FormatProgress(stringid.TruncateID(layer.ID), "Image push failed", nil))
return err
}
case ErrDigestNotSet:
// nop
case digest.ErrDigestInvalidFormat, digest.ErrDigestUnsupported:
return fmt.Errorf("error getting image checksum: %v", err)
}
// if digest was empty or not saved, or if blob does not exist on the remote repository,
// then fetch it.
if !exists {
if pushDigest, err := p.pushV2Image(p.repo.Blobs(context.Background()), layer); err != nil {
return err
} else if pushDigest != dgst {
// Cache new checksum
if err := p.graph.SetDigest(layer.ID, pushDigest); err != nil {
return err
}
dgst = pushDigest
}
}
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
}
logrus.Infof("Signed manifest for %s:%s using daemon's key: %s", p.repo.Name(), tag, p.trustKey.KeyID())
signed, err := manifest.Sign(m, p.trustKey)
if err != nil {
return err
}
manifestDigest, manifestSize, err := digestFromManifest(signed, p.repo.Name())
if err != nil {
return err
}
if manifestDigest != "" {
out.Write(p.sf.FormatStatus("", "%s: digest: %s size: %d", tag, manifestDigest, manifestSize))
}
manSvc, err := p.repo.Manifests(context.Background())
if err != nil {
return err
}
return manSvc.Put(signed)
}
pushV2Tag()函數比較長,從上到下一點點分析下:首先根據tag從localRepo中找到LayerID,然后通過graph.Get()通過layerID返回layer,實際是一個image結構;layerSeen是一個map,表示的哪些已經被push過了,以免重復;接下來實例化一個manifest結構,manifest主要記錄鏡像的所有layer信息(包括鏡像的校驗和以及鏡像jsondata中的信息);
接着就是一個for 循環,從一個layer中不斷的取得這個layer的父鏡像;然后上傳;上傳鏡像之前,先通過graph獲取這個鏡像的jsondata和digest校驗和,通過 p.repo.Blobs(context.Background()).Stat(context.Background(), dgst) 判斷校驗和在將要上傳的地點是否已經存在,存在的話則打印出相關提示信息,不存在的情況則會調用 p.pushV2Image() 去上傳鏡像;上傳完鏡像后會生成新的digest,接着調用
m.FSLayers = append(m.FSLayers, manifest.FSLayer{BlobSum: dgst})
m.History = append(m.History, manifest.History{V1Compatibility: string(jsonData)})
layersSeen[layer.ID] = true
這三行代碼,將layer信息放到manifest文件中,並且設置layerSeen標明這個layer已經上傳過了;
當所有的鏡像上傳完畢后,接着對manifest文件進行簽名、生成manifest文件的digest信息、然后通過repo將manifest信息上傳;
以上就是docker push的大致步驟