【Python-MySQL】Python 代码用pool管理MySQL连接,并实现增删改查
Python 代码用pool管理MySQL连接,并实现增删改查和自动备份
- 说明
- Code
- 代码备份
说明
本代码基于python3,需要提前安装pymysql、dbutils、mysql-connector-python、schedule和tenacity
Code
import os
import time
import schedule
import subprocess
import pymysql
from dbutils.pooled_db import PooledDB
from datetime import datetime
from mysql.connector import pooling, Error, errors  # kept: legacy import, not used by the pymysql paths below
from tenacity import retry, stop_after_attempt, wait_fixed

from utils.loguru_logger import get_logger
from utils.tools import get_container_ip, load_config

logger = get_logger(__name__)

# config_file = "basic_config.json"  # for production
config_file = "basic_config_dev.json"  # for local development
logger.debug(f"Current config file: {config_file}")
config = load_config(config_file)
container_name = config["mysql_container_name"]

# host = 'mysql_server'  # for production: MySQL container name, for docker-compose
host = get_container_ip(container_name)  # MySQL container IP, for local development

mysqlInfo = {
    "host": host,
    "user": 'root',
    "passwd": '12138',
    "db": 'database',
    "port": 3306,
    "charset": 'utf8',
}


class SQL:
    """MySQL CRUD + backup helper backed by a shared DBUtils connection pool (pymysql driver)."""

    __pool = None  # class-level pool, shared by every SQL instance

    def __init__(self):
        self.conn = SQL.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    @staticmethod
    def getmysqlconn():
        """Return a pooled connection, lazily creating the shared pool.

        BUG FIX: the original used ``global __pool`` inside the class body;
        name mangling turns that into a module global ``_SQL__pool``, so
        ``SQL.__pool`` stayed ``None`` forever and a brand-new pool was built
        on every call. Assign the class attribute instead.
        """
        if SQL.__pool is None:
            SQL.__pool = PooledDB(
                creator=pymysql,
                mincached=2,
                maxcached=5,
                maxconnections=6,
                maxshared=3,
                blocking=True,   # block instead of raising when the pool is exhausted
                maxusage=None,
                setsession=[],
                ping=0,
                host=mysqlInfo['host'],
                user=mysqlInfo['user'],
                passwd=mysqlInfo['passwd'],
                db=mysqlInfo['db'],
                port=mysqlInfo['port'],
                charset=mysqlInfo['charset'],
            )
        return SQL.__pool.connection()

    def _reconnect(self):
        """Replace a dead connection/cursor with fresh ones from the pool."""
        try:
            self.cursor.close()
            self.conn.close()
        except Exception:
            pass  # best effort — the old connection may already be unusable
        self.conn = SQL.getmysqlconn()
        self.cursor = self.conn.cursor(cursor=pymysql.cursors.DictCursor)

    def release(self):
        """Return this instance's connection to the pool.

        BUG FIX: close the cursor *before* the connection — the original order
        closed the connection first, invalidating the cursor it then closed.
        """
        self.cursor.close()
        self.conn.close()

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def execute(self, query, params=None):
        """Execute *query* with optional *params*, retrying on failure.

        BUG FIX: pymysql connections have no ``is_connected()`` and this class
        has no ``connect()`` — the original recovery path raised
        AttributeError. It also caught ``mysql.connector`` exception types that
        pymysql never raises. Use ``conn.ping(reconnect=True)`` plus a
        pool-based reconnect, and catch pymysql's own exceptions.

        Args:
            query (str): SQL statement, with %s placeholders if params given.
            params (sequence | dict | None): values bound to the placeholders.
        """
        try:
            self.conn.ping(reconnect=True)  # verify the pooled connection is alive
            self.cursor.execute(query, params)
        except pymysql.err.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self._reconnect()
            self.cursor.execute(query, params)
        except pymysql.MySQLError as e:
            logger.error(f"Error executing query: {e}")
            raise e

    def fetchall(self):
        """Fetch all rows from the last executed query."""
        try:
            return self.cursor.fetchall()
        except pymysql.err.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self._reconnect()
            return self.cursor.fetchall()
        except pymysql.MySQLError as e:
            logger.error(f"Error fetching results: {e}")
            raise e

    def check_duplicate(self, database, table, condition):
        """Return COUNT(*) rows matching *condition* in `database`.`table`.

        NOTE(review): *condition* is interpolated verbatim — callers must pass
        trusted input only (SQL-injection risk).
        """
        query = f"SELECT COUNT(*) FROM `{database}`.`{table}` WHERE {condition}"
        logger.info(f"Check duplicate query: {query}")
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def insert_data(self, database, table, insert_dict):
        """Insert one row; returns (rowcount, lastrowid) on success, False otherwise.

        BUG FIX: values were interpolated unquoted into the statement (broken
        for any string value, and an injection risk) — use %s placeholders and
        let the driver escape them.
        """
        if not database or not table or not insert_dict:
            return False
        columns_str = ", ".join(f"`{column}`" for column in insert_dict)
        placeholders = ", ".join(["%s"] * len(insert_dict))
        query = f"INSERT INTO `{database}`.`{table}` ({columns_str}) VALUES ({placeholders});"
        logger.info(f"Insert query: {query}")
        self.cursor.execute(query, list(insert_dict.values()))
        self.conn.commit()
        logger.trace(f"Row affected: {self.cursor.rowcount}, Last row id: {self.cursor.lastrowid}")
        if self.cursor.rowcount == 0:
            return False
        return self.cursor.rowcount, self.cursor.lastrowid

    def update_data(self, database, table, update_dict, condition=""):
        """Update columns from *update_dict* (parameterized); returns rowcount or False."""
        if not database or not table or not update_dict:
            return False
        # BUG FIX: same unquoted-value interpolation as insert_data — bind via %s.
        set_values = ", ".join(f"`{column}` = %s" for column in update_dict)
        query = f"UPDATE `{database}`.`{table}` SET {set_values}"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Update query: {query}")
        self.cursor.execute(query, list(update_dict.values()))
        self.conn.commit()
        return self.cursor.rowcount

    def select_data(self, database, table, columns="*", condition=""):
        """Return matching rows as a list of dicts (DictCursor)."""
        query = f"SELECT {columns} FROM `{database}`.`{table}`"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Search query: {query}")
        self.cursor.execute(query)
        rows = self.fetchall()
        logger.info(f"Selected data from '{table}': {rows}")
        return rows

    def delete_data(self, table, condition=""):
        """Delete rows from *table*; an empty condition deletes every row."""
        query = f"DELETE FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        self.cursor.execute(query)
        self.conn.commit()

    def backup_database(self):
        """Dump the configured database to a monthly .sql file and prune old dumps."""
        backup_dir = config["mysql_storage"]
        date = datetime.now().strftime('%Y-%m')  # e.g. 2024-06, one backup file per month
        backup_file = f'{backup_dir}/astri_backup_{date}.sql'
        # make sure the backup directory exists
        os.makedirs(backup_dir, exist_ok=True)
        # SECURITY FIX: run mysqldump as an argv list without a shell, so the
        # password/host are never re-parsed by /bin/sh.
        command = [
            "mysqldump",
            "-h", str(mysqlInfo['host']),
            "-P", str(mysqlInfo['port']),
            "-u", mysqlInfo['user'],
            f"--password={mysqlInfo['passwd']}",
            mysqlInfo['db'],
        ]
        try:
            with open(backup_file, 'w') as f:
                result = subprocess.run(command, check=True, stdout=f, stderr=subprocess.PIPE)
            logger.info(f"Backup successful. Result: {result}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error occurred: {e.stderr.decode().strip()}")
        # optional - remove old backups over 30 days (argv list, no shell)
        old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql",
                       "-mtime", "+30", "-exec", "rm", "{}", ";"]  # for production
        # old_backups = [..., "-mtime", "+1", ...]  # for local dev
        subprocess.run(old_backups)

    def auto_backup(self):
        """Run backup_database once per day, forever (blocking loop)."""
        # schedule.every().day.at("00:00").do(self.backup_database)
        schedule.every().day.do(self.backup_database)
        while True:
            schedule.run_pending()
            logger.info("Waiting for next backup schedule...")
            time.sleep(10)
