当前位置: 首页 > news >正文

Python实现跨平台运维小神器

(项目最新进展请见github)

这阵子一直在学python,碰巧最近想把线上服务器环境做一些规范化/统一化,于是便萌生了用python写一个小工具的冲动。就功能方面来说,基本上是在“重复造轮子”吧,但是当我用这小工具完成了30多台服务器从系统层面到应用层面的一些规范化工作之后,觉得效果还不算那么low(高手可忽略这句话~~),这才敢拿出来跟小伙伴们分享一下。

(注:笔者所用为python版本为3.5,其他版本未经测试~~)

现在主要功能包括:

  1. 可批量执行远程命令,上传下载文件

  2. 支持多线程并发执行(对于某些耗时的命令或上传文件,可大大减少等待时间)

  3. 严格模式(批量执行中若某一台server执行错误则退出)和非严格模式

  4. 上传下载文件实现了类似rsync的机制

  5. 完善的命令行提示

  6. 跨平台,Linux和Windows均可

大致实现思路如下:

  •     外部包依赖docopt和paramiko

  •     有一个server信息文件,内容格式为 : “主机名-IP:端口”。脚本读取此文件来决定要对哪些server进行操作(该文件内部支持#注释掉某些server)

  •     采用了docopt提供命令行界面

  •     paramiko模块实现远程命令和sftp客户端功能。这里paramiko的sftp实例其只包含了基本的单个文件传输功能;并且不保存文件相关时间信息。

  •     paramiko 通过sftp实例传输文件环节,这里额外实现“保持文件时间信息”和“实现目录传输”以及“实现类似rsync的传输机制”是要考虑很多问题和逻辑的。传输机制模仿rsync的默认机制,检查文件的mtime和size,有差异才会真正传输。

  •     实现了参数中原路径和目标路径的自动判断,例如传输目录时不要求路径后面加‘/’

  •     对于远程命令(cmd),可以通过设置(--skip-err)跳过某些server的错误继续执行。例如批量执行‘ls’命令,一般情况会因为某些server上不存在而报错退出

  •     全面的错误信息提示。对于执行中的几乎所有可能出现的错误,都有捕获机制获取并输出

下面先来看一些基本的使用截图吧

帮助信息:

批量执行远程命令:


上传:

下载:


其实批量执行命令,传输文件在Linux上用shell也是可以很好的实现(而且ssh或rsync等也肯定比这套脚本功能点更多),但是考虑到并发执行以及在Linux和win平台的通用性,用Python来实现就有必要了。尤其是想在Win客户端和Linux服务器之间模仿rsync机制传输文件时,这个脚本就能派上用场了。


接下来直接看代码吧(我的老(lan)习惯,代码里注释还算详细,所以我就懒得再解释那么多喽)

#!/bin/env python3
# coding:utf-8
"""
Usage:
  auto_task [options] cmd <command> [--skip-err] [--parallel]
  auto_task [options] put <src> <dst> [--parallel]
  auto_task [options] get <src> <dst>


Options:
  -h --help             Show this screen.
  -u <user>             Remote username [default: root]
  -p <password>         User's password
  --pkey <private-key>  Local private key [default: /root/.ssh/id_rsa]
  --server <server_info_file>  
                        File include the remote server's information,
                        With the format of 'name-ip:port', such as 'web1-192.168.1.100:22',one server one line.
  --skip-err            Use with cmd, if sikp any server's error and continue process the other servers [default: False].
  --parallel            Parallel execution, only use with cmd or put. This option implies the --skip-err [default: False].

  cmd                   Run command on remote server(s),multiple commands sperate by ';'
  put                   Transfer from local to remote. Transport mechanism similar to rsync.
  get                   Transfer from remote to local. Transport mechanism similar to rsync.

  Notice:       cmd, get, put can only use one at once
  For Windows:  always use double quotes for quote something;
                it's highly recommend that with get or put in Windows,always use '/' instead of '\\'
"""

"""
by ljk 20160704
update at 2017011,20170320
"""
from docopt import docopt
from paramiko import SSHClient, AutoAddPolicy
from os import path, walk, makedirs, stat, utime
from re import split, match, search
from sys import exit, stdout
import platform
from math import floor
import threading

"""
因为涉及了(多)线程,所以我们将串行也归为单线程,这样可以统一用线程的一些思路,而不必编写一套多线程模型一套串行模型。
也因为多线程,所以输出用print()的话,各server的输出会对不上号,所以引入了OutputText类,将每个server的输出统一保存起来,最后打印出来
但是这样依然无法避免多个线程同时完成了,同时打印各自的最终结果。也就是说多线程任务最终需要输出时,输出这个动作必须要串行
"""


class OutputText:
    """该类的对象具有write()方法,用来存储每台server的执行结果.
    因为引入了多线程异步执行才需要这么做,以保证异步执行多台server的输出不会乱.
    为了简洁,并行与串行的输出就都用这一套东西了"""
    def __init__(self):
        self.buffer = []

    def write(self, *args, color=None):
        if color:
            if platform.uname().system == 'Windows':
                self.buffer.extend(args)
            else:
                self.buffer.extend('\033[0;{}m'.format(color))
                self.buffer.extend(args)
                self.buffer.extend('\033[0m')
        else:
            self.buffer.extend(args)

    def print_lock(self):
        """并发模式下,所有的输出动作都要加锁"""
        global_lock.acquire()
        for line in self.buffer:
            print(line, end='')
        global_lock.release()


def print_color(text, color=31, sep=' ', end='\n', file=stdout, flush=False):
    """打印彩色字体,color默认为红色
    该方法只针对Linux有效"""
    if platform.uname().system == 'Windows':
        print(text, sep=sep, end=end, file=file, flush=flush)
    else:
        print('\033[0;{}m'.format(color), end='')
        print(text, sep=sep, end=end, file=file, flush=flush)
        print('\033[0m', end='')


def get_ip_port(fname):
    """从制定文件(特定格式)中,取得主机名/主机ip/端口
    output:存储输出的对象"""
    try:
        with open(fname, 'r') as fobj:
            for line in fobj.readlines():
                if line != '\n' and not match('#', line):  # 过滤空行和注释行
                    list_tmp = split('[-:]', line)
                    server_name = list_tmp[0]
                    server_ip = list_tmp[1]
                    port = int(list_tmp[2])
                    yield (server_name, server_ip, port)
    except Exception as err:
        print_color('{}\n'.format(err))
        exit(10)


def create_sshclient(server_ip, port, output):
    """根据命令行提供的参数,建立到远程server的ssh链接.这段本应在run_command()函数内部。
    摘出来的目的是为了让sftp功能也通过sshclient对象来创建sftp对象,因为初步观察t.connect()方法在使用key时有问题
    output:存储输出的对象"""
    local_client = threading.local()  # 多线程中每个线程要在函数内某些保持自己特定值
    local_client.client = SSHClient()
    local_client.client.set_missing_host_key_policy(AutoAddPolicy())
    try:
        local_client.client.connect(server_ip, port=port, username=arguments['-u'], password=arguments['-p'], key_filename=arguments['--pkey'])
    except Exception as err:  # 有异常,打印异常,并返回'error'
        output.write('{}----{} ssh connect error: {}\n'.format(' ' * 4, server_ip, err), color=31)
        return 'error'
    else:
        return local_client.client  # 返回的client对象在每个线程内是不同的


# ----------
# run_command()执行远程命令
# ----------
def run_command(client, output):
    """
    执行远程命令的主函数
    client: paramiko.client.SSHClient object
    output: 存储输出的对象
    """
    # stdout 假如通过分号提供单行的多条命令,所有命令的输出(在linux终端会输出的内容)都会存储于stdout
    # 据观察,下面三个变量的特点是无论"如何引用过一次"之后,其内容就会清空
    # 有readlines()的地方都是流,用过之后就没有了
    stdin, stdout, stderr = client.exec_command(arguments['<command>'])
    copy_out, copy_err = stdout.readlines(), stderr.readlines()
    if len(copy_out) and len(copy_err):
        output.write('%s----result:\n' % (' ' * 8))
        for i in copy_out:
            output.write('%s%s' % (' ' * 12, i))
        for i in copy_err:
            output.write('%s%s' % (' ' * 12, i), color=31)
        if not arguments['--skip-err']:    # 忽略命令执行错误的情况
            output.print_lock()
            exit(10)
    elif len(copy_out):
        output.write('%s----result:\n' % (' ' * 8))
        for i in copy_out:
            output.write('%s%s' % (' ' * 12, i))
    elif len(copy_err):
        output.write('%s----error:\n' % (' ' * 8), color=31)
        for i in copy_err:
            output.write('%s%s' % (' ' * 12, i), color=31)
        if not arguments['--skip-err']:
            client.close()
            output.print_lock()
            exit(10)
    client.close()


# ----------
# sftp_transfer() 远程传输文件的主函数
# ----------
def sftp_transfer(source_path, destination_path, method, client, output):
    """
    文件传输的 主函数
    paramiko的sftp client传输,只能单个文件作为参数,并且不会保留文件的时间信息,这两点都需要代码里额外处理
    client: paramiko.client.SSHClient object
    output:存储输出的对象
    """
    sftp = client.open_sftp()
    
    if platform.system() == 'Windows':
        '''根据put或get,将windows路径中的 \ 分隔符替换为 / '''
        if arguments["put"]:
            source_path = source_path.replace('\\', '/')
        elif arguments["get"]:
            destination_path = destination_path.replace('\\', '/')

    # -----下面定义sftp_transfer()函数所需的一些子函数-----
    def process_arg_dir(target):
        """处理目录时,检查用户输入,在路径后面加上/"""
        if not target.endswith('/'):
            target = target + '/'
        return target

    def sftp_put(src, dst, space):
        """封装put,增加相应输出,并依据m_time和size判断两端文件一致性,决定是否传输该文件"""
        if check_remote_path(dst) == 'file':
            src_stat = stat(src)
            dst_stat = sftp.stat(dst)
        else:
            src_stat = ''
            dst_stat = ''
        if (src_stat == '' and dst_stat == '') or not (floor(src_stat.st_mtime) == dst_stat.st_mtime and src_stat.st_size == dst_stat.st_size):
            try:
                sftp.put(src, dst)
                output.write('%s%s\n' % (' ' * space, src))
            except Exception as err:
                output.write('%s----Uploading %s Failed\n' % (' ' * (space-4), src), color=31)
                output.write('{}----{}\n'.format(' ' * (space-4), err), color=31)
                client.close()
                output.print_lock()
                exit(10)

    def sftp_get(src, dst, space):
        """封装get,增加相应输出,并依据m_time和size判断两端文件一致性,决定是否传输该文件"""
        if path.isfile(dst):
            src_stat = sftp.stat(src)
            dst_stat = stat(dst)
        else:
            src_stat = ''
            dst_stat = ''
        if (src_stat == '' and dst_stat == '') or not (src_stat.st_mtime == floor(dst_stat.st_mtime) and src_stat.st_size == dst_stat.st_size):
            try:
                sftp.get(src, dst)
                output.write('%s%s\n' % (' ' * space, src))
            except Exception as err:
                output.write('%s----Downloading %s Failed\n' % (' ' * (space-4), src), color=31)
                output.write('{}----{}\n'.format(' ' * (space-4), err), color=31)
                client.close()
                output.print_lock()
                exit(10)

    def sftp_transfer_rcmd(cmd=None, space=None):
        """
        在文件传输功能中,有些时候需要在远程执行一些命令来获取某些信息
        client: paramiko.client.SSHClient object
        output:存储输出的对象
        """
        stdin, stdout, stderr = client.exec_command(cmd)
        copy_out, copy_err = stdout.readlines(), stderr.readlines()
        if len(copy_err):
            for i in copy_err:
                output.write('%s----%s' % (' ' * space, i), color=31)
            output.print_lock()
            exit(10)
        elif len(copy_out):
            return copy_out

    def check_remote_path(r_path):
        """通过client对象在远程linux执行命令,来判断远程路径是否存在,是文件还是目录"""
        check_cmd = 'if [ -e {0} ];then if [ -d {0} ];then echo directory;elif [ -f {0} ];then echo file;fi;else echo no_exist;fi'.format(r_path)
        # check_cmd命令会有三种‘正常输出’directory  file  no_exist
        check_result = sftp_transfer_rcmd(cmd=check_cmd)[0].strip('\n')
        if check_result == 'directory':
            return 'directory'
        elif check_result == 'file':
            return 'file'
        else:
            return 'no_exist'

    def file_time(target, location):
        """获取源文件的atime和mtime"""
        if location == 'local':
            target_stat = stat(target)
        elif location == 'remote':
            target_stat = sftp.stat(target)
        return target_stat.st_atime, target_stat.st_mtime

    def create_dir(target, location, space):
        """将创建目录的代码集中到一个函数"""
        if location == 'local':
            try:
                output.write('%s----Create Local Dir: %s\n' % (' ' * space, target))
                makedirs(target)
            except Exception as err:
                # print_color('%s----%s' % (' ' * space, str(err)))
                output.write('%s----%s\n' % (' ' * space, str(err)), color=31)
                output.print_lock()
                exit(10)
        elif location == 'remote':
            output.write('%s----Create Remote Dir: %s\n' % (' ' * space, target))
            sftp_transfer_rcmd(cmd='mkdir -p {}'.format(target), space=space)
    # -----子函数定义完毕-----

    # -----上传逻辑-----
    if method == 'put':
        output.write('%s----Uploading %s TO %s\n' % (' ' * 4, source_path, destination_path))
        if path.isfile(source_path):
            '''判断src是文件'''
            check_remote_path_result = check_remote_path(destination_path)
            if check_remote_path_result == 'file':
                pass
            elif check_remote_path_result == 'directory':  # dst经判断为目录
                destination_path = process_arg_dir(destination_path) + path.basename(source_path)
            else:
                if not check_remote_path(path.dirname(destination_path)) == 'directory':
                    create_dir(path.dirname(destination_path), 'remote', 8)
                if destination_path.endswith('/') or destination_path.endswith('\\'):
                    destination_path = destination_path + path.basename(source_path)

            sftp_put(source_path, destination_path, 12)
            sftp.utime(destination_path, file_time(source_path, 'local'))
        elif path.isdir(source_path):
            '''判断src是目录'''
            if check_remote_path(destination_path) == 'file':
                output.write('%s----%s is file\n' % (' ' * 8, destination_path), color=31)
                output.print_lock()
                exit(10)
            source_path, destination_path = process_arg_dir(source_path), process_arg_dir(destination_path)
            for root, dirs, files in walk(source_path):
                '''通过 os.walk()函数取得目录下的所有文件,此函数默认包含 . ..的文件/目录,需要去掉'''
                for file_name in files:
                    s_file = path.join(root, file_name)  # 逐级取得每个sftp client端文件的全路径
                    if not search('.*/\..*', s_file):
                        '''过滤掉路径中包含以.开头的目录或文件'''
                        d_file = s_file.replace(source_path, destination_path, 1)  # 由local_file取得每个远程文件的全路径
                        d_path = path.dirname(d_file)
                        if check_remote_path(d_path) == 'directory':
                            sftp_put(s_file, d_file, 12)
                        else:
                            create_dir(d_path, 'remote', 8)
                            sftp_put(s_file, d_file, 12)

                        sftp.utime(d_file, file_time(s_file, 'local'))
        else:
            output.write('%s%s is not exist\n' % (' ' * 8, source_path), color=31)
            output.print_lock()
            exit(10)

    # -----下载逻辑-----
    elif method == 'get':
        output.write('%s----Downloading %s TO %s\n' % (' ' * 4, source_path, destination_path))
        check_remote_path_result = check_remote_path(source_path)

        if check_remote_path_result == 'file':
            '''判断source_path是文件'''
            if path.isfile(destination_path):  # destination_path为文件
                pass
            elif path.isdir(destination_path):  # destination_path为目录
                destination_path = process_arg_dir(destination_path) + path.basename(source_path)
            else:
                if not path.isdir(path.dirname(destination_path)):
                    create_dir(path.dirname(destination_path), 'local', 8)
                if destination_path.endswith('/') or destination_path.endswith('\\'):
                    destination_path = destination_path + path.basename(source_path)

            sftp_get(source_path, destination_path, 12)
            utime(destination_path, file_time(source_path, 'remote'))
        elif check_remote_path_result == 'directory':
            '''判断source_path是目录'''
            if path.isfile(destination_path):
                output.write('%s----%s is file\n' % (' ' * 8, destination_path), color=31)
                output.print_lock()
                exit(10)
            source_path, destination_path = process_arg_dir(source_path), process_arg_dir(destination_path)

            def process_sftp_dir(path_name):
                """
                此函数递归处理sftp server端的目录和文件,并在client端创建所有不存在的目录,然后针对每个文件在两端的全路径执行get操作.
                path_name第一次的引用值应该是source_path的值
                """
                d_path = path_name.replace(source_path, destination_path, 1)
                if not path.exists(d_path):  # 若目标目录不存在则创建
                    create_dir(d_path, 'local', 8)
                for name in (i for i in sftp.listdir(path=path_name) if not i.startswith('.')):
                    '''去掉以.开头的文件或目录'''
                    s_file = path.join(path_name, name)  # 源文件全路径 
                    d_file = s_file.replace(source_path, destination_path, 1)  # 目标端全路径
                    chk_r_path_result = check_remote_path(s_file)
                    if chk_r_path_result == 'file':  # 文件
                        sftp_get(s_file, d_file, 12)
                        utime(d_file, file_time(s_file, 'remote'))
                    elif chk_r_path_result == 'directory':  # 目录
                        process_sftp_dir(s_file)  # 递归调用本身

            process_sftp_dir(source_path)
        else:
            output.write('%s%s is not exist\n' % (' ' * 8, source_path), color=31)
            output.print_lock()
            exit(10)
    client.close()


def process_single_server(server_name, server_ip, port):
    """处理一台server的逻辑"""
    local_data = threading.local()  # 可以看到多线程情况下,确实是不同的OutputText实例,说明threading.local()起到了预期作用
    local_data.output = OutputText()
    local_data.output.write('\n--------{}\n'.format(server_name))  # 这行写入的数据可以在多线程环境下正常打出
    client = create_sshclient(server_ip, port, local_data.output)
    if client == 'error':
        if not arguments['--skip-err']:
            exit(10)
        else:
            return
    # 区别处理 cmd put get参数
    if arguments['cmd']:
        run_command(client, local_data.output)
    elif arguments['put']:
        sftp_transfer(arguments['<src>'], arguments['<dst>'], 'put', client, local_data.output)
    elif arguments['get']:
        sftp_transfer(arguments['<src>'], arguments['<dst>'], 'get', client, local_data.output)
    # 前面的逻辑可以并行,打印必须要加锁实现串行
    local_data.output.print_lock()


if __name__ == "__main__":
    global global_lock
    global_lock = threading.Lock()
    arguments = docopt(__doc__)
    try:
        if not arguments['--parallel']:
            for server_name, server_ip, port in get_ip_port(arguments['--server']):
                '''循环处理每个主机'''
                process_single_server(server_name, server_ip, port)
        else:
            for server_name, server_ip, port in get_ip_port(arguments['--server']):
                # executor.submit(process_single_server, server_name, server_ip, port)
                t = threading.Thread(target=process_single_server, args=(server_name, server_ip, port))
                t.start()
                # t.join()  # 谁对t线程发起join,谁就阻塞直到t线程执行完
    except KeyboardInterrupt:
        print_color('\n-----bye-----')


另外脚本里包含了两个有用的函数(类):

  • print_color()函数方便的在Linux下实现打印不同颜色的字体;

  • OutputText类在多线程任务需要在中终端打印结果时会非常有用

其实之所以想造这么一个轮子,一方面能锻炼python coding,另一方面当时确实有这么一个需求。而且用自己的工具完成工作也是小有成就的(请勿拍砖~)。

另外,在开发过程中对于一些概念性的东西也都有了更深入的了解:

  • 例如在使用paramiko模块的过程中,又促使我深入的了解了一些ssh登陆的详细过程。

  • 又如用到了线程模型,更深入的了解了线程进程相关的概念。

所以作为一枚运维老司机,越来越深刻的理解到“运维”和“开发”这俩概念之间的相互促进。希望大家共勉。


相关文章:

  • 一篇很全面的IOS面试题(下)
  • ViewController与outlet绑定
  • 三、Python-列表
  • 创建和删除数据库和基本查询
  • 翻译 | The Principles of OOD 面向对象设计原则
  • 内置对象String及String的常用操作
  • Java提高篇——单例模式
  • 致远力推协同运营中台,赋能数字化升级
  • 文件属性权限及其权限设置
  • 关于ORACLE的SQL语句拼接、替换、截取、排序,联表等...~持续汇总~
  • Python语言学习 (三)1.1
  • windows服务器上面创建定时任务
  • mybatis整合ehcache
  • mac显示隐藏文件
  • 谈谈持续集成,持续交付,持续部署之间的区别
  • 【跃迁之路】【669天】程序员高效学习方法论探索系列(实验阶段426-2018.12.13)...
  • 2018以太坊智能合约编程语言solidity的最佳IDEs
  • 4月23日世界读书日 网络营销论坛推荐《正在爆发的营销革命》
  • Javascript基础之Array数组API
  • jquery cookie
  • Laravel Telescope:优雅的应用调试工具
  • MySQL-事务管理(基础)
  • Spring Security中异常上抛机制及对于转型处理的一些感悟
  • Terraform入门 - 3. 变更基础设施
  • windows下mongoDB的环境配置
  • 高性能JavaScript阅读简记(三)
  • 互联网大裁员:Java程序员失工作,焉知不能进ali?
  • 理解 C# 泛型接口中的协变与逆变(抗变)
  • 算法系列——算法入门之递归分而治之思想的实现
  • 【云吞铺子】性能抖动剖析(二)
  • 如何在 Intellij IDEA 更高效地将应用部署到容器服务 Kubernetes ...
  • ​LeetCode解法汇总2182. 构造限制重复的字符串
  • ​创新驱动,边缘计算领袖:亚马逊云科技海外服务器服务再进化
  • # Swust 12th acm 邀请赛# [ K ] 三角形判定 [题解]
  • # 执行时间 统计mysql_一文说尽 MySQL 优化原理
  • #1015 : KMP算法
  • #android不同版本废弃api,新api。
  • #传输# #传输数据判断#
  • (html转换)StringEscapeUtils类的转义与反转义方法
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (SpringBoot)第二章:Spring创建和使用
  • (二)【Jmeter】专栏实战项目靶场drupal部署
  • (非本人原创)史记·柴静列传(r4笔记第65天)
  • (附源码)计算机毕业设计ssm电影分享网站
  • (简单有案例)前端实现主题切换、动态换肤的两种简单方式
  • (十一)c52学习之旅-动态数码管
  • (转载)PyTorch代码规范最佳实践和样式指南
  • .NET Core WebAPI中使用swagger版本控制,添加注释
  • .NET Core 和 .NET Framework 中的 MEF2
  • .NET MVC之AOP
  • .NET NPOI导出Excel详解
  • .NET/C# 利用 Walterlv.WeakEvents 高性能地中转一个自定义的弱事件(可让任意 CLR 事件成为弱事件)
  • .NET/C# 推荐一个我设计的缓存类型(适合缓存反射等耗性能的操作,附用法)
  • .NET3.5下用Lambda简化跨线程访问窗体控件,避免繁复的delegate,Invoke(转)