Python調用Harbor api刪除私有倉庫harbor鏡像


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = 'hhh'
'''本腳本適用於清理釋放harbor鏡像倉庫空間;
    此腳本基於harbor 1.9.0版本編寫;
    harbor 1.7.0 以后版本可通過頁面垃圾回收;
    如不同版本api不同需自行更改各個函數中url部分。'''

import json
import heapq
import requests
from requests.auth import HTTPBasicAuth
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from tqdm import tqdm
from time import sleep, time
import traceback


class Harbor(object):
    def __init__(self, api_url, user, num, exclude):
        """
        初始化一些基本參數
        :param auth: login password authority management
        :param head: change user-agent
        :param url: harbor server api url
        :param project_exclude: Exclude project team
        :param num_limit: Limit the number of retained versions
        :param project_special: project dict id and repo total
        :param project_state: project dict name and id
        :param repo_state: repo dict name and tag total
        :param repo_dispose: Count the number of tag processing
        :param tag_state: tag dict repo_name and tag
        """
        self.auth = user
        self.head = {"user_agent": "Mozilla/5.0"}
        self.url = api_url
        self.project_exclude = exclude
        self.num_limit = int(num)
        self.project_special = {}
        self.project_state = {}
        self.repo_state = {}
        self.repo_dispose_count = 0
        self.tag_state = {}

    def setting(self):
        self.session = requests.Session()
        self.session.auth = self.auth
        retry = Retry(connect=3, backoff_factor=1)
        adapter = HTTPAdapter(max_retries=retry)
        self.session.mount('https://', adapter)
        self.session.keep_alive = False

    def list_project(self):
        try:
            r_project = self.session.get("{}/projects".format(self.url), headers=self.head)
            r_project.raise_for_status()
            # 將得到的文本轉換格式
            project_data = json.loads(r_project.text)
            for i in project_data:
                # 項目組名稱
                project_name = i.get('name')
                # 項目組id
                project_id = i.get('project_id')
                # 項目組倉庫
                project_repo = i.get('repo_count')
                # 利用一個字典將項目名稱與id對應起來
                self.project_state[project_name] = project_id
                # 由於請求限制,另外用一個字典,對應id於repo總數
                self.project_special[project_id] = project_repo
                print("\033[0;32m項目名稱:{}\t項目編號:{}\t項目下倉庫統計:{}\033[0m".format(project_name, project_id, project_repo))
            print("\033[0;36mproject:項目組對應id列表:{}\033[0m".format(self.project_state))
            print("\033[0;36mproject:項目id對應倉庫數:{}\033[0m".format(self.project_special))
        except:
            traceback.print_exc()
            raise

    def list_repo(self):
        try:
            for a in self.project_state.keys():
                # 排除部分項目組
                if a not in self.project_exclude:
                    id = self.project_state.get(a)
                    # print(id)
                    # 由於請求限制,得出需請求的次數,整除+1
                    number = self.project_special.get(id) // 100 + 1
                    for i in range(number):
                        page = i + 1
                        r_repo = self.session.get(
                            "{}/repositories?project_id={}&page={}&page_size=100".format(self.url, id, page),
                            headers=self.head)
                        # 將得到的文本結果轉換格式
                        repo_data = json.loads(r_repo.text)
                        for r in repo_data:
                            repo_id = r.get('id')
                            repo_name = r.get('name')
                            tag_count = r.get('tags_count')
                            # 利用字典將倉庫名稱與tag總量對應起來
                            self.repo_state[repo_name] = tag_count
            print("\033[0;31mrepo:排除部分項目組后,需過濾處理的倉庫總量為:{}\033[0m".format(len(self.repo_state)))

        except:
            traceback.print_exc()
            raise

    def list_tag(self):
        try:
            # n 為repo 倉庫名字
            for n in self.repo_state.keys():
                # 如果該倉庫下版本總數大於數量限制,繼續往下走
                if self.repo_state.get(n) > self.num_limit:
                    r_tag = self.session.get('{}/repositories/{}/tags'.format(self.url, n))
                    tag_data = json.loads(r_tag.text)
                    tag_dict = {}
                    for t in tag_data:
                        # 取出各個tag的名字
                        tag_name = t.get('name')
                        # 切分各個tag,取出日期時間部分
                        tag_time = int(tag_name.split('.')[-1])
                        # 將tag名稱與切割出來的時間部分對應起來
                        tag_dict[tag_time] = tag_name
                    tagtime_list = []
                    tagname_list = []
                    for h in tag_dict.keys():
                        tagtime_list.append(h)
                    # 取出時間最大值三個
                    max_limit = heapq.nlargest(self.num_limit, tagtime_list)
                    # 取反,將key不為這三個的value版本號找出來
                    for q in tag_dict.keys():
                        if q not in max_limit:
                            name = tag_dict.get(q)
                            tagname_list.append(name)
                    self.tag_state[n] = tagname_list
                    self.repo_dispose_count += len(tagname_list)
            print("\033[0;31mtag:本次過濾出需處理涉及倉庫共:{}個,涉及刪除鏡像版本共:{}個\033[0m".format(len(self.tag_state),
                                                                                self.repo_dispose_count))
        except:
            traceback.print_exc()
            raise

    def del_tag(self):
        try:
            delete_total = 0
            del_faild = []
            if self.repo_dispose_count == 0:
                print("\033[0;34mdel:本次無需刪除tag\033[0m")
            else:
                print("\033[0;34mdel:刪除tag階段耗時較長:請耐心等待\033[0m")
                pbar1 = tqdm(total=self.repo_dispose_count, unit='', unit_scale=True)
                # na 為repo 名稱
                for na in self.tag_state:
                    # ta為需刪除的tag版本號
                    for ta in self.tag_state[na]:
                        try:
                            r_del = self.session.delete('{}/repositories/{}/tags/{}'.format(self.url, na, ta),
                                                    headers=self.head,
                                                    auth=self.auth)
                            r_del.raise_for_status()
                            delete_total += 1
                            pbar1.update(1)
                        except:
                            print('del: {}倉庫下刪除版本號:{}失敗!!!'.format(na, ta))
                            del_faild.append(na + ':' + ta)
                sleep(3)
                pbar1.close()
                print("\033[0;34mdel:需刪除鏡像已完成刪除! 共刪除版本數量:{}個\033[0m".format(delete_total))
                print('刪除失敗共計:{},刪除失敗的為:{}'.format(len(del_faild), del_faild))
        except:
            traceback.print_exc()
            raise

    def volume_recycle(self):
        try:
            if self.repo_dispose_count == 0:
                print("\033[0;35mvolume:本次無需清理存儲\033[0m")
            else:
                # 定義一個立即執行垃圾清理的json
                da = {"schedule": {"cron": "Manual", "type": "Manual"}}
                print("\033[0;35mvolume:開始回收存儲空間!\033[0m")
                r_volume = self.session.post('{}/system/gc/schedule'.format(self.url), json=da)
                r_volume.raise_for_status()
                print("\033[0;35mvolue:回收存儲空間已完成!\033[0m")
        except:
            traceback.print_exc()
            raise