代码备份
用原生的mysql.connector连接,但没有用DBUtils管理
import os
import time
import schedule
import subprocess
import pymysql
from datetime import datetime
from mysql.connector import pooling, Error, errors
from tenacity import retry, stop_after_attempt, wait_fixed

from utils.loguru_logger import get_logger
from utils.tools import get_container_ip, load_config

logger = get_logger(__name__)

# config_file = "basic_config.json"  # for production
config_file = "basic_config_dev.json"  # for local development
logger.debug(f"Current config file: {config_file}")
config = load_config(config_file)
container_name = config["mysql_container_name"]


class SQL:
    """MySQL CRUD + backup helper using mysql.connector's built-in pooling
    (no DBUtils)."""

    def __init__(self):
        # self.host = 'mysql_server'  # for production: container name, for docker-compose
        self.host = get_container_ip(container_name)  # MySQL container IP, for local development
        self.port = 3306
        self.user = 'root'
        self.password = '12138'
        self.database = 'database'
        self.pool = None
        self.cnx = None
        self.cursor = None
        self.create_pool()
        self.connect()
        logger.trace(f"SQL object created with host: {self.host}, port: {self.port}, user: {self.user}, database: {self.database}")

    def create_pool(self):
        """Create a connection pool to MySQL server; on failure self.pool stays None."""
        try:
            self.pool = pooling.MySQLConnectionPool(
                pool_name="pool",
                pool_size=5,
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=self.database,
            )
            logger.success("Connection pool created")
        except Error as err:
            logger.error(f"Error creating connection pool: {err}")
            self.pool = None

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def connect(self):
        """Borrow a connection from the pool and open a cursor on it.

        Raises:
            Exception: if the pool was never created.
            mysql.connector.Error: if borrowing a connection fails (retried
            up to 5 times by tenacity).
        """
        if self.pool is None:
            logger.error("Connection pool is not created")
            raise Exception("Connection pool is not created")
        try:
            self.cnx = self.pool.get_connection()
            if self.cnx.is_connected():
                self.cursor = self.cnx.cursor()
                logger.success("Connected to MySQL server using connection pool")
        except Error as err:
            logger.error(f"Error connecting to MySQL server: {err}")
            raise err

    def close(self):
        """Close the cursor and return the connection to the pool."""
        if self.cursor:
            self.cursor.close()
        if self.cnx:
            self.cnx.close()
        logger.info("MySQL connection closed")

    @retry(stop=stop_after_attempt(5), wait=wait_fixed(5))
    def execute(self, query, params=None):
        """Execute *query* with optional *params*, reconnecting on a lost
        connection and retrying up to 5 times via tenacity.

        Args:
            query (str): SQL statement, with %s placeholders if params given.
            params (sequence | dict | None): values bound to the placeholders.
        """
        try:
            if self.cnx is None or not self.cnx.is_connected():
                self.connect()
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except errors.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.connect()  # one immediate reconnect-and-retry before tenacity kicks in
            if params:
                self.cursor.execute(query, params)
            else:
                self.cursor.execute(query)
        except Error as e:
            logger.error(f"Error executing query: {e}")
            raise e

    def fetchall(self):
        """Fetch all rows from the last executed query."""
        try:
            return self.cursor.fetchall()
        except errors.OperationalError as e:
            logger.error(f"OperationalError: {e}")
            self.connect()
            return self.cursor.fetchall()
        except Error as e:
            logger.error(f"Error fetching results: {e}")
            raise e

    def check_duplicate(self, database, table, condition):
        """Return COUNT(*) rows matching *condition* in `database`.`table`.

        NOTE(review): *condition* is interpolated verbatim — callers must pass
        trusted input only (SQL-injection risk).
        """
        query = f"SELECT COUNT(*) FROM `{database}`.`{table}` WHERE {condition}"
        logger.info(f"Check duplicate query: {query}")
        self.cursor.execute(query)
        return self.cursor.fetchall()

    def insert_data(self, database, table, insert_dict):
        """Insert one row; returns (rowcount, lastrowid) on success, False otherwise.

        BUG FIX: values were interpolated unquoted into the statement (broken
        for any string value, and an injection risk) — use %s placeholders and
        let the driver escape them.
        """
        if not database or not table or not insert_dict:
            return False
        columns_str = ", ".join(f"`{column}`" for column in insert_dict)
        placeholders = ", ".join(["%s"] * len(insert_dict))
        query = f"INSERT INTO `{database}`.`{table}` ({columns_str}) VALUES ({placeholders});"
        logger.info(f"Insert query: {query}")
        self.cursor.execute(query, list(insert_dict.values()))
        self.cnx.commit()
        logger.trace(f"Row affected: {self.cursor.rowcount}, Last row id: {self.cursor.lastrowid}")
        if self.cursor.rowcount == 0:
            return False
        return self.cursor.rowcount, self.cursor.lastrowid

    def update_data(self, database, table, update_dict, condition=""):
        """Update columns from *update_dict* (parameterized); returns rowcount or False."""
        if not database or not table or not update_dict:
            return False
        # BUG FIX: same unquoted-value interpolation as insert_data — bind via %s.
        set_values = ", ".join(f"`{column}` = %s" for column in update_dict)
        query = f"UPDATE `{database}`.`{table}` SET {set_values}"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Update query: {query}")
        self.cursor.execute(query, list(update_dict.values()))
        self.cnx.commit()
        return self.cursor.rowcount

    def select_data(self, database, table, columns="*", condition=""):
        """Return matching rows as a list of dicts keyed by column name."""
        query = f"SELECT {columns} FROM `{database}`.`{table}`"
        if condition:
            query += f" WHERE {condition}"
        logger.info(f"Search query: {query}")
        self.cursor.execute(query)
        # plain cursor returns tuples — zip with description to build dicts
        field_names = [i[0] for i in self.cursor.description]
        rows = self.fetchall()
        result = [dict(zip(field_names, row)) for row in rows]
        return result

    def delete_data(self, table, condition=""):
        """Delete rows from *table*; an empty condition deletes every row."""
        query = f"DELETE FROM {table}"
        if condition:
            query += f" WHERE {condition}"
        self.cursor.execute(query)
        self.cnx.commit()

    def backup_database(self):
        """Dump the configured database to a monthly .sql file and prune old dumps."""
        backup_dir = config["mysql_storage"]
        date = datetime.now().strftime('%Y-%m')  # e.g. 2024-06, one backup file per month
        backup_file = f'{backup_dir}/astri_backup_{date}.sql'
        # make sure the backup directory exists
        os.makedirs(backup_dir, exist_ok=True)
        # SECURITY FIX: run mysqldump as an argv list without a shell, so the
        # password/host are never re-parsed by /bin/sh.
        command = [
            "mysqldump",
            "-h", str(self.host),
            "-P", str(self.port),
            "-u", self.user,
            f"--password={self.password}",
            self.database,
        ]
        try:
            with open(backup_file, 'w') as f:
                result = subprocess.run(command, check=True, stdout=f, stderr=subprocess.PIPE)
            logger.info(f"Backup successful. Result: {result}")
        except subprocess.CalledProcessError as e:
            logger.error(f"Error occurred: {e.stderr.decode().strip()}")
        # optional - remove old backups over 30 days
        old_backups = ["find", backup_dir, "-type", "f", "-name", "*.sql",
                       "-mtime", "+30", "-exec", "rm", "{}", ";"]  # for production
        # old_backups = [..., "-mtime", "+1", ...]  # for local dev
        subprocess.run(old_backups)

    def auto_backup(self):
        """Run backup_database on a schedule, forever (blocking loop)."""
        # schedule.every().day.at("00:00").do(self.backup_database)
        # schedule.every().day.do(self.backup_database)
        schedule.every().hour.do(self.backup_database)  # dev test
        while True:
            schedule.run_pending()
            logger.info("Waiting for next backup schedule...")
            time.sleep(10)