FTP File Transfer Service


On a whim last night, I tried writing an FTP file transfer service in Python. It accepts commands and syncs data from a specified directory on a remote FTP server. In the end I didn't use it, so I'm open-sourcing it.

https://github.com/jadepeng/ftp_transfer_service.git

How it works

  • task_server is a web server that accepts incoming tasks; when a task is received, it is written to MySQL
  • once a task is started, task_server scans the FTP file list and pushes the entries onto a Redis queue (each entry looks roughly like the sketch below)
  • transfer_client is the transfer worker; it can be deployed on multiple machines, and it reads the Redis queue and downloads the files
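
Judging from the consumer code shown later (transfer_task_item), each queue entry is a JSON object carrying at least the file name, size and retry count; the values below are purely illustrative:

{
  "fileName": "/weblog/2021/01/access.log",
  "fileSize": 1048576,
  "retryCount": 0
}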

Usage

Configuration

Edit the .env file to configure the MySQL and Redis addresses:

REDIS_SERVER=""
REDIS_PORT=6380
REDIS_PASSWORD=""
MYSQL_HOST=""
MYSQL_PORT=3306
MYSQL_PASSWORD=""
MYSQL_USER=""
MYSQL_DB=""

Start the services

Server side

python3 task_server.py

Transfer side (multiple instances can be deployed)

python3 transfer_client.py

Submit a task

POST /task/

{
  "taskId": "9",
  "serverPath": "/weblog",
  "storagePath": "/data",
  "host": "ftpServer",
  "port": 21,
  "user": "user",
  "password": "password"
}

Start a transfer

GET /task/{taskId}/start

Check progress

GET /task/{taskId}/progress
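
For a quick end-to-end test, the three endpoints can be called from Python. Here is a minimal sketch using the requests library; the base URL (host and port) is an assumption about how task_server.py is deployed:

import requests

BASE = "http://localhost:8000"   # adjust to wherever task_server.py actually listens

task = {
    "taskId": "9",
    "serverPath": "/weblog",
    "storagePath": "/data",
    "host": "ftpServer",
    "port": 21,
    "user": "user",
    "password": "password",
}

requests.post(f"{BASE}/task/", json=task)               # register the task
requests.get(f"{BASE}/task/9/start")                    # scan the FTP tree and start queueing
print(requests.get(f"{BASE}/task/9/progress").json())   # poll progress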

Implementation notes

This was my first time writing a web service with FastAPI, so I'm noting down the interesting parts here.
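
The actual route definitions live in task_server.py and are not shown here. Below is a minimal sketch of what the three endpoints from the Usage section could look like in FastAPI; the handler bodies are placeholders, not the project's real logic:

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class TaskIn(BaseModel):
    taskId: str
    serverPath: str
    storagePath: str
    host: str
    port: int
    user: str
    password: str


@app.post("/task/")
def create_task(task: TaskIn):
    # the real server persists the task to MySQL here
    return {"taskId": task.taskId}


@app.get("/task/{task_id}/start")
def start_task(task_id: str):
    # the real server scans the FTP file list and pushes items onto the Redis queue
    return {"taskId": task_id, "status": "started"}


@app.get("/task/{task_id}/progress")
def get_progress(task_id: str):
    # the real server reads from the transfer_task_progress table
    return {"taskId": task_id}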

Configuration

App configuration can be handled through a settings class; pydantic can also load an .env file to override the values.

setting.py

from pydantic import BaseSettings


class APISettings(BaseSettings):
    mysql_host: str = "127.0.0.1"
    mysql_port: int = 3306
    mysql_password: str
    mysql_user: str
    mysql_db: str
    redis_server: str = "127.0.0.1"
    redis_port: int = 6380
    redis_password: str

    max_wait_time_count: int = 10

    class Config:
        env_file = ".env"
        env_file_encoding = 'utf-8'
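
Note that BaseSettings here is pydantic v1; in pydantic v2 it lives in the separate pydantic-settings package. Instantiating the class is enough to pick up the defaults, the .env file and real environment variables (environment variables take precedence over .env):

# assumes the required fields (passwords, user, db) are supplied by .env or the environment
settings = APISettings()
print(settings.mysql_host, settings.redis_port)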

Redis queue

The queue is implemented on top of a Redis list, using rpush and blpop.

import redis

class RedisQueue(object):

    def __init__(self, name, namespace='queue', **redis_kwargs):
        self.__db = redis.Redis(**redis_kwargs)
        self.key = '%s:%s' % (namespace, name)

    def qsize(self):
        return self.__db.llen(self.key)  # return the number of items in the queue's list

    def put(self, item):
        self.__db.rpush(self.key, item)  # push the new item onto the right end of the queue

    def get_wait(self, timeout=None):
        item = self.__db.blpop(self.key, timeout=timeout)
        return item

    def get_nowait(self):
        item = self.__db.lpop(self.key)
        return item
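
A short usage sketch; the connection parameters are placeholders. Note that redis-py's blpop returns a (key, value) tuple (or None on timeout) and values come back as bytes unless decode_responses=True is set, so consumers have to unpack and decode accordingly:

queue = RedisQueue("task_9", host="127.0.0.1", port=6380, password="")

queue.put('{"fileName": "/weblog/a.log", "fileSize": 1024, "retryCount": 0}')
print(queue.qsize())                 # 1

item = queue.get_wait(timeout=5)     # (b'queue:task_9', b'{"fileName": ...}') or None
if item:
    _, payload = item
    print(payload.decode("utf-8"))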

Redis BloomFilter

A Bloom filter can be used for de-duplication.

import mmh3
import redis


class BloomFilter(object):
    def __init__(self, bf_key, bit_size=2000000, hash_count=4, start_seed=41, **redis_kwargs):
        self.bit_size = bit_size
        self.hash_count = hash_count
        self.start_seed = start_seed
        self.client = redis.Redis(**redis_kwargs)
        self.bf_key = bf_key

    def add(self, data):
        bit_points = self._get_hash_points(data)
        for index in bit_points:
            self.client.setbit(self.bf_key, index, 1)

    def madd(self, m_data):
        if isinstance(m_data, list):
            for data in m_data:
                self.add(data)
        else:
            self.add(m_data)

    def exists(self, data):
        bit_points = self._get_hash_points(data)
        result = [
            self.client.getbit(self.bf_key, index) for index in bit_points
        ]
        return all(result)

    def mexists(self, m_data):
        result = {}
        if isinstance(m_data, list):
            for data in m_data:
                result[data] = self.exists(data)
        else:
            result[m_data] = self.exists(m_data)
        return result

    def _get_hash_points(self, data):
        return [
            mmh3.hash(data, index) % self.bit_size
            for index in range(self.start_seed, self.start_seed +
                               self.hash_count)
        ]
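
A quick usage sketch; the connection parameters are placeholders. As with any Bloom filter, exists can return false positives but never false negatives, which is the usual trade-off for de-duplication:

bf = BloomFilter("task_9_files", host="127.0.0.1", port=6380, password="")

bf.add("/weblog/a.log")
print(bf.exists("/weblog/a.log"))    # True
print(bf.exists("/weblog/b.log"))    # False (with high probability)
print(bf.mexists(["/weblog/a.log", "/weblog/b.log"]))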

SQLAlchemy, Python's ORM framework

With SQLAlchemy you first define the ORM classes:

class TransferTask(Base):
    __tablename__ = 'transfer_task'

    taskId = Column(String(255), primary_key=True, index=True)
    serverPath = Column(String(255), nullable=False)
    storagePath = Column(String(255), nullable=False)
    host = Column(String(255), nullable=False)
    port = Column(Integer, nullable=False)
    user = Column(String(255), nullable=False)
    password = Column(String(255), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferFailedFile(Base):
    __tablename__ = 'transfer_failed_file'
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    taskId = Column(String(255), index=True)
    filePath = Column(String(1024), nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

class TransferProgress(Base):
    __tablename__ = 'transfer_task_progress'

    taskId = Column(String(255), primary_key=True, index=True)
    total = Column(Integer, nullable=False)
    status = Column(Integer, nullable=False)
    finished = Column(Integer, nullable=False)
    failed = Column(Integer, nullable=False)
    time = Column(DateTime, nullable=False, default=datetime.now)

if __name__ == '__main__':
    settings = APISettings()
    db = Database(settings.mysql_host, settings.mysql_port, settings.mysql_user, settings.mysql_password,
                  settings.mysql_db)
    Base.metadata.create_all(db.engine)
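
The Database helper and the declarative Base used above are defined in database.py and are not reproduced in full here. Below is a minimal sketch of that setup, where the pymysql driver URL and pool settings are assumptions (the get_db/get_session methods shown further down belong to this class):

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()


class Database:
    def __init__(self, host, port, user, password, db):
        # pymysql is an assumption; any MySQL driver URL works here
        url = f"mysql+pymysql://{user}:{password}@{host}:{port}/{db}?charset=utf8mb4"
        self.engine = create_engine(url, pool_recycle=3600)
        self.SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=self.engine)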

With SQLAlchemy, CRUD becomes straightforward: lookups and filtering are done with query and filter.

    def get_or_create_progress(self, task: TransferTask):
        db = self.database.get_session()
        dbitem = db.query(TransferProgress).filter(TransferProgress.taskId == task.taskId).first()
        if not dbitem:
            dbitem = TransferProgress()
            dbitem.taskId = task.taskId
            dbitem.total = 0
            dbitem.status = TaskStatus.SCANNING.value
            dbitem.finished = 0
            dbitem.failed = 0
            db.add(dbitem)
            db.commit()
        return dbitem

One thing to watch out for: sessions must be closed, otherwise too many open sessions will eventually cause errors. A get_session wrapper can use yield to release the session automatically.

See database.py:

    def get_db(self):
        db = self.SessionLocal()
        try:
            yield db
        finally:
            db.close()

    def get_session(self):
        return next(self.get_db())
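
In a FastAPI app this generator style pairs naturally with Depends, which runs get_db, injects the yielded session into the handler, and executes the finally block once the response has been sent. The sketch below only illustrates that pattern with the classes defined above; it is not necessarily how task_server.py wires things up, and the credentials are placeholders:

from fastapi import Depends, FastAPI
from sqlalchemy.orm import Session

app = FastAPI()
database = Database("127.0.0.1", 3306, "user", "password", "ftp_transfer")  # placeholders


@app.get("/task/{task_id}/progress")
def read_progress(task_id: str, db: Session = Depends(database.get_db)):
    # the session is created before the handler runs and closed right after the response
    item = db.query(TransferProgress).filter(TransferProgress.taskId == task_id).first()
    if item is None:
        return {"taskId": task_id, "status": "unknown"}
    return {"taskId": item.taskId, "total": item.total,
            "finished": item.finished, "failed": item.failed, "status": item.status}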

FTP operations in Python

Python ships with ftplib for talking to FTP servers. Here it is wrapped in a simple client class that implements list_files and file download.


import ftplib
import os
from datetime import datetime
import ntpath


class FtpClient:

    def __init__(self, host: str, port: int, user: str, password: str):
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.connect()

    def connect(self):
        self.ftp = ftplib.FTP()
        self.ftp.connect(host=self.host, port=self.port)
        self.ftp.login(self.user, self.password)
        self.ftp.encoding = "utf-8"

    def list_files(self, dir):
        self.ftp.cwd(dir)
        for file_data in self.ftp.mlsd():
            file_name, meta = file_data
            file_type = meta.get("type")
            if file_type == "file":
                try:
                    self.ftp.voidcmd("TYPE I")
                    file_size = self.ftp.size(file_name)
                    yield f"{dir}/{file_name}", file_size
                except Exception as e:
                    print(e)
            else:
                yield from self.list_files(dir + "/" + file_name)

    def download_file(self, file_name:str, local_file_name:str):
        try:
            self.ftp.retrbinary('RETR %s' % file_name, open(local_file_name, 'wb').write)
        except ftplib.error_perm:
            print('ERROR: cannot read file "%s"' % file_name)
            os.unlink(local_file_name)
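
A short usage sketch; the host, credentials and paths are placeholders:

client = FtpClient("ftpServer", 21, "user", "password")

# walk the remote tree and print every file with its size
for remote_path, size in client.list_files("/weblog"):
    print(remote_path, size)

# download one file; the local directory must already exist
client.download_file("/weblog/access.log", "/data/weblog/access.log")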

The downloader

As a consumer of the Redis queue, the downloader has to deal with three questions: what happens when a download fails, what happens when the process exits unexpectedly, and how progress gets updated.

  • For unexpected exits, a simple scheme is used: after taking a message off the queue, the item is first written to a local file, so if the client crashes it can resume from that item on the next start (see the sketch after the code below).
  • For failed downloads, the item is pushed back onto the queue with retryCount incremented by 1; once the maximum retry count is exceeded, the file is recorded in the failure table.
  • Progress is updated by incrementing the finished counter by 1 for each completed file.

def transfer_task_item(ftp, local_path, queue, task, task_item):
    try:
        local_file_name = local_path + task_item['fileName']
        print("transfer %s to %s" % (task_item['fileName'], local_file_name))

        # the file already exists locally
        if os.path.exists(local_file_name):
            # compare sizes with the remote file
            size = os.path.getsize(local_file_name)
            if size == task_item["fileSize"]:
                db_service.update_finished(task.taskId, 1)
                return

        dir = os.path.abspath(os.path.dirname(local_file_name))
        os.makedirs(dir, exist_ok=True)
        ftp.download_file(task_item['fileName'], local_file_name)
        # update progress
        db_service.update_finished(task.taskId, 1)
    except Exception as e:
        print(e)
        if task_item['retryCount'] < 3:
            task_item['retryCount'] = task_item['retryCount'] + 1
            queue.put(json.dumps(task_item))
        else:
            print(task_item['fileName'] + " transfer failed with max_retry_count")
            db_service.add_failed_file(task.taskId, task_item['fileName'])
    finally:
        remove_lock()
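
The main loop of transfer_client.py is not reproduced here. Below is a hedged sketch of the consume-with-local-checkpoint idea described above; the lock file name and the write_lock/consume helpers are assumptions, and only remove_lock is actually referenced by the snippet:

import json
import os

LOCK_FILE = "transfer.lock"   # name is an assumption


def write_lock(payload: str):
    # persist the in-flight item so a crashed client can resume it on restart
    with open(LOCK_FILE, "w") as f:
        f.write(payload)


def remove_lock():
    if os.path.exists(LOCK_FILE):
        os.remove(LOCK_FILE)


def consume(ftp, local_path, queue, task):
    # on startup, re-process any item left behind by a previous crash
    if os.path.exists(LOCK_FILE):
        with open(LOCK_FILE) as f:
            transfer_task_item(ftp, local_path, queue, task, json.loads(f.read()))

    while True:
        item = queue.get_wait(timeout=10)   # (key, value) tuple or None on timeout
        if not item:
            break
        payload = item[1].decode("utf-8") if isinstance(item[1], bytes) else item[1]
        write_lock(payload)
        transfer_task_item(ftp, local_path, queue, task, json.loads(payload))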

