
A folder synchronizer between Windows and Linux, written in Python: automatic sync, much faster transfer of large numbers of small files, and richer upload-filter settings.

The existing tooling is not great. I was using PyCharm's automatic deployment, but new files pulled down via git are not uploaded to Linux automatically; only files I edit myself (or save with Ctrl+S) get synced. To avoid missing files I often had to do a full upload, which is very slow.

Since I often use a Linux interpreter directly from PyCharm on Windows and need to test quickly, frequent git push/pull between the local machine and the Linux box is inconvenient. The test environment does use git, but during development a direct folder mapping is more convenient than git.

Compared with a single thread and a single Linux connection uploading tiny file fragments one by one, the connection-pool approach improved upload speed by tens of times.
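The speedup comes from keeping many SFTP connections open and letting a thread pool drive them concurrently, so per-file round-trip latency overlaps instead of accumulating. A minimal sketch of the borrow/return pool pattern used below (with a dummy stand-in for paramiko's SFTPClient, since no real server is assumed here):

```python
import queue
import time
from concurrent.futures import ThreadPoolExecutor

class DummySFTP:
    """Stand-in for paramiko.SFTPClient; a real put() would do network I/O."""
    def put(self, local_path, remote_path):
        time.sleep(0.01)  # simulate per-file round-trip latency

class ConnectionPool:
    """Fixed-size pool: borrow() blocks until a connection is free, give_back() returns it."""
    def __init__(self, size=10):
        self._free = queue.Queue(maxsize=size)
        for _ in range(size):
            self._free.put(DummySFTP())

    def borrow(self):
        return self._free.get()

    def give_back(self, conn):
        self._free.put(conn)

def upload_all(files, pool, workers=10):
    # each worker borrows a connection, uploads one file, and returns the connection
    def upload_one(f):
        conn = pool.borrow()
        try:
            conn.put(f, '/remote/' + f)
        finally:
            pool.give_back(conn)
    with ThreadPoolExecutor(max_workers=workers) as ex:
        list(ex.map(upload_one, files))

pool = ConnectionPool(size=10)
upload_all([f'file_{i}.py' for i in range(50)], pool)
```

With ten pooled connections the fifty simulated uploads finish in roughly five round trips instead of fifty, which is the same effect the `LinuxConnectionPool` class below achieves with real SFTP clients.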

 

Upload over a single Linux connection.

"""
自动同步文件夹到linux机器
"""
import json
import os
import re
import time
from collections import OrderedDict
from pathlib import Path
import paramiko
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler


class LinuxSynchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/'), only_upload_within_the_last_modify_time=7 * 24 * 60 * 60, cycle_interval=10, ):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: 排除以这些结尾的文件
        :param file_volume_limit: 最大文件容量能够限制,如果超过此大小,则该文件不上传
        :param path_pattern_exluded_tuple: 更强大的文件排除功能,比光排除以什么后缀结尾更强大灵活
        :param only_upload_within_the_last_modify_time: 只上传离当前时间最晚修改时间以后的文件
        :param cycle_interval: 每隔多少秒扫描一次需要上传的文件。
        """
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = remote_dir
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = only_upload_within_the_last_modify_time
        self._cycle_interval = cycle_interval
        self._file_volume_limit = file_volume_limit
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self.build_connect()

    # noinspection PyAttributeOutsideInit
    def build_connect(self):
        self.logger.warning('building Linux connection')
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        self.sftp = paramiko.SFTPClient.from_transport(t)

        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.ssh = ssh

    # @decorators.tomorrow_threads(1)
    def ftp_upload(self, file: str):
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))
        for _ in range(10):
            try:
                time_start = time.time()
                self.sftp.put(file, file_remote)
                self.logger.debug(f'{file_remote} uploaded, size {round(os.path.getsize(file) / 1024)} kb, took {round(time.time() - time_start, 2)} s')
                break
            except FileNotFoundError:
                cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
                self.logger.info(cmd)
                stdin, stdout, stderr = self.ssh.exec_command(cmd)
                stderr_bytes = stderr.read()
                # self.logger.debug(stderr_bytes)
                if stderr_bytes != b'':
                    self.logger.debug(stderr_bytes)
            except OSError as e:
                self.logger.exception(e)
                self.build_connect()  # OSError: Socket is closed -- rebuild the connection and retry

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if (time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time
                            and volume < self._file_volume_limit
                            and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60)):
                        self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                        self.filename__st_mtime_map[file_full_name] = file_st_mtime
                        total_volume += volume
        filename__filesize_map_ordered_by_last_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_last_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_last_modify_time
        self.logger.warning(f'number of files to upload: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} kb, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)()

    def _start_upload_files(self):
        with decorators.TimerContextManager():
            self.find_all_files_meet_the_conditions()
            for file in self.filename__filesize_map:
                self.ftp_upload(file)
            self.logger.warning('done')
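For reference, the local-to-remote path mapping in `ftp_upload` boils down to a prefix replacement after normalizing Windows backslashes. A minimal standalone sketch (the sample paths are made up):

```python
def to_remote_path(local_file: str, local_dir: str, remote_dir: str) -> str:
    """Map a local Windows path to its remote Linux path."""
    # normalize Windows separators first, then swap the directory prefix
    local_file = local_file.replace('\\', '/')
    local_dir = str(local_dir).replace('\\', '/')
    return local_file.replace(local_dir, remote_dir)

print(to_remote_path(r'D:\proj\app\main.py', r'D:\proj', '/home/ydf/proj'))
# -> /home/ydf/proj/app/main.py
```

This is why `local_dir` is normalized with `.replace('\\', '/')` in `__init__`: `os.walk` results are normalized the same way, so a plain `str.replace` is enough to rewrite the prefix.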

Connection pool plus multithreaded upload.

"""
自动同步文件夹到linux机器
这个更犀利,采用了连接池 加线程池,上传大量碎片文件的速度大幅提升。
"""
import hashlib
import json
import os
from threading import Thread
import queue
import re
import shutil
import filecmp
import time
from collections import OrderedDict
from pathlib import Path
from typing import Union
import paramiko
from paramiko import SSHException
from app.utils_ydf import decorators, time_util, LoggerMixinDefaultWithFileHandler, nb_print, BoundedThreadPoolExecutor


class LocalCopier(LoggerMixinDefaultWithFileHandler):
    """
    本地的两个文件夹之间的同步
    """

    def __init__(self, local_dir, remote_dir, *args, **kwargs):
        self._local_dir = str(local_dir).replace('\\', '/')
        self._remote_dir = str(remote_dir).replace('\\', '/')
        self.logger_extra_suffix = 'local windows copy'

    def upload(self, file: str):
        file_remote = file.replace(self._local_dir, self._remote_dir)
        if not Path(file_remote).parent.exists():
            os.makedirs(str(Path(file_remote).parent))
        # if self.get_file_md5(Path(file).open('rb')) != self.get_file_md5(Path(file_remote).open('rb')) :
        if not Path(file_remote).exists() or not filecmp.cmp(file, file_remote):
            shutil.copyfile(file, file_remote)
            self.logger.info(f'copied {file} to {file_remote}, size {round(os.path.getsize(file) / 1024)} kb')
        else:
            self.logger.debug(f'{file} not copied to {file_remote}: unchanged.')

    @staticmethod
    def get_file_md5(file):
        m = hashlib.md5()
        while True:
            # if the file were not opened in binary mode, each chunk would need encoding first:
            # data = f.read(1024).encode('utf-8')
            data = file.read(1024)  # read the file in chunks
            if not data:
                break
            m.update(data)
        return m.hexdigest()


@decorators.flyweight
class LinuxConnectionPool(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password):  # the flyweight decorator makes identical connection parameters share one pool.
        self.logger_extra_suffix = host
        self.logger.warning(f'initializing Linux connection pool for {host}')
        self._host = host
        self._port = port
        self._username = username
        self._password = password
        self.queue_sftp_free = queue.Queue(100)
        self.queue_ssh_free = queue.Queue(100)
        self.build_connect()

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=0)
    def build_sftp(self):
        self.logger.warning('building Linux sftp connection ...')
        t_start = time.time()
        # noinspection PyTypeChecker
        t = paramiko.Transport((self._host, self._port))
        t.connect(username=self._username, password=self._password)
        sftp = paramiko.SFTPClient.from_transport(t)
        self.queue_sftp_free.put(sftp)
        self.logger.warning(f'built Linux sftp connection in {round(time.time() - t_start, 2)} s')

    @decorators.keep_circulating(5, exit_if_function_run_sucsess=True, is_display_detail_exception=1)
    def bulid_ssh(self):
        self.logger.warning('building Linux ssh connection ...')
        t_start = time.time()
        ssh = paramiko.SSHClient()
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        ssh.connect(self._host, port=self._port, username=self._username, password=self._password, compress=True)
        self.queue_ssh_free.put(ssh)
        self.logger.warning(f'built Linux ssh connection in {round(time.time() - t_start, 2)} s')

    def build_connect(self):
        # decorators.tomorrow_threads(10)(self._build_sftp)()
        # decorators.tomorrow_threads(10)(self.__class__._bulid_ssh)(self)
        def _inner():
            executor = BoundedThreadPoolExecutor(100)
            for _ in range(10):
                time.sleep(0.2)
                executor.submit(self.build_sftp)
            for _ in range(3):
                time.sleep(0.5)
                executor.submit(self.bulid_ssh)

        Thread(target=_inner).start()

    def borrow_sftp(self):
        return self.queue_sftp_free.get()

    def borrow_ssh(self):
        return self.queue_ssh_free.get()

    def back_sftp(self, sftp):
        self.queue_sftp_free.put(sftp)

    def back_ssh(self, ssh):
        self.queue_ssh_free.put(ssh)


class LinuxRemoteUploader(LocalCopier):
    """
    windows同步到linux。
    """

    def __init__(self, local_dir, remote_dir, host, port, username, password):
        super().__init__(local_dir, remote_dir)
        self.logger_extra_suffix = host
        self.linux_conn_pool = LinuxConnectionPool(host, port, username, password)

    def _do_mkdir_operation(self, file_remote):
        cmd = 'mkdir -p ' + str(Path(file_remote).parent).replace('\\', '/')
        self.logger.info(cmd)
        ssh = self.linux_conn_pool.borrow_ssh()
        try:
            stdin, stdout, stderr = ssh.exec_command(cmd)
        except SSHException:
            self.linux_conn_pool.bulid_ssh()
        except Exception as e:
            self.logger.exception(e)
        else:
            stderr_bytes = stderr.read()
            # self.logger.debug(stderr_bytes)
            if stderr_bytes != b'':
                self.logger.debug(stderr_bytes)
            self.linux_conn_pool.back_ssh(ssh)

    @decorators.tomorrow_threads(19)
    def upload(self, file: str):
        self.logger.debug(f'free sftp connections: {self.linux_conn_pool.queue_sftp_free.qsize()}, free ssh connections: {self.linux_conn_pool.queue_ssh_free.qsize()}')
        # file = file.replace('\\', '/')
        pattern_str = self._local_dir
        file_remote = file.replace(pattern_str, self._remote_dir)
        # self.logger.debug((file, file_remote))

        for _ in range(10):
            sftp = self.linux_conn_pool.borrow_sftp()
            try:
                time_start = time.time()
                sftp.put(file, file_remote)
                self.logger.info(f'{file_remote} uploaded, size {round(os.path.getsize(file) / 1024)} kb, took {round(time.time() - time_start, 2)} s')
                self.linux_conn_pool.back_sftp(sftp)
                # self.linux_conn_pool.logger.debug((self.linux_conn_pool.queue_sftp_free.qsize(),self.linux_conn_pool.queue_ssh_free.qsize()))
                break
            except FileNotFoundError:
                self._do_mkdir_operation(file_remote)
                self.linux_conn_pool.back_sftp(sftp)
            except (OSError, SSHException) as e:
                self.logger.exception(e)
                self.linux_conn_pool.build_sftp()  # OSError: Socket is closed


class Synchronizer(LoggerMixinDefaultWithFileHandler):
    def __init__(self, host, port, username, password, local_dir, remote_dir, file_suffix_tuple_exluded=('.pyc', '.log', '.gz'), file_volume_limit=1000 * 1000,
                 path_pattern_exluded_tuple=('/.git/', '/.idea/', 'cnbooking_all.json'), only_upload_within_the_last_modify_time='7 * 24 * 60 * 60', cycle_interval=2, just_windows_copy=False):
        """

        :param host:
        :param port:
        :param username:
        :param password:
        :param local_dir:
        :param remote_dir:
        :param file_suffix_tuple_exluded: 排除以这些结尾的文件。
        :param file_volume_limit: 最大文件容量能够限制,如果超过此大小,则该文件不上传
        :param path_pattern_exluded_tuple: 更强大的文件排除功能,比光排除以什么后缀结尾更强大灵活,使用的是python正则表达式。
        :param only_upload_within_the_last_modify_time: 只上传离当前时间最晚修改时间以后的文件。
        :param cycle_interval: 每隔多少秒扫描一次需要上传的文件。
        :param just_windows_copy: 执行windows不同文件夹之间的复制,不上传linux。
        """
        self.logger_extra_suffix = host if not just_windows_copy else 'local'
        self._local_dir = str(local_dir).replace('\\', '/')
        self._file_suffix_tuple_exluded = file_suffix_tuple_exluded
        self._path_pattern_exluded_tuple = path_pattern_exluded_tuple
        self._only_upload_within_the_last_modify_time = self._compute_result(only_upload_within_the_last_modify_time)
        self._cycle_interval = cycle_interval
        self._file_volume_limit = self._compute_result(file_volume_limit)
        self.filename__filesize_map = dict()
        self.filename__st_mtime_map = dict()
        self._just_windows_copy = just_windows_copy
        self.uploader = LinuxRemoteUploader(local_dir, remote_dir, host, port, username, password) if not just_windows_copy else LocalCopier(local_dir, remote_dir, host, port, username, password)

    @staticmethod
    def _compute_result(sth: Union[str, int]):
        return sth if isinstance(sth, int) else eval(sth)  # lets config values be expressions such as '7 * 24 * 3600'

    def _judge_need_filter_a_file(self, filename: str):
        ext = filename.split('.')[-1]
        if '.' + ext in self._file_suffix_tuple_exluded:
            return True
        for path_pattern_exluded in self._path_pattern_exluded_tuple:
            if re.search(path_pattern_exluded, filename):
                return True
        return False

    def find_all_files_meet_the_conditions(self):
        t_start = time.time()
        total_volume = 0
        self.filename__filesize_map.clear()
        for parent, dirnames, filenames in os.walk(self._local_dir):
            for filename in filenames:
                file_full_name = os.path.join(parent, filename).replace('\\', '/')
                if not self._judge_need_filter_a_file(file_full_name):
                    # self.logger.debug(os.stat(file_full_name).st_mtime)
                    file_st_mtime = os.stat(file_full_name).st_mtime
                    volume = os.path.getsize(file_full_name)
                    if (time.time() - file_st_mtime < self._only_upload_within_the_last_modify_time
                            and volume < self._file_volume_limit
                            and (file_full_name not in self.filename__st_mtime_map or time.time() - file_st_mtime < 10 * 60)):
                        if self.filename__st_mtime_map.get(file_full_name, None) != file_st_mtime:
                            self.filename__filesize_map[file_full_name] = {'volume': volume, 'last_modify_time': time_util.DatetimeConverter(file_st_mtime).datetime_str}
                            self.filename__st_mtime_map[file_full_name] = file_st_mtime
                            total_volume += volume
        filename__filesize_map_ordered_by_last_modify_time = OrderedDict()
        for k, v in sorted(self.filename__filesize_map.items(), key=lambda item: item[1]['last_modify_time']):
            filename__filesize_map_ordered_by_last_modify_time[k] = v
        self.filename__filesize_map = filename__filesize_map_ordered_by_last_modify_time
        if len(self.filename__filesize_map) > 0:
            self.logger.warning(f'number of files to {"copy" if self._just_windows_copy else "upload"}: {len(self.filename__filesize_map)}, total size {round(total_volume / 1024, 2)} kb, '
                                f'scan took {round(time.time() - t_start, 2)} s, files: {json.dumps(self.filename__filesize_map, indent=4)}')

    # @decorators.tomorrow_threads(10)
    def start_upload_files(self):
        Thread(target=decorators.keep_circulating(self._cycle_interval)(self._start_upload_files)).start()

    def _start_upload_files(self):
        self.find_all_files_meet_the_conditions()
        for file in self.filename__filesize_map:
            self.uploader.upload(file)


# noinspection PyPep8
if __name__ == '__main__':
    """
    配置里面的内容格式如下,支持同步多个文件夹映射。
    [
      {
        "host": "112.90.xx.xx",
        "port": 10005,
        "username": "root",
        "password": "@0^Lc97MewI3i7xxxxxx",
        "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
        "remote_dir": "/home/ydf/hotelf15",
        "file_suffix_tuple_exluded": [
          ".pyc",
          ".log",
          ".gz"
        ],
        "path_pattern_exluded_tuple": [
          "/.git/",
          "/.idea/",
          "cnbooking_cn_all.json"
        ],
        "only_upload_within_the_last_modify_time": "365 * 24 * 3600",
        "file_volume_limit": "2 * 1000 * 1000",
        "cycle_interval": 10
      }
    ]
    """

    for config_item in json.load(Path('/windows_to_linux_syn_config.json').open()):
        nb_print(json.dumps(config_item))
        Synchronizer(**config_item).start_upload_files()

    # sc create PythonApp6 binPath= "D:\Users\ydf\Desktop\oschina\coding\hotel_fares\dist\windows_to_linux_syn2\windows_to_linux_syn2.exe"
    # pyinstaller --distpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --workpath=D:\Users\ydf\Desktop\oschina\pyinstallerdir --specpath=D:\Users\ydf\Desktop\oschina\specify_pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py
    # This file can be packaged with pyinstaller. First set the PYTHONPATH variable, then run the command from another folder.
    # pyinstaller --icon="D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\Users\ydf\Desktop\oschina\coding\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

    # cd ..
    # set PYTHONPATH=D:\coding2\hotel_fares
    # pyinstaller -F --icon="D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn.ico" D:\coding2\hotel_fares\app\utils_ydf\windows_to_linux_syn3.py

The config file contents are as follows.

[
  {
    "host": "112.xx.89.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\hotel_fares",
    "remote_dir": "/home/ydf/hotelf18",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  },
  {
    "host": "112.90.xx.16",
    "port": 10033,
    "username": "root",
    "password": "xxxx",
    "local_dir": "D:\\Users\\ydf\\Desktop\\oschina\\coding\\movie_data",
    "remote_dir": "/home/ydf/movie_data2",
    "file_suffix_tuple_exluded": [
      ".pyc",
      ".log",
      ".gz"
    ],
    "path_pattern_exluded_tuple": [
      "/.git/",
      "/.idea/",
      "cnbooking_cn_all.json"
    ],
    "only_upload_within_the_last_modify_time": "30 * 24 * 3600",
    "file_volume_limit": "2 * 1000 * 1000",
    "cycle_interval": 1
  }
]
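Note that string values such as "30 * 24 * 3600" and "2 * 1000 * 1000" are turned into numbers by the class's `_compute_result`, which falls back to `eval` for non-int values. A sketch of the same idea, with a slightly restricted `eval` (stripping builtins is my addition; the original uses bare `eval`):

```python
from typing import Union

def compute_result(sth: Union[str, int]):
    # ints pass through; strings such as "30 * 24 * 3600" are evaluated as
    # Python expressions. eval on config input is a trust decision -- only
    # use it with config files you control; builtins are stripped here to
    # narrow the surface slightly.
    return sth if isinstance(sth, int) else eval(sth, {'__builtins__': {}}, {})

print(compute_result('30 * 24 * 3600'))   # 2592000
print(compute_result(2 * 1000 * 1000))    # 2000000
```

Writing the durations as arithmetic expressions keeps the JSON readable ("30 days" is obvious from "30 * 24 * 3600" but not from 2592000).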


On the first run, every file modified within the configured window is uploaded in full; after that, it checks every 2 seconds (configurable via the JSON file) and uploads files changed within the last 10 minutes to Linux.
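That incremental behavior can be summarized as a pure decision function. This is a simplified sketch of the per-file conditions in `find_all_files_meet_the_conditions` (the real code combines the window, the size limit, and a per-file mtime record kept across scans):

```python
import time

def should_upload(mtime, size, last_seen_mtime, *, window=7 * 24 * 3600, size_limit=1000 * 1000, now=None):
    """Decide whether a file belongs in this cycle's upload batch.

    mtime / size    : current stat of the file
    last_seen_mtime : mtime recorded on an earlier scan, or None on first sight
    """
    now = now if now is not None else time.time()
    if now - mtime >= window:      # modified too long ago
        return False
    if size >= size_limit:         # over the size limit
        return False
    if last_seen_mtime == mtime:   # unchanged since the last scan
        return False
    return True

now = time.time()
print(should_upload(now - 60, 500, None, now=now))       # fresh new file -> True
print(should_upload(now - 60, 500, now - 60, now=now))   # unchanged -> False
```

On the first cycle `last_seen_mtime` is None for everything, so all recent files pass (the full upload); on later cycles only files whose mtime has changed pass, which is what makes the steady-state loop cheap.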

Reposted from: https://www.cnblogs.com/ydf0509/p/10924130.html
