
【Python-MySQL】Managing MySQL connections with a pool in Python, with CRUD and automatic backup

  • Notes
  • Code
  • Code Backup

Notes

This code targets Python 3 and requires pymysql and dbutils to be installed beforehand.
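Besides pymysql and dbutils (published as DBUtils on PyPI), the code below also imports schedule, tenacity, mysql-connector-python (used by the backup variant), and loguru via the project-local utils.loguru_logger and utils.tools helpers, which are part of this project rather than PyPI packages. Assuming a standard pip environment, the third-party dependencies can be installed with:

pip install pymysql DBUtils schedule tenacity mysql-connector-python loguru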

Code

import os
import time
import schedule
import subprocess
import pymysql
from dbutils.pooled_db import PooledDB
from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_fixed
from utils.loguru_logger import get_logger
from utils.tools import get_container_ip, load_config

logger = get_logger(__name__)

# config_file = "basic_config.json"  # for production
config_file = "basic_config_dev.json"  # for local development
logger.debug(f"Current config file: {config_file}")
config = load_config(config_file)
container_name = config["mysql_container_name"]

# host = 'mysql_server'  # for production: MySQL container name under docker-compose
host = get_container_ip(container_name)  # for local development: MySQL container IP

mysqlInfo = {
    "host": host,
    "user": 'root',
    "passwd": '12138',
    "db": 'database',
    "port": 3306,
    "charset": 'utf8'
}


class SQL:
    __pool = None

    def __init__(self):
        self.conn = SQL.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    @staticmethod
    def getmysqlconn():
        # Create the pool once as a class attribute and reuse it for every SQL instance.
        if SQL.__pool is None:
            SQL.__pool = PooledDB(
                creator=pymysql,
                mincached=2,        # idle connections opened at startup
                maxcached=5,        # max idle connections kept in the pool
                maxconnections=6,   # hard cap on simultaneous connections
                maxshared=3,
                blocking=True,      # wait instead of raising when the pool is exhausted
                maxusage=None,
                setsession=[],
                ping=0,
                host=mysqlInfo['host'],
                user=mysqlInfo['user'],
                passwd=mysqlInfo['passwd'],
                db=mysqlInfo['db'],
                port=mysqlInfo['port'],
                charset=mysqlInfo['charset'])
        return SQL.__pool.connection()

    def reconnect(self):
        """Get a fresh connection (and cursor) from the pool."""
        self.conn = SQL.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def release(self):
        # Close the cursor first, then return the connection to the pool.
        self.cursor.close()
        self.conn.close()

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def execute(self, query, params=None):
        """Execute a query, reconnecting through the pool if the connection was lost.

        Args:
            query (str): SQL statement to execute.
            params (tuple | dict, optional): bind parameters. Defaults to None.
        """
        try:
            if self.conn is None or not self.conn.open:
                self.reconnect()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except pymysql.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.reconnect()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except pymysql.MySQLError as e:
            logger.error(f"Error executing query: {e}")
            raise e

    def fetchall(self):
        try:
            return self.cursor.fetchall()
        except pymysql.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.reconnect()
            return self.cursor.fetchall()
        except pymysql.MySQLError as e:
            logger.error(f"Error fetching results: {e}")
            raise e

    def check_duplicate(self, database, table, condition):
        query = f"SELECT COUNT(*) FROM `{database}`.`{table}` WHERE {condition}"
        logger.info(f"Check duplicate query: {query}")
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def insert_data(self, database, table, insert_dict):
        if not database or not table or not insert_dict:
            return False
        columns_str = ", ".join([f"`{column}`" for column in insert_dict.keys()])
        values_str = ", ".join([f"{value}" for value in insert_dict.values()])
        query = f"INSERT INTO `{database}`.`{table}` ({columns_str}) VALUES ({values_str});"
        logger.info(f"Insert query: {query}")
        self.cursor.execute(query)
        self.conn.commit()
        logger.trace(f"Rows affected: {self.cursor.rowcount}, last row id: {self.cursor.lastrowid}")
        if self.cursor.rowcount == 0:
            return False
        return self.cursor.rowcount, self.cursor.lastrowid

    def update_data(self, database, table, update_dict, condition=""):
        if not database or not table or not update_dict:
            return False
        set_values = ", ".join([f"`{column}` = {value}" for column, value in update_dict.items()])
        query = f"UPDATE `{database}`.`{table}` SET {set_values}"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Update query: {query}")
        self.cursor.execute(query)
        self.conn.commit()
        return self.cursor.rowcount

    def select_data(self, database, table, columns="*", condition=""):
        query = f"SELECT {columns} FROM `{database}`.`{table}`"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Search query: {query}")
        self.cursor.execute(query)
        rows = self.fetchall()
        logger.info(f"Selected data from '{table}': {rows}")
        # With DictCursor each row is already a dict, so there is no need to
        # zip column names with row tuples by hand.
        return rows

    def delete_data(self, table, condition=""):
        query = f"DELETE FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        self.cursor.execute(query)
        self.conn.commit()

    def backup_database(self):
        backup_dir = config["mysql_storage"]
        date = datetime.now().strftime('%Y-%m')  # e.g. 2024-06: one backup file per month
        backup_file = f'{backup_dir}/astri_backup_{date}.sql'
        # make sure the backup directory exists
        os.makedirs(backup_dir, exist_ok=True)
        # run mysqldump
        command = (f"mysqldump -h {mysqlInfo['host']} -P {mysqlInfo['port']} -u {mysqlInfo['user']} "
                   f"--password={mysqlInfo['passwd']} {mysqlInfo['db']}")
        try:
            with open(backup_file, 'w') as f:
                result = subprocess.run(command, shell=True, check=True, stdout=f, stderr=subprocess.PIPE)
            logger.info(f"Backup successful. Result: {result}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error occurred: {e.stderr.decode().strip()}")
        # optional - remove backups older than 30 days
        old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql", "-mtime", "+30", "-exec", "rm", "{}", ";"]  # for production
        # old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql", "-mtime", "+1", "-exec", "rm", "{}", ";"]  # for local dev
        subprocess.run(old_backups)

    def auto_backup(self):
        # schedule.every().day.at("00:00").do(self.backup_database)
        schedule.every().day.do(self.backup_database)
        while True:
            schedule.run_pending()
            logger.info("Waiting for next backup schedule...")
            time.sleep(10)
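For reference, a minimal usage sketch of the pooled class above. The database name, table name, and columns here are hypothetical placeholders, not part of the original code:

sql = SQL()  # borrows a connection from the shared pool
# string values must carry their own quotes, since insert_data interpolates values directly
sql.insert_data("database", "users", {"name": "'alice'", "age": 30})
rows = sql.select_data("database", "users", condition="age > 18")  # list of dicts via DictCursor
sql.release()  # return the connection to the pool

Note that insert_data, update_data, and the condition strings are interpolated directly into the SQL text, so they are only safe with trusted input; for anything user-supplied, prefer the parametrized execute(query, params) path.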

Code Backup

This earlier version connects with the native mysql.connector pooling instead of managing connections through DBUtils.

import os
import time
import schedule
import subprocess
from datetime import datetime
from mysql.connector import pooling, Error, errors
from tenacity import retry, stop_after_attempt, wait_fixed
from utils.loguru_logger import get_logger
from utils.tools import get_container_ip, load_config

logger = get_logger(__name__)

# config_file = "basic_config.json"  # for production
config_file = "basic_config_dev.json"  # for local development
logger.debug(f"Current config file: {config_file}")
config = load_config(config_file)
container_name = config["mysql_container_name"]


class SQL:
    def __init__(self):
        # self.host = 'mysql_server'  # for production: MySQL container name under docker-compose
        self.host = get_container_ip(container_name)  # for local development: MySQL container IP
        self.port = 3306
        self.user = 'root'
        self.password = '12138'
        self.database = 'database'
        self.pool = None
        self.cnx = None
        self.cursor = None
        self.create_pool()
        self.connect()
        logger.trace(f"SQL object created with host: {self.host}, port: {self.port}, user: {self.user}, database: {self.database}")

    def create_pool(self):
        """Create a connection pool to the MySQL server."""
        try:
            self.pool = pooling.MySQLConnectionPool(
                pool_name="pool",
                pool_size=5,
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database)
            logger.success("Connection pool created")
        except Error as err:
            logger.error(f"Error creating connection pool: {err}")
            self.pool = None

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def connect(self):
        """Connect to the MySQL server using the connection pool."""
        if self.pool is None:
            logger.error("Connection pool is not created")
            raise Exception("Connection pool is not created")
        try:
            self.cnx = self.pool.get_connection()
            if self.cnx.is_connected():
                self.cursor = self.cnx.cursor()
                logger.success("Connected to MySQL server using connection pool")
        except Error as err:
            logger.error(f"Error connecting to MySQL server: {err}")
            raise err

    def close(self):
        """Close the connection to the MySQL server."""
        if self.cursor:
            self.cursor.close()
        if self.cnx:
            self.cnx.close()
        logger.info("MySQL connection closed")

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def execute(self, query, params=None):
        """Execute a query, reconnecting through the pool if the connection was lost.

        Args:
            query (str): SQL statement to execute.
            params (tuple | dict, optional): bind parameters. Defaults to None.
        """
        try:
            if self.cnx is None or not self.cnx.is_connected():
                self.connect()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except errors.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.connect()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except Error as e:
            logger.error(f"Error executing query: {e}")
            raise e

    def fetchall(self):
        try:
            return self.cursor.fetchall()
        except errors.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.connect()
            return self.cursor.fetchall()
        except Error as e:
            logger.error(f"Error fetching results: {e}")
            raise e

    def check_duplicate(self, database, table, condition):
        query = f"SELECT COUNT(*) FROM `{database}`.`{table}` WHERE {condition}"
        logger.info(f"Check duplicate query: {query}")
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def insert_data(self, database, table, insert_dict):
        if not database or not table or not insert_dict:
            return False
        columns_str = ", ".join([f"`{column}`" for column in insert_dict.keys()])
        values_str = ", ".join([f"{value}" for value in insert_dict.values()])
        query = f"INSERT INTO `{database}`.`{table}` ({columns_str}) VALUES ({values_str});"
        logger.info(f"Insert query: {query}")
        self.cursor.execute(query)
        self.cnx.commit()
        logger.trace(f"Rows affected: {self.cursor.rowcount}, last row id: {self.cursor.lastrowid}")
        if self.cursor.rowcount == 0:
            return False
        return self.cursor.rowcount, self.cursor.lastrowid

    def update_data(self, database, table, update_dict, condition=""):
        if not database or not table or not update_dict:
            return False
        set_values = ", ".join([f"`{column}` = {value}" for column, value in update_dict.items()])
        query = f"UPDATE `{database}`.`{table}` SET {set_values}"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Update query: {query}")
        self.cursor.execute(query)
        self.cnx.commit()
        return self.cursor.rowcount

    def select_data(self, database, table, columns="*", condition=""):
        query = f"SELECT {columns} FROM `{database}`.`{table}`"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Search query: {query}")
        self.cursor.execute(query)
        # The default cursor returns tuples, so build dicts from the column names by hand.
        field_names = [i[0] for i in self.cursor.description]
        rows = self.fetchall()
        result = [dict(zip(field_names, row)) for row in rows]
        return result

    def delete_data(self, table, condition=""):
        query = f"DELETE FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        self.cursor.execute(query)
        self.cnx.commit()

    def backup_database(self):
        backup_dir = config["mysql_storage"]
        date = datetime.now().strftime('%Y-%m')  # e.g. 2024-06: one backup file per month
        backup_file = f'{backup_dir}/astri_backup_{date}.sql'
        # make sure the backup directory exists
        os.makedirs(backup_dir, exist_ok=True)
        # run mysqldump
        command = (f"mysqldump -h {self.host} -P {self.port} -u {self.user} "
                   f"--password={self.password} {self.database}")
        try:
            with open(backup_file, 'w') as f:
                result = subprocess.run(command, shell=True, check=True, stdout=f, stderr=subprocess.PIPE)
            logger.info(f"Backup successful. Result: {result}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error occurred: {e.stderr.decode().strip()}")
        # optional - remove backups older than 30 days
        old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql", "-mtime", "+30", "-exec", "rm", "{}", ";"]  # for production
        # old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql", "-mtime", "+1", "-exec", "rm", "{}", ";"]  # for local dev
        subprocess.run(old_backups)

    def auto_backup(self):
        # schedule.every().day.at("00:00").do(self.backup_database)
        # schedule.every().day.do(self.backup_database)
        schedule.every().hour.do(self.backup_database)  # dev test
        while True:
            schedule.run_pending()
            logger.info("Waiting for next backup schedule...")
            time.sleep(10)
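Since auto_backup blocks in an endless scheduling loop, one possible pattern (an assumption, not shown in the original) is to run it in a daemon thread so backups tick along beside the main application:

import threading

sql = SQL()
backup_thread = threading.Thread(target=sql.auto_backup, daemon=True)  # exits with the main process
backup_thread.start()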
