当前位置: 首页 > news >正文

python运维实战-ssh工具

1. 功能描述

1.1 系统具有的功能描述

(1)连接服务器:用户可以通过系统连接到远程服务器,系统支持多个服务器配置,并且可以方便地管理这些配置。

(2)执行命令:用户可以在连接成功后,在界面中输入命令,并执行在远程服务器上执行。执行结果会显示在界面中,包括标准输出和标准错误。

(3)上传和下载文件:用户可以通过系统上传本地文件到远程服务器,或者下载远程服务器上的文件到本地。

(4)服务器监控:系统可以监控远程服务器的 CPU 使用率、内存使用率和进程列表,实时显示在界面上。

(5)查看日志文件:用户可以通过系统查看远程服务器上的日志文件,并且可以选择下载到本地进行查看。

(6)Vim编辑文件:用户可以在界面上编辑远程服务器上的文件,编辑后可以保存到服务器上。

1.2 所用技术与模块描述

(1)JSON模块(json):用于存储和加载配置信息、与服务器交换数据等。

操作系统路径模块(os.path):主要用于处理文件路径,比如检查文件是否存在、获取文件或目录的详细信息等。

(2)正则表达式模块(re):Python的正则表达式模块,用于字符串匹配和替换。

(3)套接字模块(socket):提供了网络通信的底层接口,允许应用程序在网络上发送和接收数据。

(4)子进程模块(subprocess):允许从Python脚本中启动新的应用程序,连接到它们的输入/输出/错误管道,并获取它们的返回码。

 

(5)线程模块(threading):提供了对线程的基本支持,线程是轻量级的进程,允许在Python程序中并发执行代码。

时间模块(time):提供了与时间相关的各种功能,如获取当前时间、执行时间延迟等。

(6)Tkinter简单对话框模块(tkinter.simpledialog):Tkinter GUI库的一部分,提供了简单的对话框功能,如输入对话框、消息框等。

(7)Paramiko模块(paramiko):一个实现SSHv2协议的Python库,支持连接和操作SSH服务器。

(8)Watchdog事件模块(watchdog.events):watchdog库的一部分,提供了文件系统事件(如文件创建、修改、删除等)的接口。

(9)Watchdog观察者模块(watchdog.observers):watchdog库的一部分,提供了跨平台的文件系统事件观察者实现。

1.3 ssh暗影坤手概述

一个简单易用的SSH管理工具,允许用户通过图形化界面实现对虚拟机(VM)的基本操作。通过集成SSH协议,该工具能够提供远程登录、命令执行、文件传输等核心功能,为用户提供便捷、安全的虚拟机管理体验。

 

2. 系统功能实现

2.1 服务器信息管理的功能设计与实现

当前目录下创建一个server.json的文件

将所有服务器信息保存在json文件中,调用时借助函数加载文件,读取数据。

代码展示如下:

def load_server(self):with open('server.json', "r", encoding='utf-8') as fp:self.servers = json.load(fp)print(f"获取到的服务器信息:{self.servers}")def save_server(self):with open('server.json', "w", encoding='utf-8') as fp:fp.write(json.dumps(self.servers))

 

2.2 连接服务器的功能设计与实现

(1)读取json文件中的数据,将ip、端口、用户名等信息传到connect中连接服务器。

代码展示如下:

def connecting(self):self.connected = Noneself.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)try:# 连接到服务器self.ssh.connect(hostname=self.server["ip"], port=self.server["port"], username=self.server["username"],password=self.server["password"])print(f"【success】连接到服务器【{self.server['ip']}】成功!!!")self.connected = Trueself.get_pwd()return Trueexcept Exception as e:print(f"【errors】连接到服务器【{self.server['ip']}】失败!!!")self.connected = Falsereturn False

(2)创建连接指定服务器函数,并且创建线程拥堵单独执行连接操作,不影响其他操作的执行,创建显示函数将连接成功或失败信息展示到界面,以便用户清楚服务器是否已经连接成功。

代码展示如下:

# 连接到指定的服务器
def connect(self, server):print("连接到指定的服务器")self.mssh.server = serverprint(f"服务器信息:{self.mssh.server}")socket.setdefaulttimeout(5)# 在单独的线程中执行连接操作thread_connect = threading.Thread(target=self.connect_thread, args=(server,))thread_connect.start()self.pop_win.destroy()def connect_thread(self, server):result = self.mssh.connecting()if result:self.is_connected = Trueself.update_connection_status(server, True)else:self.is_connected = Falseself.update_connection_status(server, False)
#服务器连接展示
def update_connection_status(self, server, connected):if connected:self.label_server.config(text=f"服务器{server['ip']}连接成功", foreground="green")else:self.label_server.config(text=f"服务器{server['ip']}连接失败", foreground="red")

2.3 添加服务器的功能设计与实现

(1)首先创建函数用于添加服务器的操作,然后输入服务器的IP地址,账户名和密码添加服务器,最后将服务器信息保存到json文件中。

代码展示如下:

def add_server(self, ip, port, username, password, description):self.servers.append({"description": description,"ip": ip,"port": port,"username": username,"password": password})self.save_server()

(2)创建新建函数,用于与用户交互的图形化界面,让用户输入服务器的IP地址,账户名和密码添加服务器信息。

代码展示如下:

# 新建连接
def new_connect_ui(self):self.pop_win = tk.Toplevel(self.root)# 设置标题和大小self.pop_win.title("新建连接")self.pop_win.geometry("300x200+600+250")frame = tk.Frame(self.pop_win)frame.grid()# IPip_label = tk.Label(frame, text="服务器IP地址:")ip_label.grid(row=0, column=0)self.ip_entry = tk.Entry(frame)self.ip_entry.grid(row=0, column=1)# Portport_label = tk.Label(frame, text="服务器端口号:")port_label.grid(row=1, column=0)self.port_entry = tk.Entry(frame)self.port_entry.grid(row=1, column=1)# Usernameusername_label = tk.Label(frame, text="用户名:")username_label.grid(row=2, column=0)self.username_entry = tk.Entry(frame)self.username_entry.grid(row=2, column=1)# Passwordpassword_label = tk.Label(frame, text="密码:")password_label.grid(row=3, column=0)self.password_entry = tk.Entry(frame, show="*")self.password_entry.grid(row=3, column=1)# Descriptiondescription_label = tk.Label(frame, text="备注信息:")description_label.grid(row=4, column=0)self.description_entry = tk.Entry(frame)self.description_entry.grid(row=4, column=1)# Buttonsubmit_button = tk.Button(frame, text="保存", command=self.save_server_info)submit_button.grid(row=5, columnspan=2)

(3)获取用户所输入的信息,将服务器信息保存到文件。

代码展示如下:

def save_server_info(self):ip = self.ip_entry.get()port = self.port_entry.get()username = self.username_entry.get()password = self.password_entry.get()description = self.description_entry.get()# 调用 add_server 函数,将获取到的信息添加到服务器列表中ssh.add_server(ip=ip, port=int(port), username=username, password=password, description=description)messagebox.showinfo("成功","服务器信息已保存!!")# print("服务器信息已保存")

 

2.4 删除服务器的功能设计与实现

创建删除函数,当用户点击确定时,调用remove移除服务器信息,并且返回删除成功提示信息。

代码展示如下:

def del_connect(self, server):print("删除指定的服务器")# 输出服务器信息print(f"服务器信息:{server}")# 确认是否删除服务器confirm = messagebox.askyesno("确认删除", f"确认删除服务器 {server['ip']} 吗?")if confirm:# 从服务器列表中删除指定的服务器if server in self.mssh.servers:self.mssh.servers.remove(server)self.pop_win.destroy()self.open_connect_ui()messagebox.showinfo("删除成功", f"服务器 {server['ip']} 已成功删除!")else:messagebox.showerror("错误", "服务器不存在,无法删除")# 保存更新后的服务器列表到配置文件中self.mssh.save_server()

 

2.5 修改服务器的功能设计与实现

(1)创建修改函数,用于与用户交互的图形化界面,让用户修改服务器的IP地址、账户名和密码等服务器信息。

代码展示如下:

def modify_server(self, server):print("修改服务器信息")self.mssh.server = server.copy()  # 添加这一行来复制服务器信息self.pop_win_modify = tk.Toplevel(self.root)frame = tk.Frame(self.pop_win_modify)frame.grid()# IPip_label = tk.Label(frame, text="服务器IP地址:")ip_label.grid(row=0, column=0)self.ip_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['ip']))self.ip_entry.grid(row=0, column=1)# Portport_label = tk.Label(frame, text="服务器端口号:")port_label.grid(row=1, column=0)self.port_entry = tk.Entry(frame, textvariable=tk.StringVar(value=str(server['port'])))self.port_entry.grid(row=1, column=1)# Usernameusername_label = tk.Label(frame, text="用户名:")username_label.grid(row=2, column=0)self.username_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['username']))self.username_entry.grid(row=2, column=1)# Passwordpassword_label = tk.Label(frame, text="密码:")password_label.grid(row=3, column=0)self.password_entry = tk.Entry(frame, show="*", textvariable=tk.StringVar(value=server['password']))self.password_entry.grid(row=3, column=1)# Descriptiondescription_label = tk.Label(frame, text="备注信息:")description_label.grid(row=4, column=0)self.description_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['description']))self.description_entry.grid(row=4, column=1)# Buttonsubmit_button = tk.Button(frame, text="保存", command=self.save_modified_server)submit_button.grid(row=5, columnspan=2)

(2)获取修改后的服务器信息,保存到json文件中。

代码展示如下:

def save_modified_server(self):modified_server = {"ip": self.ip_entry.get(),"port": int(self.port_entry.get()),"username": self.username_entry.get(),"password": self.mssh.server["password"],  # 假设密码不变,保持原值"description": self.description_entry.get(),}index = self.mssh.servers.index(self.mssh.server)self.mssh.servers[index] = modified_servermessagebox.showinfo("成功", "服务器信息已成功修改!")self.pop_win_modify.destroy()self.open_connect_ui()self.mssh.save_server()

 

2.6 执行交互式命令的功能设计与实现

主要调用paramiko库的exec_command方法执行命令,同时获取标准输入、标准输出和标准错误。

(1)基本命令cd的实现

创建exec方法主要用于执行远程命令,并处理用户输入的路径,确保命令相对于当前路径执行。方法接收用户输入的命令,执行命令(若是"cd"命令则更新当前路径),获取命令执行结果,并返回执行结果。

代码展示如下:

def exec(self, command):if command.startswith("cd"):tmp_path = command.split(" ")[-1]if tmp_path.startswith("/"):# 绝对路径处理self.path = tmp_pathelse:# todo 如果用户输入的是一个相对路径呢?print("用户输入的是相对路径")if tmp_path.startswith(".."):self.path = self.path + "/" + tmp_pathelse:self.path = self.path + tmp_pathelse:# 根据当前的路径去执行指令command = f"cd {self.path} && {command}"print(f"->输入指令({self.path}):{command}")stdin, stdout, stderr = self.ssh.exec_command(command)result = stdout.read().decode("utf-8")print(f"<-返回:{result}")# 用户输入的路径里包含.. ,通过pwd刷新当前路径if command.find("..") > 0:self.get_pwd()return result

 

(2)创建函数用于获取当前路径

代码展示如下:

# 使用pwd更新当前路径
def get_pwd(self):stdin, stdout, stderr = self.ssh.exec_command(f"cd {self.path} && pwd")result = stdout.read().decode("utf-8")self.path = result.strip()

 

  1. 交互式命令实现

通过SSH与远程服务器进行交互,用户可以输入命令并发送到远程服务器,同时应用会读取并显示来自远程服务器的输出。所有的输入/输出处理都在单独的线程中进行,以保持GUI的响应性。

代码展示如下:

def _read_channel(self, channel,output_callback):"""在独立线程中读取通道输出,并调用回调函数处理"""while True:if channel.recv_ready():data = channel.recv(1024)output_callback(data)def handle_output(self, data):# 先解码数据decoded_data = data.decode("utf-8")# 去除颜色转义序列cleaned_data = re.sub(r'\x1B\[([0-?]*[ -/]*[@-~])', '', decoded_data)# 插入到文本框的末尾self.txt_result.insert(tk.END, cleaned_data)# 确保滚动条跟随到最后self.txt_result.see(tk.END)def okk(self,event):self.command.set('q')self.ok(self)def ok(self, event):command = self.command.get()if not self.mssh.connected:self.txt_result.insert(tk.END, "未连接到服务器!\n")return# 如果ssh_channel尚未创建,创建一个新的if self.mssh_channel is None:self.mssh_channel = self.mssh.ssh.invoke_shell()self.mssh_channel.set_combine_stderr(True)try:# 使用ssh_channel发送命令self.mssh_channel.send(f"{command}\n")# 创建线程监听输出并调用UI提供的回调函数t = threading.Thread(target=self.mssh._read_channel, args=(self.mssh_channel, self.handle_output))t.daemon = Truet.start()# 设置延时清空输入框self.root.after(2000, lambda: self.command.set(""))except Exception as e:self.txt_result.insert(tk.END, f"执行错误: {str(e)}\n")# 清空输入框self.command.set("")

2.7 vim编辑器功能设计与实现

利用watchdog模块监控文件系统的更改,如文件的创建、修改、移动或删除等,并在这些事件发生时执行自定义操作。

(1)定义watch_and_upload方法用于监视本地文件的修改并在远程服务器上进行相应的上传操作。

代码展示如下:

def watch_and_upload(self, local_file_path, remote_full_path):class FileModifiedHandler(FileSystemEventHandler):def __init__(self, ssh_handler, remote_full_path):self.ssh_handler = ssh_handlerself.remote_full_path = remote_full_pathdef on_modified(self, event):if event.is_directory or not event.src_path == local_file_path:return# messagebox.showinfo("文件被修改", f"文件被修改:{event.src_path}")try:self.ssh_handler.connecting()  # 重新连接SSH(如果需要)sftp = self.ssh_handler.ssh.open_sftp()sftp.put(local_file_path, self.remote_full_path)sftp.close()# messagebox.showinfo("文件上传成功", f"vim修改成功: {local_file_path} -> {self.remote_full_path}")except Exception as e:messagebox.showerror("文件上传失败", f"文件上传失败: {e}")event_handler = FileModifiedHandler(self, remote_full_path)observer = Observer()observer.schedule(event_handler, path=os.path.dirname(local_file_path), recursive=False)observer.start()try:# 等待用户操作或其他信号来停止监视# 这里使用time.sleep作为示例,但建议使用更好的机制,如事件或信号while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()

(2)创建vim_edit 函数用于在vscode中编辑文件。首先提示用户输入要编辑的文件名,并获取远程文件的路径,用户选择本地目录,并构造本地文件的完整路径,连接到远程服务器,并创建 SFTP 客户端,检查远程文件是否存在,如果不存在则在远程服务器上创建空文件。下载文件到本地,并启动文件监视线程 watch_and_upload,然后调用子进程打开文件编辑器。

 

def vim_edit(self):remote_file = simpledialog.askstring("vim", "请输入要编辑的完整远程文件路径:")if remote_file is None:  # 用户点击“取消”或关闭对话框returnelif not remote_file:  # 用户输入了空字符串messagebox.showerror("错误", "请输入文件路径!")return# 检查用户是否输入了以'/'开头的绝对路径if not remote_file.startswith('/'):messagebox.showerror("错误", "请输入绝对路径,例如:/home/user/file.txt")return# 临时文件目录tmp = filedialog.askdirectory(title="选择下载到的本地目录")if not tmp:  # 用户取消了选择目录returnlocal_file_path = os.path.join(tmp, os.path.basename(remote_file))try:self.connecting()sftp = self.ssh.open_sftp()# 检查远程文件是否存在try:sftp.stat(remote_file)except FileNotFoundError:# 如果不存在,则在远程服务器上创建空文件with sftp.open(remote_file, 'wb') as f:pass  # 写入空内容或者模板内容# 下载文件到本地sftp.get(remote_file, local_file_path)sftp.close()# 启动文件监视线程watch_thread = threading.Thread(target=self.watch_and_upload, args=(local_file_path, remote_file))watch_thread.start()# 打开文件编辑器subprocess.run(['Code', local_file_path], shell=True)except Exception as e:messagebox.showerror("文件下载失败", f"文件下载失败:{e}")print(f"文件下载失败:{e}")

2.8 文件远程上传下载的功能设计与实现

(1)文件上传功能实现

首先检查是否已连接到服务器。如果未连接到服务器,则显示错误消息并返回。如果已连接到服务器,则调用filedialog.askopenfilename()方法请求用户选择要上传的文件。接下来,如果用户选择了文件,则提取文件名并创建一个SFTP客户端。然后,计算远程路径,并在控制台打印文件和远程路径信息。最后,使用SFTP客户端将本地文件上传到远程路径,并显示上传成功的消息框。如果用户未选择文件,则输出“请选择要上传的文件”。如果出现任何异常,则显示文件上传失败的错误消息。

代码展示如下:

# 上传文件
def upload_file(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnelse:self.file_path = filedialog.askopenfilename()try:if self.file_path:# 获取文件名file_name = self.file_path.split("/")[-1]# 创建 SFTP 客户端sftp_client = self.mssh.ssh.open_sftp()remote_path = f"{self.mssh.path}/{file_name}"print(f"文件:{self.file_path},上传到:{remote_path}")# 上传文件sftp_client.put(self.file_path, remote_path)messagebox.showinfo("成功", "上传成功")sftp_client.close()else:print("请选择要上传的文件")except Exception as e:messagebox.showerror("错误", f"文件上传失败: {e}")#判断是否连接服务器def vim_edit(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.mssh.vim_edit()

 

 

(2)文件下载功能实现

使用tk中的simpledialog.askstring 获取用户输入的要下载文件名,filedialog.askdirectory获取用户所选目录的完整路径,创建一个SFTP连接,并调用get方法来下载文件,调用完关闭SFTP连接,弹窗提示用户下载成功。

代码展示如下:

 

def download_file(self):#tk.simpledialog.askstring 的主要用途是获取用户输入的文本信息remote_file = tk.simpledialog.askstring("输入文件名", "请输入要下载的文件名:")if remote_file is None:  # 用户点击“取消”或关闭对话框returnelif not remote_file:  # 用户输入了空字符串messagebox.showerror("错误", "请输入文件名!")return#filedialog.askdirectory返回的是用户所选目录的完整路径,而不是文件名local_path = filedialog.askdirectory(title="选择下载到的本地目录")if local_path:try:sftp = self.ssh.open_sftp()local_file_path = os.path.join(local_path, remote_file)sftp.get(remote_file, local_file_path)sftp.close()messagebox.showinfo("下载成功", f"文件下载成功: {remote_file} -> {local_path}")except Exception as e:messagebox.showerror("下载失败", f"文件下载失败: {e}")#下载文件
def downloader(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.mssh.download_file()

 

2.9 服务器监控功能设计与实现

通过SSH连接到远程服务器,并在界面上展示服务器的进程信息、系统资源利用情况和磁盘使用情况,同时处理连接状态的变化。

(1)进程展示的实现

创建get_process方法通过执行"ps -ef"命令获取进程信息并展示在界面上的表格中。如果连接失败或执行命令出错,会通过弹出窗口显示错误信息。 代码展示如下:

def get_process(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.pop_win = tk.Toplevel(self.root)self.pop_win.title("进程")self.pop_win.geometry("800x600+600+250")table_frame = ttk.Frame(self.pop_win)table_frame.pack(padx=10, pady=10, fill="both", expand=True)server = self.mssh.serverip = server['ip']username = server['username']password = server['password']try:client = paramiko.SSHClient()client.set_missing_host_key_policy(paramiko.AutoAddPolicy())client.connect(ip, 22, username, password)stdin, stdout, stderr = client.exec_command("ps -ef")result = stdout.read().decode("utf-8").strip().split("\n")# 创建表格table = ttk.Treeview(table_frame, columns=("user", "pid", "cpu", "mem", "vsz", "rss", "tty", "stat", "start", "time", "command"), show="headings")table.heading("user", text="用户")table.heading("pid", text="进程ID")table.heading("cpu", text="CPU%")table.heading("mem", text="内存%")table.heading("vsz", text="虚拟内存")table.heading("rss", text="常驻内存")table.heading("tty", text="终端")table.heading("stat", text="状态")table.heading("start", text="启动时间")table.heading("time", text="CPU时间")table.heading("command", text="命令")table.pack(side="left", fill="both", expand=True)# 添加滚动条scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=table.yview)scrollbar.pack(side="right", fill="y")table.configure(yscrollcommand=scrollbar.set)# 填充表格数据for line in result[1:]:parts = line.split()if len(parts) >= 10:user, pid, cpu, mem, vsz, rss, tty, stat, start, time, *command = partscommand = " ".join(command)table.insert("", "end", values=(user, pid, cpu, mem, vsz, rss, tty, stat, start, time, command))# 调整列宽度table.column("#0", width=0, stretch="no")  # 隐藏第一列table.column("user", width=80, anchor="w")table.column("pid", width=60, anchor="e")table.column("cpu", width=60, anchor="e")table.column("mem", width=60, anchor="e")table.column("vsz", width=100, anchor="e")table.column("rss", width=100, anchor="e")table.column("tty", width=60, anchor="w")table.column("stat", width=60, anchor="w")table.column("start", width=120, anchor="w")table.column("time", width=80, anchor="e")table.column("command", width=300, anchor="w")# 自动调整行高def fixed_map(option):return [elm for elm in style.map("Treeview", query_opt=option) if elm[:2] != ("!disabled", "!selected")]style = ttk.Style()style.map("Treeview", foreground=fixed_map("foreground"), background=fixed_map("background"))except paramiko.AuthenticationException:messagebox.showerror("错误", "认证失败,请验证您的凭据")except paramiko.SSHException as sshException:messagebox.showerror("错误", f"无法建立SSH连接: {sshException}")except paramiko.BadHostKeyException as badHostKeyException:messagebox.showerror("错误", f"无法验证服务器的主机密钥: {badHostKeyException}")except Exception as e:messagebox.showerror("错误", f"发生错误: {e}")finally:if 'client' in locals() and client:client.close()  # 确保连接被关闭# 当窗口关闭时,确保没有遗留的引用或连接self.pop_win.protocol("WM_DELETE_WINDOW", self.on_closing)

(2)CPU、内存、网络等信息展示的实现

创建create_widgets方法制作界面上的标签、进度条和按钮。用于展示CPU、内存、网络等信息,并提供查看磁盘使用情况的按钮。使用update_info方法用于在界面上更新系统信息,包括CPU利用率、内存使用情况和网络速率。reset_system_monitoring方法重置系统监控状态,将CPU信息、内存信息和网络信息重置为初始状态。show_disk_usage展示磁盘使用情况,并创建一个新窗口展示磁盘信息。

代码展示如下:

def create_widgets(self, frame):self.cpu_label = ttk.Label(frame, text="CPU信息:")self.cpu_label.grid()self.cpu_progress = ttk.Progressbar(frame, orient='horizontal', length=200, mode='determinate')self.cpu_progress.grid(pady=(0, 10))self.mem_label = ttk.Label(frame, text="Memory Usage: 0 MB / 0 MB")self.mem_label.grid()self.mem_progress = ttk.Progressbar(frame, orient='horizontal', length=200, mode='determinate')self.mem_progress.grid(pady=(0, 5))self.net_label = ttk.Label(frame, text="网络信息:")self.net_label.grid(pady=5)self.disk_button = tk.Button(frame, text="查看磁盘使用情况", command=self.show_disk_usage)self.disk_button.grid(pady=(5, 5))def update_info(self, frame):self.create_widgets(frame)  # 创建标签# 定义进度条样式style = ttk.Style()style.theme_use("default")style.configure("green.Horizontal.TProgressbar", foreground='green', background='green')style.configure("yellow.Horizontal.TProgressbar", foreground='yellow', background='yellow')style.configure("red.Horizontal.TProgressbar", foreground='red', background='red')prev_connected = self.mssh.connected  # 记录上一次的连接状态while True:if "ip" in self.mssh.server:if self.mssh.connected == None:result_connect = (f"服务器{self.mssh.server['ip']}连接中", "red")self.reset_system_monitoring()  # 重置系统监控状态elif self.mssh.connected:result_connect = (f"服务器{self.mssh.server['ip']}连接成功", "green")# -----------------------------系统信息------------------------------------------------server = self.mssh.serverip = server['ip']username = server['username']password = server['password']self.client = paramiko.SSHClient()self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.client.connect(ip, 22, username, password)# 执行远程命令获取内存信息stdin, stdout, stderr = self.client.exec_command("free -m")result = stdout.read().decode('utf-8')# 获取系统内存信息lines = result.split('\n')mem_info = lines[1].split()total_mem = int(mem_info[1])used_mem = int(mem_info[2])mem_usage = used_mem / total_mem * 100# 获取网络速率stdin1, stdout1, stderr1 = self.client.exec_command("ip a")result = stdout1.read().decode("utf-8")# 使用正则表达式提取数据包和字节数信息matches = re.findall(r'ens160.*?RX packets (\d+)  bytes (\d+).*?TX packets (\d+)  bytes (\d+)',result, re.DOTALL)if matches:rx_packets, rx_bytes, tx_packets, tx_bytes = matches[0]else:rx_packets, rx_bytes, tx_packets, tx_bytes = 0, 0, 0, 0time.sleep(1)stdin2, stdout2, stderr3 = self.client.exec_command("ip a")result = stdout2.read().decode("utf-8")matches = re.findall(r'ens160.*?RX packets (\d+)  bytes (\d+).*?TX packets (\d+)  bytes (\d+)',result, re.DOTALL)if matches:rx_packets, new_rx_bytes, tx_packets, new_tx_bytes = matches[0]else:rx_packets, new_rx_bytes, tx_packets, new_tx_bytes = 0, 0, 0, 0# 计算速率upload_speed = ((int(new_tx_bytes) - int(tx_bytes)) / 1024)  # MB/s,download_speed = ((int(new_rx_bytes) - int(rx_bytes)) / 1024)# 获取cpu利用率stdin4, stdout4, stderr4 = self.client.exec_command("top -bn 1")result = stdout4.read().decode("utf-8")cpu = 0for line in result.split("\n"):if line.startswith("%Cpu(s)"):cpu_us = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[0])[0]cpu_sy = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[1])[0]cpu_ni = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[2])[0]cpu_id = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[3])[0]cpu_wa = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[4])[0]cpu_hi = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[5])[0]cpu_si = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[6])[0]cpu_st = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[7])[0]sum = float(cpu_st) + float(cpu_si) + float(cpu_hi) + float(cpu_wa) + float(cpu_id) + float(cpu_ni) + float(cpu_sy) + float(cpu_us)cpu = sum - float(cpu_id)# 设置进度条样式if cpu > 80:cpu_style = "red.Horizontal.TProgressbar"elif cpu > 50:cpu_style = "yellow.Horizontal.TProgressbar"else:cpu_style = "green.Horizontal.TProgressbar"if mem_usage > 80:mem_style = "red.Horizontal.TProgressbar"elif mem_usage > 50:mem_style = "yellow.Horizontal.TProgressbar"else:mem_style = "green.Horizontal.TProgressbar"# 更新标签self.cpu_label.config(text=f"CPU信息:{cpu:.2f}%")self.cpu_progress.config(style=cpu_style, value=cpu)self.mem_label.config(text=f"Memory Usage: {used_mem} MB / {total_mem} MB ({mem_usage:.2f}%)")self.mem_progress.config(style=mem_style, value=mem_usage)self.net_label.config(text=f"网络信息:↑{upload_speed:.2f}MB/s \t ↓{download_speed:.2f}MB/s")else:result_connect = (f"服务器{self.mssh.server['ip']}连接失败", "red")self.reset_system_monitoring()  # 重置系统监控状态else:result_connect = (f"服务器未选择", "black")self.reset_system_monitoring()  # 重置系统监控状态# 检查连接状态是否发生变化if self.mssh.connected != prev_connected:self.reset_system_monitoring()  # 重置系统监控状态prev_connected = self.mssh.connectedself.label_server.config(text=f"{result_connect[0]}", foreground=result_connect[1])time.sleep(1)def reset_system_monitoring(self):# 重置系统监控状态self.cpu_label.config(text="CPU信息:")self.cpu_progress.config(style="", value=0)self.mem_label.config(text="Memory Usage: 0 MB / 0 MB")self.mem_progress.config(style="", value=0)self.net_label.config(text="网络信息:")def show_disk_usage(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returntry:self.mssh.connecting()stdin, stdout, stderr = self.mssh.ssh.exec_command("df -h")disk_info = stdout.read().decode("utf-8").strip().split("\n")# 创建新窗口disk_window = tk.Toplevel(self.root)disk_window.title("磁盘使用情况")# 创建表格table_frame = ttk.Frame(disk_window)table_frame.pack(padx=10, pady=10, fill="both", expand=True)# 创建表格标题table = ttk.Treeview(table_frame, columns=("filesystem", "size", "used", "avail", "use%", "mounted_on"),show="headings")table.heading("filesystem", text="文件系统")table.heading("size", text="总大小")table.heading("used", text="已用空间")table.heading("avail", text="可用空间")table.heading("use%", text="已用百分比")table.heading("mounted_on", text="挂载点")table.pack(side="left", fill="both", expand=True)# 添加滚动条scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=table.yview)scrollbar.pack(side="right", fill="y")table.configure(yscrollcommand=scrollbar.set)# 填充表格数据for line in disk_info[1:]:parts = line.split()if len(parts) >= 6:filesystem, size, used, avail, use_percent, mounted_on = partstable.insert("", "end", values=(filesystem, size, used, avail, use_percent, mounted_on))# 调整列宽度table.column("#0", width=0, stretch="no")  # 隐藏第一列table.column("filesystem", width=100, anchor="w")table.column("size", width=80, anchor="e")table.column("used", width=80, anchor="e")table.column("avail", width=80, anchor="e")table.column("use%", width=80, anchor="e")table.column("mounted_on", width=200, anchor="w")# 自动调整行高def fixed_map(option):return [elm for elm in style.map("Treeview", query_opt=option) if elm[:2] != ("!disabled", "!selected")]style = ttk.Style()style.map("Treeview", foreground=fixed_map("foreground"), background=fixed_map("background"))except Exception as e:messagebox.showerror("错误", f"无法获取磁盘使用情况: {e}")

2.10 批量下载日志文件和查看日志功能设计与实现

(1)批量下载日志文件的实现

首先,检查是否已连接到远程服务器,然后设置远程目录为 /var/log。接下来,它通过SSH连接到服务器并打开SFTP客户端。然后,它检查远程目录是否存在,并获取远程目录中的文件列表。接着,它要求用户选择下载到的本地目录,并将选择的本地目录设置为类属性 local_log_dir。然后,它遍历远程目录中的文件,筛选以 .log 结尾的文件,并将这些文件下载到本地目录。最后,它关闭SFTP连接,并显示成功下载的文件数量。 代码展示如下:

 

def download_logs(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")return# 将远程目录设置为 /var/logremote_dir = "/var/log"try:self.mssh.connecting()sftp = self.mssh.ssh.open_sftp()# 检查远程目录是否存在try:sftp.stat(remote_dir)except IOError as e:messagebox.showerror("错误", f"远程目录 {remote_dir} 不存在")returnremote_files = sftp.listdir(remote_dir)local_dir = filedialog.askdirectory(title="选择下载到的本地目录")if local_dir:self.local_log_dir = local_direlse:returndownloaded_count = 0for filename in remote_files:# 检查文件是否以 ".log" 结尾if filename.endswith(".log"):remote_path = f"{remote_dir}/{filename}"local_path = f"{local_dir}/{filename}"sftp.get(remote_path, local_path)downloaded_count += 1print(f"下载成功: {remote_path} -> {local_path}")sftp.close()print(f"共下载了 {downloaded_count} 个文件")if downloaded_count > 0:messagebox.showinfo("成功", f"成功下载了 {downloaded_count} 个日志文件")except Exception as e:print(f"下载失败: {e}")messagebox.showerror("错误", f"下载日志文件失败: {e}")(2)查看日志功能的实现 先检查是否已连接到远程服务器并下载了日志文件。接着,它尝试从 local_log_dir 属性获取之前选择的本地目录,如果该属性不存在,则要求用户选择下载日志文件所在的目录。然后,它要求用户选择要查看的日志文件,并使用VSCode打开选定的日志文件。代码展示如下:def view_log(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接服务器并下载日志文件!")return# 从 download_logs 函数中获取之前选择的本地目录try:local_dir = self.local_log_direxcept AttributeError:local_dir = filedialog.askdirectory(title="选择下载日志文件所在的目录")if not local_dir:returnlog_file = filedialog.askopenfilename(initialdir=local_dir, title="选择要查看的日志文件",filetypes=(("Log Files", "*.log"),))if log_file:try:# 使用 VSCode 打开日志文件subprocess.run(['Code', log_file], shell=True)except Exception as e:messagebox.showerror("错误", f"无法打开文件: {e}")

 

 

2.11界面窗口的实现

创建一个具有菜单、标签、输入框和文本框的用户界面(UI),以及一个后台线程来更新系统信息。同时,通过不同的菜单项和按钮,用户可以执行连接、上传文件、下载文件、查看进程、扩展功能等操作。 代码展示如下:

def __init__(self, ssh):print("初始化")self.is_connected = Falseself.mssh = ssh# 主应用程序窗口self.root = tk.Tk()frame = ttk.Frame(self.root)self.root.geometry("800x550")# 设置窗口标题(可选)self.root.title("ssh 暗影坤手")# 使用 grid 布局self.root.grid_rowconfigure(0, weight=1)self.root.grid_columnconfigure(0, weight=1)# 布局,网格布局# 创建一个框架容器frame = ttk.Frame(self.root)frame.grid(sticky="nsew")# 添加菜单项   连接:新建连接  打开连接 操作:-- 帮助:--menu_top = tk.Menu(self.root)# 连接菜单menu_link = tk.Menu(menu_top)menu_top.add_cascade(label="连接", menu=menu_link)menu_link.add_command(label="新建连接", command=self.new_connect_ui)menu_link.add_command(label="打开连接", command=self.open_connect_ui)# 操作菜单menu_func = tk.Menu(menu_top)menu_top.add_cascade(label="操作", menu=menu_func)menu_func.add_command(label="上传文件", command=self.upload_file)menu_func.add_command(label="下载文件", command=self.downloader)menu_func.add_command(label="vim编辑文件", command=self.vim_edit)# 帮助菜单menu_help = tk.Menu(menu_top)menu_top.add_cascade(label="帮助", menu=menu_help)menu_help.add_command(label="查看进程", command=self.get_process)# menu_help.add_command(label="查看磁盘使用情况", command=self.view_disk_usage)# 扩展菜单menu_ext = tk.Menu(menu_top)menu_top.add_cascade(label="扩展", menu=menu_ext)menu_ext.add_command(label="批量下载日志文件", command=self.download_logs)menu_ext.add_command(label="查看日志文件", command=self.view_log)# 展示信息的labelself.label_server = ttk.Label(frame, text="服务器信息:")self.label_server.grid(row=0, column=0, sticky="w", padx=10, pady=5)# 输入指令框self.command = tk.StringVar()entry_command = ttk.Entry(frame, width=60, textvariable=self.command)entry_command.grid(row=1, column=0, sticky="ew", padx=10, pady=5)# 执行指令的按钮entry_command.bind("<Return>", self.ok)  # 当按下Enter键时执行命令# 展示结果self.txt_result = tk.Text(frame)self.txt_result.grid(row=2, column=0, sticky="nsew", padx=10, pady=5)self.root.config(menu=menu_top)# 文本框背景/前景颜色self.txt_result.config(bg="#262626", fg="#D9D9D9")self.root.config(menu=menu_top)# 导入 ttk 后style = ttk.Style()# 设置 ttk 主题style.theme_use("clam")# 定制按钮样式style.configure("TButton", foreground="white", background="#404040")style.map("TButton",foreground=[("active", " #CCCCCC"), ("disabled", "#666666")],background=[("active", "#404040"), ("disabled", "#262626")])# 让文本框自动扩展以填充剩余空间frame.grid_rowconfigure(2, weight=1)frame.grid_columnconfigure(0, weight=1)frame.grid_rowconfigure(3, weight=0)  # 信息监控面板不需要扩展# thread_info=threading.Thread(target=self.update_info)thread_info = threading.Thread(target=self.update_info, args=(frame,))thread_info.start()# 将主应用程序挂起self.root.protocol("WM_DELETE_WINDOW", self.on_closing_main)self.root.mainloop()

2.12 打包成可执行文件的实现(按需求来)

 PyInstaller将Python程序打包成独立的可执行文件,包括所有必要的依赖项。

在终端执行以下命令:

pip install pyinstaller

Pyinstaller -F -w -i 图标(.ico) -p D:\python\Lib\site-packages --onefile --add-data "server.json;./"  demotest.py

 

3. 系统完成效果

3.1 服务器信息结果展示

65f7cc8fe92e47c6b7656adc55486da2.png

 

3.2 连接服务器结果展示

6a830cf753d14aa8b1029c60dfcba59f.png

 

3.3 添加服务器信息结果展示

138186a1a9e044b2a4a3b6a39ad27ecc.png

07eb7678dae44b47b9432fb37081483d.png

 

3.4 删除服务器信息结果展示

c80e7774b33147118d67425770ec6096.png

7319d0d0aba24b63a08cf8e0c2692421.png

9ef7cfd698e84b8c8a67c78f9efe1f7b.png

 

3.5 修改服务器信息结果展示

26ddec0079804eee8099c6877082458d.png

019fcc0c4e6f4edd9c831a3f0788390b.png

 

3.6 执行交互式交互命令结果展示

59886604a93e43fc90a110d384432b89.png

 

3.7 vim编辑器结果展示

81bb7a35fa814f84bdc778a45fb273ac.png

defbcc46849b467780cb0070cd1ea5bb.png

02239e7b848a4fbd8be189e0f9884140.png

 

3.8 文件上传下载结果展示

(1)上传文件

841945507adc45c8b6979625cecc5e8b.png

e7c2f5d59a33480695d71dfd2ecd8d07.png

(2)下载文件

c01113059e0942dfb80447f9e4c8d1cf.png

7029850b283c4ec2be1ace73ad201c28.png

dfcbbd47486c4733ac966041e034ab61.png

 

3.9 服务器监控功能结果展示

(1)cpu、内存、网络信息展示

13d1ce9928dc44c9b9f37365e5ebd18a.png

(2)磁盘使用情况展示

20b730df478d46a58139ec91a3a5c453.png

(3)进程信息

5d4916b09f95449383a6e80bcd848dff.png

3.10 批量下载日志文件和查看日志文件结果展示

(1)批量下载文件展示

3a0f8ebb24e14fb7b3705c803a18739b.png

14bf906494ec4ec99caf3614e87623fe.png

(2)查看日志信息

80d692b7cea34daa91e114893a158fbb.png

 

3.11窗口界面结果展示

0cdb02bbdabe43369d038660361c632a.png

4. 最终代码

import json
import os.path
import re
import socket
import subprocess
import threading
import time
from tkinter import simpledialogimport paramikofrom watchdog.events import FileSystemEventHandler
from watchdog.observers import Observerclass MySsh:def __init__(self):# 实例化 ,ssh服务器的会话高级表现形式,把通道/传输类/sftp进行了封装self.ssh = paramiko.SSHClient()self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.path = "~"# 所有服务器的列表self.servers = []# 当前正在使用的服务器信息self.server = {}self.load_server()self.connected = []def load_server(self):with open('server.json', "r", encoding='utf-8') as fp:self.servers = json.load(fp)print(f"获取到的服务器信息:{self.servers}")def save_server(self):with open('server.json', "w", encoding='utf-8') as fp:fp.write(json.dumps(self.servers))def add_server(self, ip, port, username, password, description):self.servers.append({"description": description,"ip": ip,"port": port,"username": username,"password": password})self.save_server()# def find_server(self,ip):def connecting(self):self.connected = None# 将信任的主机自动添加到know_hosts文件(~/.ssh/)中self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)try:# 连接到服务器self.ssh.connect(hostname=self.server["ip"], port=self.server["port"], username=self.server["username"],password=self.server["password"])# messagebox.showinfo("成功", f"连接到服务器【{self.server['ip']}】成功!")print(f"连接到服务器【{self.server['ip']}】成功!")self.connected = Trueself.get_pwd()return Trueexcept Exception as e:messagebox.showerror("错误", f"连接到服务器【{self.server['ip']}】失败!")self.connected = Falsereturn Falsedef exec(self, command):if command.startswith("cd"):tmp_path = command.split(" ")[-1]if tmp_path.startswith("/"):# 绝对路径处理self.path = tmp_pathelse:# todo 如果用户输入的是一个相对路径呢?print("用户输入的是相对路径")if tmp_path.startswith(".."):self.path = self.path + "/" + tmp_pathelse:self.path = self.path + tmp_pathelse:# 根据当前的路径去执行指令command = f"cd {self.path} && {command}"print(f"->输入指令({self.path}):{command}")stdin, stdout, stderr = self.ssh.exec_command(command)result = stdout.read().decode("utf-8")print(f"<-返回:{result}")# 用户输入的路径里包含.. ,通过pwd刷新当前路径if command.find("..") > 0:self.get_pwd()return resultdef _read_channel(self, channel, output_callback):"""在独立线程中读取通道输出,并调用回调函数处理"""while True:if channel.recv_ready():data = channel.recv(1024)output_callback(data)def download_file(self):remote_file = tk.simpledialog.askstring("输入文件名", "请输入要下载的文件名:")if remote_file is None:  # 用户点击“取消”或关闭对话框returnelif not remote_file:  # 用户输入了空字符串messagebox.showerror("错误", "请输入文件名!")returnlocal_path = filedialog.askdirectory(title="选择下载到的本地目录")if local_path:try:sftp = self.ssh.open_sftp()local_file_path = os.path.join(local_path, remote_file)sftp.get(remote_file, local_file_path)sftp.close()messagebox.showinfo("下载成功", f"文件下载成功: {remote_file} -> {local_path}")except Exception as e:messagebox.showerror("下载失败", f"文件下载失败: {e}")def watch_and_upload(self, local_file_path, remote_full_path):class FileModifiedHandler(FileSystemEventHandler):def __init__(self, ssh_handler, remote_full_path):self.ssh_handler = ssh_handlerself.remote_full_path = remote_full_pathdef on_modified(self, event):if event.is_directory or not event.src_path == local_file_path:return# messagebox.showinfo("文件被修改", f"文件被修改:{event.src_path}")try:# self.ssh_handler.connecting()  # 重新连接SSH(如果需要)sftp = self.ssh_handler.ssh.open_sftp()sftp.put(local_file_path, self.remote_full_path)sftp.close()messagebox.showinfo("文件上传成功", f"vim修改成功: {local_file_path} -> {self.remote_full_path}")except Exception as e:messagebox.showerror("文件上传失败", f"文件上传失败: {e}")event_handler = FileModifiedHandler(self, remote_full_path)observer = Observer()observer.schedule(event_handler, path=os.path.dirname(local_file_path), recursive=False)observer.start()try:# 等待用户操作或其他信号来停止监视# 这里使用time.sleep作为示例,但建议使用更好的机制,如事件或信号while True:time.sleep(1)except KeyboardInterrupt:observer.stop()observer.join()def vim_edit(self):remote_file = simpledialog.askstring("vim", "请输入要编辑的完整远程文件路径:")if remote_file is None:  # 用户点击“取消”或关闭对话框returnelif not remote_file:  # 用户输入了空字符串messagebox.showerror("错误", "请输入文件路径!")return# 检查用户是否输入了以'/'开头的绝对路径if not remote_file.startswith('/'):messagebox.showerror("错误", "请输入绝对路径,例如:/home/user/file.txt")return# 临时文件目录tmp = filedialog.askdirectory(title="选择下载到的本地目录")if not tmp:  # 用户取消了选择目录returnlocal_file_path = os.path.join(tmp, os.path.basename(remote_file))try:self.connecting()sftp = self.ssh.open_sftp()# 检查远程文件是否存在try:sftp.stat(remote_file)except FileNotFoundError:# 如果不存在,则在远程服务器上创建空文件with sftp.open(remote_file, 'wb') as f:pass  # 写入空内容或者模板内容# 下载文件到本地sftp.get(remote_file, local_file_path)sftp.close()# 启动文件监视线程watch_thread = threading.Thread(target=self.watch_and_upload, args=(local_file_path, remote_file))watch_thread.start()# 打开文件编辑器subprocess.run(['Code', local_file_path], shell=True)except Exception as e:messagebox.showerror("文件下载失败", f"文件下载失败:{e}")print(f"文件下载失败:{e}")# 使用pwd更新当前路径def get_pwd(self):stdin, stdout, stderr = self.ssh.exec_command(f"cd {self.path} && pwd")result = stdout.read().decode("utf-8")self.path = result.strip()import tkinter as tk
from tkinter import ttk, filedialog
from tkinter import messageboxclass MyUI:def __init__(self, ssh):print("初始化")self.is_connected = Falseself.mssh = sshself.mssh_channel = None# 主应用程序窗口self.root = tk.Tk()frame = ttk.Frame(self.root)self.root.geometry("800x550")# 设置窗口标题(可选)self.root.title("暗影坤手")# 使用 grid 布局self.root.grid_rowconfigure(0, weight=1)self.root.grid_columnconfigure(0, weight=1)# 布局,网格布局# 创建一个框架容器frame = ttk.Frame(self.root)frame.grid(sticky="nsew")# 添加菜单项   连接:新建连接  打开连接 操作:-- 帮助:--menu_top = tk.Menu(self.root)# 连接菜单menu_link = tk.Menu(menu_top)menu_top.add_cascade(label="连接", menu=menu_link)menu_link.add_command(label="新建连接", command=self.new_connect_ui)menu_link.add_command(label="打开连接", command=self.open_connect_ui)# 操作菜单menu_func = tk.Menu(menu_top)menu_top.add_cascade(label="操作", menu=menu_func)menu_func.add_command(label="上传文件", command=self.upload_file)menu_func.add_command(label="下载文件", command=self.downloader)menu_func.add_command(label="vim编辑文件", command=self.vim_edit)# 帮助菜单menu_help = tk.Menu(menu_top)menu_top.add_cascade(label="帮助", menu=menu_help)menu_help.add_command(label="查看进程", command=self.get_process)# menu_help.add_command(label="查看磁盘使用情况", command=self.view_disk_usage)# 扩展菜单menu_ext = tk.Menu(menu_top)menu_top.add_cascade(label="扩展", menu=menu_ext)menu_ext.add_command(label="批量下载日志文件", command=self.download_logs)menu_ext.add_command(label="查看日志文件", command=self.view_log)# 展示信息的labelself.label_server = ttk.Label(frame, text="服务器信息:")self.label_server.grid(row=0, column=0, sticky="w", padx=10, pady=5)# 输入指令框self.command = tk.StringVar()entry_command = ttk.Entry(frame, width=60, textvariable=self.command)entry_command.grid(row=1, column=0, sticky="ew", padx=10, pady=5)# 执行指令的按钮entry_command.bind("<Return>", self.ok)  # 当按下Enter键时执行命令entry_command.bind("<Control-c>", self.okk)# 展示结果self.txt_result = tk.Text(frame)self.txt_result.grid(row=2, column=0, sticky="nsew", padx=10, pady=5)self.root.config(menu=menu_top)# 文本框背景/前景颜色self.txt_result.config(bg="#262626", fg="#D9D9D9")self.root.config(menu=menu_top)# 导入 ttk 后style = ttk.Style()# 设置 ttk 主题style.theme_use("clam")# 定制按钮样式style.configure("TButton", foreground="white", background="#404040")style.map("TButton",foreground=[("active", " #CCCCCC"), ("disabled", "#666666")],background=[("active", "#404040"), ("disabled", "#262626")])# 让文本框自动扩展以填充剩余空间frame.grid_rowconfigure(2, weight=1)frame.grid_columnconfigure(0, weight=1)frame.grid_rowconfigure(3, weight=0)  # 信息监控面板不需要扩展# thread_info=threading.Thread(target=self.update_info)thread_info = threading.Thread(target=self.update_info, args=(frame,))thread_info.start()# 将主应用程序挂起self.root.protocol("WM_DELETE_WINDOW", self.on_closing_main)self.root.mainloop()def okk(self, event):self.command.set('q')self.ok(self)def handle_output(self, data):# 先解码数据decoded_data = data.decode("utf-8")# 去除颜色转义序列cleaned_data = re.sub(r'\x1B\[([0-?]*[ -/]*[@-~])', '', decoded_data)# 插入到文本框的末尾self.txt_result.insert(tk.END, cleaned_data)# 确保滚动条跟随到最后self.txt_result.see(tk.END)# ------------------------------------------帮助---------------------------------------------------def get_process(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.pop_win = tk.Toplevel(self.root)self.pop_win.title("进程")self.pop_win.geometry("800x600+600+250")table_frame = ttk.Frame(self.pop_win)table_frame.pack(padx=10, pady=10, fill="both", expand=True)server = self.mssh.serverip = server['ip']username = server['username']password = server['password']try:client = paramiko.SSHClient()client.set_missing_host_key_policy(paramiko.AutoAddPolicy())client.connect(ip, 22, username, password)stdin, stdout, stderr = client.exec_command("ps -ef")result = stdout.read().decode("utf-8").strip().split("\n")# 创建表格table = ttk.Treeview(table_frame, columns=("user", "pid", "cpu", "mem", "vsz", "rss", "tty", "stat", "start", "time", "command"), show="headings")table.heading("user", text="用户")table.heading("pid", text="进程ID")table.heading("cpu", text="CPU%")table.heading("mem", text="内存%")table.heading("vsz", text="虚拟内存")table.heading("rss", text="常驻内存")table.heading("tty", text="终端")table.heading("stat", text="状态")table.heading("start", text="启动时间")table.heading("time", text="CPU时间")table.heading("command", text="命令")table.pack(side="left", fill="both", expand=True)# 添加滚动条scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=table.yview)scrollbar.pack(side="right", fill="y")table.configure(yscrollcommand=scrollbar.set)# 填充表格数据for line in result[1:]:parts = line.split()if len(parts) >= 10:user, pid, cpu, mem, vsz, rss, tty, stat, start, time, *command = partscommand = " ".join(command)table.insert("", "end", values=(user, pid, cpu, mem, vsz, rss, tty, stat, start, time, command))# 调整列宽度table.column("#0", width=0, stretch="no")  # 隐藏第一列table.column("user", width=80, anchor="w")table.column("pid", width=60, anchor="e")table.column("cpu", width=60, anchor="e")table.column("mem", width=60, anchor="e")table.column("vsz", width=100, anchor="e")table.column("rss", width=100, anchor="e")table.column("tty", width=60, anchor="w")table.column("stat", width=60, anchor="w")table.column("start", width=120, anchor="w")table.column("time", width=80, anchor="e")table.column("command", width=300, anchor="w")# 自动调整行高def fixed_map(option):return [elm for elm in style.map("Treeview", query_opt=option) if elm[:2] != ("!disabled", "!selected")]style = ttk.Style()style.map("Treeview", foreground=fixed_map("foreground"), background=fixed_map("background"))except paramiko.AuthenticationException:messagebox.showerror("错误", "认证失败,请验证您的凭据")except paramiko.SSHException as sshException:messagebox.showerror("错误", f"无法建立SSH连接: {sshException}")except paramiko.BadHostKeyException as badHostKeyException:messagebox.showerror("错误", f"无法验证服务器的主机密钥: {badHostKeyException}")except Exception as e:messagebox.showerror("错误", f"发生错误: {e}")finally:if 'client' in locals() and client:client.close()  # 确保连接被关闭# 当窗口关闭时,确保没有遗留的引用或连接self.pop_win.protocol("WM_DELETE_WINDOW", self.on_closing)def create_widgets(self, frame):self.cpu_label = ttk.Label(frame, text="CPU信息:")self.cpu_label.grid()self.cpu_progress = ttk.Progressbar(frame, orient='horizontal', length=200, mode='determinate')self.cpu_progress.grid(pady=(0, 10))self.mem_label = ttk.Label(frame, text="Memory Usage: 0 MB / 0 MB")self.mem_label.grid()self.mem_progress = ttk.Progressbar(frame, orient='horizontal', length=200, mode='determinate')self.mem_progress.grid(pady=(0, 5))self.net_label = ttk.Label(frame, text="网络信息:")self.net_label.grid(pady=5)self.disk_button = tk.Button(frame, text="查看磁盘使用情况", command=self.show_disk_usage)self.disk_button.grid(pady=(5, 5))def update_info(self, frame):self.create_widgets(frame)  # 创建标签# 定义进度条样式style = ttk.Style()style.theme_use("default")style.configure("green.Horizontal.TProgressbar", foreground='green', background='green')style.configure("yellow.Horizontal.TProgressbar", foreground='yellow', background='yellow')style.configure("red.Horizontal.TProgressbar", foreground='red', background='red')prev_connected = self.mssh.connected  # 记录上一次的连接状态while True:if "ip" in self.mssh.server:if self.mssh.connected == None:result_connect = (f"服务器{self.mssh.server['ip']}连接中", "red")self.reset_system_monitoring()  # 重置系统监控状态elif self.mssh.connected:result_connect = (f"服务器{self.mssh.server['ip']}连接成功", "green")# -----------------------------系统信息------------------------------------------------server = self.mssh.serverip = server['ip']username = server['username']password = server['password']self.client = paramiko.SSHClient()self.client.set_missing_host_key_policy(paramiko.AutoAddPolicy())self.client.connect(ip, 22, username, password)# 执行远程命令获取内存信息#是Paramiko库中SSHClient对象的一个方法,可用于在远程服务器上执行命令。# 它接受一个字符串参数作为要执行的命令,并返回输入、输出和错误流作为结果。stdin, stdout, stderr = self.client.exec_command("free -m")result = stdout.read().decode('utf-8')# 获取系统内存信息lines = result.split('\n')mem_info = lines[1].split()total_mem = int(mem_info[1])used_mem = int(mem_info[2])mem_usage = used_mem / total_mem * 100# 获取网络速率stdin1, stdout1, stderr1 = self.client.exec_command("ifconfig")result = stdout1.read().decode("utf-8")# 使用正则表达式提取数据包和字节数信息matches = re.findall(r'ens160.*?RX packets (\d+)  bytes (\d+).*?TX packets (\d+)  bytes (\d+)',result, re.DOTALL)if matches:rx_packets, rx_bytes, tx_packets, tx_bytes = matches[0]else:rx_packets, rx_bytes, tx_packets, tx_bytes = 0, 0, 0, 0time.sleep(1)stdin2, stdout2, stderr3 = self.client.exec_command("ifconfig")result = stdout2.read().decode("utf-8")matches = re.findall(r'ens160.*?RX packets (\d+)  bytes (\d+).*?TX packets (\d+)  bytes (\d+)',result, re.DOTALL)if matches:rx_packets, new_rx_bytes, tx_packets, new_tx_bytes = matches[0]else:rx_packets, new_rx_bytes, tx_packets, new_tx_bytes = 0, 0, 0, 0# 计算速率upload_speed = ((int(new_tx_bytes) - int(tx_bytes)) / 1024)  # MB/s,download_speed = ((int(new_rx_bytes) - int(rx_bytes)) / 1024)# 获取cpu利用率stdin4, stdout4, stderr4 = self.client.exec_command("top -bn 1")result = stdout4.read().decode("utf-8")cpu = 0for line in result.split("\n"):if line.startswith("%Cpu(s)"):cpu_us = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[0])[0]cpu_sy = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[1])[0]cpu_ni = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[2])[0]cpu_id = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[3])[0]cpu_wa = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[4])[0]cpu_hi = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[5])[0]cpu_si = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[6])[0]cpu_st = re.findall(r'\d+\.\d+', line.split(":")[1].split(",")[7])[0]sum = float(cpu_st) + float(cpu_si) + float(cpu_hi) + float(cpu_wa) + float(cpu_id) + float(cpu_ni) + float(cpu_sy) + float(cpu_us)cpu = sum - float(cpu_id)# 设置进度条样式if cpu > 80:cpu_style = "red.Horizontal.TProgressbar"elif cpu > 50:cpu_style = "yellow.Horizontal.TProgressbar"else:cpu_style = "green.Horizontal.TProgressbar"if mem_usage > 80:mem_style = "red.Horizontal.TProgressbar"elif mem_usage > 50:mem_style = "yellow.Horizontal.TProgressbar"else:mem_style = "green.Horizontal.TProgressbar"# 更新标签self.cpu_label.config(text=f"CPU信息:{cpu:.2f}%")self.cpu_progress.config(style=cpu_style, value=cpu)self.mem_label.config(text=f"Memory Usage: {used_mem} MB / {total_mem} MB ({mem_usage:.2f}%)")self.mem_progress.config(style=mem_style, value=mem_usage)self.net_label.config(text=f"网络信息:↑{upload_speed:.2f}MB/s \t ↓{download_speed:.2f}MB/s")else:result_connect = (f"服务器{self.mssh.server['ip']}连接失败", "red")self.reset_system_monitoring()  # 重置系统监控状态else:result_connect = (f"服务器未选择", "black")self.reset_system_monitoring()  # 重置系统监控状态# 检查连接状态是否发生变化if self.mssh.connected != prev_connected:self.reset_system_monitoring()  # 重置系统监控状态prev_connected = self.mssh.connectedself.label_server.config(text=f"{result_connect[0]}", foreground=result_connect[1])time.sleep(1)def reset_system_monitoring(self):# 重置系统监控状态self.cpu_label.config(text="CPU信息:")self.cpu_progress.config(style="", value=0)self.mem_label.config(text="Memory Usage: 0 MB / 0 MB")self.mem_progress.config(style="", value=0)self.net_label.config(text="网络信息:")def show_disk_usage(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returntry:self.mssh.connecting()stdin, stdout, stderr = self.mssh.ssh.exec_command("df -h")disk_info = stdout.read().decode("utf-8").strip().split("\n")# 创建新窗口disk_window = tk.Toplevel(self.root)disk_window.title("磁盘使用情况")# 创建表格table_frame = ttk.Frame(disk_window)table_frame.pack(padx=10, pady=10, fill="both", expand=True)# 创建表格标题table = ttk.Treeview(table_frame, columns=("filesystem", "size", "used", "avail", "use%", "mounted_on"),show="headings")table.heading("filesystem", text="文件系统")table.heading("size", text="总大小")table.heading("used", text="已用空间")table.heading("avail", text="可用空间")table.heading("use%", text="已用百分比")table.heading("mounted_on", text="挂载点")table.pack(side="left", fill="both", expand=True)# 添加滚动条scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=table.yview)scrollbar.pack(side="right", fill="y")table.configure(yscrollcommand=scrollbar.set)# 填充表格数据for line in disk_info[1:]:parts = line.split()if len(parts) >= 6:filesystem, size, used, avail, use_percent, mounted_on = partstable.insert("", "end", values=(filesystem, size, used, avail, use_percent, mounted_on))# 调整列宽度table.column("#0", width=0, stretch="no")  # 隐藏第一列table.column("filesystem", width=100, anchor="w")table.column("size", width=80, anchor="e")table.column("used", width=80, anchor="e")table.column("avail", width=80, anchor="e")table.column("use%", width=80, anchor="e")table.column("mounted_on", width=200, anchor="w")# 自动调整行高def fixed_map(option):return [elm for elm in style.map("Treeview", query_opt=option) if elm[:2] != ("!disabled", "!selected")]style = ttk.Style()style.map("Treeview", foreground=fixed_map("foreground"), background=fixed_map("background"))except Exception as e:messagebox.showerror("错误", f"无法获取磁盘使用情况: {e}")def ok(self, event):command = self.command.get()if not self.mssh.connected:self.txt_result.insert(tk.END, "未连接到服务器!\n")return# 如果ssh_channel尚未创建,创建一个新的if self.mssh_channel is None:self.mssh_channel = self.mssh.ssh.invoke_shell()self.mssh_channel.set_combine_stderr(True)try:# 使用ssh_channel发送命令self.mssh_channel.send(f"{command}\n")# 创建线程监听输出并调用UI提供的回调函数t = threading.Thread(target=self.mssh._read_channel, args=(self.mssh_channel, self.handle_output))t.daemon = Truet.start()# 设置延时清空输入框self.root.after(2000, lambda: self.command.set(""))except Exception as e:self.txt_result.insert(tk.END, f"执行错误: {str(e)}\n")# 清空输入框self.command.set("")# --------------------------------------------连接-----------------------------------------------# 新建连接def new_connect_ui(self):self.pop_win = tk.Toplevel(self.root)# 设置标题和大小self.pop_win.title("新建连接")self.pop_win.geometry("300x200+600+250")frame = tk.Frame(self.pop_win)frame.grid()# IPip_label = tk.Label(frame, text="服务器IP地址:")ip_label.grid(row=0, column=0)self.ip_entry = tk.Entry(frame)self.ip_entry.grid(row=0, column=1)# Portport_label = tk.Label(frame, text="服务器端口号:")port_label.grid(row=1, column=0)self.port_entry = tk.Entry(frame)self.port_entry.grid(row=1, column=1)# Usernameusername_label = tk.Label(frame, text="用户名:")username_label.grid(row=2, column=0)self.username_entry = tk.Entry(frame)self.username_entry.grid(row=2, column=1)# Passwordpassword_label = tk.Label(frame, text="密码:")password_label.grid(row=3, column=0)self.password_entry = tk.Entry(frame, show="*")self.password_entry.grid(row=3, column=1)# Descriptiondescription_label = tk.Label(frame, text="备注信息:")description_label.grid(row=4, column=0)self.description_entry = tk.Entry(frame)self.description_entry.grid(row=4, column=1)# Buttonsubmit_button = tk.Button(frame, text="保存", command=self.save_server_info)submit_button.grid(row=5, columnspan=2)# 保存新建连接的服务器信息def save_server_info(self):if self.ip_entry.get() == "" or self.port_entry.get() == "" or self.username_entry.get() == "" or self.password_entry.get() == "":messagebox.showerror("错误", "请填写完整的服务器信息!")returnelse:ip = self.ip_entry.get()port = self.port_entry.get()username = self.username_entry.get()password = self.password_entry.get()description = self.description_entry.get()# 这里应该调用你的 add_server 函数,将获取到的信息添加到服务器列表中ssh.add_server(ip=ip, port=int(port), username=username, password=password, description=description)messagebox.showinfo('成功', "服务器信息已保存")#  打开连接def open_connect_ui(self):print("打开连接!")self.pop_win = tk.Toplevel(self.root)# 设置标题和大小self.pop_win.title("打开连接")self.pop_win.geometry("+480+230")frame = tk.Frame(self.pop_win)frame.grid()# 初始化索引变量 index 为 0,用于控制部件的位置index = 0# 遍历服务器列表 self.mssh.servers 中的每个服务器信息for item in self.mssh.servers:# 删除列表信息中的password(这里假设item是一个字典)#用于遍历 item.items() 中的键值对,并使用它们创建一个新的字典,# 创建一个新的字典其中排除了原始字典item中键为'password'的键值对。item_without_password = {k: v for k, v in item.items() if k != 'password'}# 显示不包含密码的服务器信息# 创建一个标签,显示当前服务器的信息,并将其放置在连接界面中的指定位置# padx=5 和 pady=5 是用来设置周围的水平和垂直填充# 在部件的左右/上下两侧各增加 5 个像素的水平填充。tk.Label(self.pop_win, text=str(item_without_password)).grid(row=index, column=0, padx=5, pady=5)# 添加连接、删除、修改按钮...# lambda 表达式用于创建一个匿名函数# command=lambda item=item# 将当前循环中的 item 值作为参数传递给 lambda 表达式,这样在按钮被点击时# lambda 表达式将会调用相应的方法,并将当前循环中的 item 值作为参数传递给这个方法# 使用 lambda item=item 的形式可以确保每个按钮的回调函数都引用了当前循环中的正确值tk.Button(self.pop_win, text="连接", command=lambda item=item: self.connect(item)).grid(row=index, column=2)tk.Button(self.pop_win, text='删除', command=lambda item=item: self.del_connect(item)).grid(row=index,column=4)tk.Button(self.pop_win, text='修改', command=lambda item=item: self.modify_server(item)).grid(row=index,column=6)# 更新索引变量 index,以便将下一个服务器的部件放置在下一行index = index + 1# 连接到指定的服务器def connect(self, server):print("连接到指定的服务器")# 将 self.mssh 对象的 server 属性设置为传入的 server 参数,以便记录当前连接的服务器信息。self.mssh.server = serverprint(f"服务器信息:{self.mssh.server}")#如果在连接到远程服务器时遇到超时,系统将等待 5 秒,然后会引发socket.timeout异常。socket.setdefaulttimeout(5)# 在单独的线程中执行连接操# 在单独的线程中执行连接操作# 创建一个新的线程 thread_connect,目标是调用 self.connect_thread 方法,并传入 server 参数。# 然后启动该线程,以便在单独的线程中执行连接操作,以避免阻塞主线程。thread_connect = threading.Thread(target=self.connect_thread, args=(server,))thread_connect.start()# 销毁打开连接窗口,以便在连接操作进行时,不再显示连接界面self.pop_win.destroy()def connect_thread(self, server):# 调用 self.mssh 对象的 connecting 方法,该方法用于执行连接操作,并将连接结果存储在 result 变量中result = self.mssh.connecting()if result:# 将对象的 is_connected 属性设置为 True,表示连接状态为已连接self.is_connected = True# 调用 update_connection_status 方法,将连接状态更新为已连接,并传入当前连接的服务器和连接状态self.update_connection_status(server, True)else:self.is_connected = Falseself.update_connection_status(server, False)def update_connection_status(self, server, connected):if connected:# 如果连接成功,通过 self.label_server 的 config 方法更新标签的文本内容为连接成功的消息# 使用了格式化字符串来显示服务器的 IP 地址,并设置文本颜色为绿色self.label_server.config(text=f"服务器{server['ip']}连接成功", foreground="green")else:self.label_server.config(text=f"服务器{server['ip']}连接失败", foreground="red")def del_connect(self, server):print("删除指定的服务器")# 输出服务器信息# 输出要删除的服务器信息,包括服务器的详细信息。print(f"服务器信息:{server}")# 确认是否删除服务器confirm = messagebox.askyesno("确认删除", f"确认删除服务器 {server['ip']} 吗?")if confirm:# 从服务器列表中删除指定的服务器# # 检查要删除的服务器是否存在于服务器列表中。if server in self.mssh.servers:self.mssh.servers.remove(server)self.pop_win.destroy()self.open_connect_ui()messagebox.showinfo("删除成功", f"服务器 {server['ip']} 已成功删除!")self.mssh.save_server()else:messagebox.showerror("错误", "服务器不存在,无法删除")# 保存更新后的服务器列表到配置文件中def modify_server(self, server):self.pop_win.destroy()print("修改服务器信息")# 将传入的服务器信息进行复制并存储在 self.mssh.server 中。self.mssh.server = server.copy()  # 添加这一行来复制服务器信息self.pop_win_modify = tk.Toplevel(self.root)frame = tk.Frame(self.pop_win_modify)frame.grid()# IPip_label = tk.Label(frame, text="服务器IP地址:")ip_label.grid(row=0, column=0)# 创建了一个文本框(Entry),并将其父级设置为 frame 框架。此文本框用于接收用户输入的 IP 地址。# textvariable 参数是用来关联文本框的值的变量,# 这里使用了 tk.StringVar 类来创建一个与文本框关联的字符串变量,并将该变量的初始值设置为 server['ip'],即当前服务器的IP地址self.ip_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['ip']))self.ip_entry.grid(row=0, column=1)# Portport_label = tk.Label(frame, text="服务器端口号:")port_label.grid(row=1, column=0)self.port_entry = tk.Entry(frame, textvariable=tk.StringVar(value=str(server['port'])))self.port_entry.grid(row=1, column=1)# Usernameusername_label = tk.Label(frame, text="用户名:")username_label.grid(row=2, column=0)# 创建了一个文本框(Entry),并将其父级设置为 frame 框架。此文本框用于接收用户输入的 username。# textvariable 参数是用来关联文本框的值的变量,# 这里使用了 tk.StringVar 类来创建一个与文本框关联的字符串变量,并将该变量的初始值设置为 server['username'],即当前服务器的用户名self.username_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['username']))self.username_entry.grid(row=2, column=1)# Passwordpassword_label = tk.Label(frame, text="密码:")password_label.grid(row=3, column=0)self.password_entry = tk.Entry(frame, show="*", textvariable=tk.StringVar(value=server['password']))self.password_entry.grid(row=3, column=1)# Descriptiondescription_label = tk.Label(frame, text="备注信息:")description_label.grid(row=4, column=0)# 创建了一个文本框(Entry),并将其父级设置为 frame 框架。此文本框用于接收用户输入的 备注信息。# textvariable 参数是用来关联文本框的值的变量,# 这里使用了 tk.StringVar 类来创建一个与文本框关联的字符串变量,并将该变量的初始值设置为 server['description'],即当前服务器的备注信息self.description_entry = tk.Entry(frame, textvariable=tk.StringVar(value=server['description']))self.description_entry.grid(row=4, column=1)# Buttonsubmit_button = tk.Button(frame, text="保存", command=self.save_modified_server)submit_button.grid(row=5, columnspan=2)# 保存修改后的服务器信息def save_modified_server(self):if self.ip_entry.get() == "" or self.port_entry.get() == "" or self.username_entry.get() == "" or self.password_entry.get() == "":messagebox.showerror("错误", "请填写完整的服务器信息!")returnelse:modified_server = {"ip": self.ip_entry.get(),"port": int(self.port_entry.get()),"username": self.username_entry.get(),"password": self.password_entry.get(),  # 假设密码不变,保持原值"description": self.description_entry.get(),}# 找到了待修改服务器在服务器列表中的索引# 使用了 index 方法来查找 self.mssh.server 在 self.mssh.servers 中的位置。index = self.mssh.servers.index(self.mssh.server)# 将修改后的服务器信息替换掉原来的服务器信息,以完成对服务器信息的更新。self.mssh.servers[index] = modified_server# 弹出一个消息框,显示成功修改服务器信息的提示。messagebox.showinfo("成功", "服务器信息已成功修改!")# 关闭了修改服务器信息的窗口self.pop_win_modify.destroy()# 调用了 open_connect_ui 方法,打开连接用户界面,让用户可以选择连接到修改后的服务器。self.open_connect_ui()# 调用了 save_server 方法,用于保存修改后的服务器信息到文件中,以便下次程序运行时能够读取到最新的服务器信息。self.mssh.save_server()##################################  操作  ################################################## 上传文件def upload_file(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnelse:self.file_path = filedialog.askopenfilename()try:if self.file_path:# 获取文件名file_name = self.file_path.split("/")[-1]# 创建 SFTP 客户端sftp_client = self.mssh.ssh.open_sftp()remote_path = f"{self.mssh.path}/{file_name}"print(f"文件:{self.file_path},上传到:{remote_path}")# 上传文件sftp_client.put(self.file_path, remote_path)messagebox.showinfo("成功", "上传成功")sftp_client.close()else:print("请选择要上传的文件")except Exception as e:messagebox.showerror("错误", f"文件上传失败: {e}")# #下载文件def downloader(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.mssh.download_file()def vim_edit(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")returnself.mssh.vim_edit()def download_logs(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接到服务器!")return# 将远程目录设置为 /var/logremote_dir = "/var/log"try:self.mssh.connecting()sftp = self.mssh.ssh.open_sftp()# 检查远程目录是否存在try:sftp.stat(remote_dir)except IOError as e:messagebox.showerror("错误", f"远程目录 {remote_dir} 不存在")returnremote_files = sftp.listdir(remote_dir)local_dir = filedialog.askdirectory(title="选择下载到的本地目录")if local_dir:self.local_log_dir = local_direlse:returndownloaded_count = 0for filename in remote_files:# 检查文件是否以 ".log" 结尾if filename.endswith(".log"):remote_path = f"{remote_dir}/{filename}"local_path = f"{local_dir}/{filename}"sftp.get(remote_path, local_path)downloaded_count += 1print(f"下载成功: {remote_path} -> {local_path}")sftp.close()print(f"共下载了 {downloaded_count} 个文件")if downloaded_count > 0:messagebox.showinfo("成功", f"成功下载了 {downloaded_count} 个日志文件")except Exception as e:print(f"下载失败: {e}")messagebox.showerror("错误", f"下载日志文件失败: {e}")def view_log(self):if not self.mssh.connected:messagebox.showerror("错误", "请先连接服务器并下载日志文件!")return# 从 download_logs 函数中获取之前选择的本地目录try:local_dir = self.local_log_direxcept AttributeError:local_dir = filedialog.askdirectory(title="选择下载日志文件所在的目录")if not local_dir:returnlog_file = filedialog.askopenfilename(initialdir=local_dir, title="选择要查看的日志文件",filetypes=(("Log Files", "*.log"),))if log_file:try:# 使用 VSCode 打开日志文件subprocess.run(['Code', log_file], shell=True)except Exception as e:messagebox.showerror("错误", f"无法打开文件: {e}")def on_closing_main(self):if messagebox.askokcancel("Quit", "确认退出吗?"):self.root.destroy()def on_closing(self):if messagebox.askokcancel("Quit", "确认退出吗?"):self.pop_win.destroy()if __name__ == '__main__':ssh = MySsh()ui = MyUI(ssh)

5. 总结与分析

        使用 Python 的 Tkinter 模块编写 SSH 工具需要综合考虑界面设计、SSH 连接的实现、用户体验和安全等方面。通过 Tkinter 提供的丰富小部件,可以创建直观且易于操作的界面,让用户输入主机名、用户名、密码等信息,并提供连接按钮来触发 SSH 连接。同时,使用 Paramiko 或其他类似的库处理 SSH 连接和命令执行的细节,考虑使用异步处理以避免界面冻结,同时要注意处理连接错误和认证失败等异常情况,可以使用多线程来解决。在设计中需确保安全处理用户凭据和敏感信息,避免在界面中明文显示密码。最终,在测试和调试过程中不断完善功能,确保工具的稳定性和可靠性。通过充分利用 Python 和 Tkinter 的功能,可以创建一个功能强大且易于使用的 SSH 工具。

 

注:此项目是大学期间与搭子 霍格沃茨的纸飞鹤-CSDN博客 共同完成的,现在分享给大家参考。

 

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 如何建设和维护数据仓库:深入指南
  • 开源的语音合成工具_ChatTTS_用法及资源
  • 农场驿站平台小程序的设计
  • 概率论原理精解【4】
  • 微信小程序数组绑定使用案例(一)
  • 会Excel就会sql?
  • 【操作系统】定时器(Timer)的实现
  • PY32F002B单片机 ISP 串口下载注意事项
  • 基于GTX的64B66B编码的自定义接收模块(高速收发器二十二)
  • 简化Android数据管理:深入探索SQLite数据库
  • esp8266模块(1)
  • 冒泡,选择,插入,希尔排序
  • 分布式锁的使用场景是什么?有哪些实现方法?
  • Qt创建列表,通过外部按钮控制列表的选中下移、上移以及左侧图标的显现
  • VSCODE 下 openocd Jlink 的配置笔记
  • JS 中的深拷贝与浅拷贝
  • Create React App 使用
  • CSS实用技巧干货
  • Django 博客开发教程 16 - 统计文章阅读量
  • el-input获取焦点 input输入框为空时高亮 el-input值非法时
  • Hibernate【inverse和cascade属性】知识要点
  • Linux各目录及每个目录的详细介绍
  • pdf文件如何在线转换为jpg图片
  • PermissionScope Swift4 兼容问题
  • PHP的类修饰符与访问修饰符
  • RedisSerializer之JdkSerializationRedisSerializer分析
  • Sublime text 3 3103 注册码
  • windows下mongoDB的环境配置
  • 从0搭建SpringBoot的HelloWorld -- Java版本
  • 第13期 DApp 榜单 :来,吃我这波安利
  • 关于List、List?、ListObject的区别
  • 力扣(LeetCode)56
  • 猫头鹰的深夜翻译:JDK9 NotNullOrElse方法
  • 前端每日实战 2018 年 7 月份项目汇总(共 29 个项目)
  • 前端自动化解决方案
  • 如何设计一个微型分布式架构?
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 问题之ssh中Host key verification failed的解决
  • Java性能优化之JVM GC(垃圾回收机制)
  • Nginx惊现漏洞 百万网站面临“拖库”风险
  • 翻译 | The Principles of OOD 面向对象设计原则
  • #laravel部署安装报错loadFactoriesFrom是undefined method #
  • #我与Java虚拟机的故事#连载02:“小蓝”陪伴的日日夜夜
  • #周末课堂# 【Linux + JVM + Mysql高级性能优化班】(火热报名中~~~)
  • (1)bark-ml
  • (2/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (8)STL算法之替换
  • (C++哈希表01)
  • (day6) 319. 灯泡开关
  • (Redis使用系列) SpringBoot 中对应2.0.x版本的Redis配置 一
  • (阿里云万网)-域名注册购买实名流程
  • (附源码)springboot学生选课系统 毕业设计 612555
  • (一)ClickHouse 中的 `MaterializedMySQL` 数据库引擎的使用方法、设置、特性和限制。
  • (转载)跟我一起学习VIM - The Life Changing Editor
  • .net core webapi Startup 注入ConfigurePrimaryHttpMessageHandler