def main(api_url, login, num, exclude):
    start = time()
    try:
        # begin開始
        har = Harbor(api_url=api_url, user=login, num=num, exclude=exclude)
        # 配置
        har.setting()
        # 列出項目組
        har.list_project()
        # 列出repo倉庫
        har.list_repo()
        # 列出tag版本
        har.list_tag()
        # 刪除不保留版本
        har.del_tag()
        # 回收存儲
        har.volume_recycle()
        print("所有操作運行完成!")
        end = time()
        allTime = end - start
        print("運行結束共耗時:{:.2f}s".format(allTime))
    except:
        end = time()
        allTime = end - start
        # traceback.print_exc()
        print('清理出錯!')
        print("運行結束共耗時:{:.2f}s".format(allTime))


if __name__ == '__main__':
    # harbor api interface
    api_url = "https://images.xxx.net/api"
    # Login ,change username and password
    login = HTTPBasicAuth('admin', '123456')
    # 需要排除的項目組,自行根據情況更改,或為空
    exclude = ['k8s', 'basic', 'library']
    # 倉庫下版本過多,需保留的最近版本數量
    keep_num = 3
    # 啟動Start the engine
    main(api_url=api_url, login=login, num=keep_num, exclude=exclude)

1、url(可通過進入harbor首頁左下角api控制中心查看);
2、帶管理權限的harbor用戶名密碼;
3、需要排除的項目組project(無需排除設為空列表即可)
注意一點是,我們image 版本的tag形式為 x.x.x.20191115152501 (版本+時間),如鏡像版本標記不同需要自行修改一下list_tag 中spilit 切分部分

4、需要根據自己實際進行修改相關配置


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM