当前位置: 首页 > news >正文

数据同步大冒险:PostgreSQL到MySQL的奇妙之旅

引言:一场跨数据库的浪漫邂逅 💑

在数据的世界里,不同数据库系统就像是来自不同星球的恋人,它们各自拥有独特的魅力,但偶尔也会渴望一场跨越界限的亲密接触。今天,我们就来见证一场PostgreSQL与MySQL之间的浪漫邂逅——定时获取PostgreSQL中的数据,并将其温柔地同步至MySQL的怀抱中。这不仅是一场技术的挑战,更是一次数据流转的艺术展现!

场景设定:数据的星际旅行 🚀

想象一下,你是一位数据守护者,负责管理着两个星球的数据库:PostgreSQL的“科技星球”和MySQL的“人文星球”。每天,你都需要从“科技星球”收集最新的科研成果(数据),然后运送到“人文星球”上,让那里的居民也能享受到科技进步的果实。

思路分析:星际导航图 🗺️

要完成这场星际旅行,我们需要精心规划航线:

启航准备:确保两艘飞船(数据库连接)都已就绪,且飞船上的储物舱(数据表)结构相似,便于数据转移。
坐标定位:首先,从PostgreSQL中读取最新的数据ID,这是我们的“出发坐标”。然后,在MySQL中查询最新的数据ID,作为我们的“目标坐标”。
航线规划:通过比较两个坐标的差距,我们可以确定哪些数据是新增的,需要被“运送”到MySQL。这就像是在浩瀚的数据海洋中,绘制出一条最优的航线。
数据转移:根据规划好的航线,将选定的数据从PostgreSQL中提取出来,并安全地“降落”在MySQL的相应位置。
实战代码:编写星际导航程序 💻
虽然具体的代码实现需要你们自己来完成(因为你们才是这场冒险的主角!),但我可以给你们一个大致的框架,就像是一个星际导航程序的伪代码:

# 假设这是你的星际导航程序  def connect_to_postgresql():  # 连接PostgreSQL数据库,获取连接对象  # ...  return pg_connection  def connect_to_mysql():  # 连接MySQL数据库,获取连接对象  # ...  return mysql_connection  def fetch_latest_ids(connection, table_name):  # 从指定数据库中获取最新数据ID  # 使用SQL查询,如 SELECT MAX(id) FROM table_name  # ...  return latest_id  def sync_data(pg_connection, mysql_connection, source_table, target_table):  # 1. 获取两个数据库的最新ID  pg_latest_id = fetch_latest_ids(pg_connection, source_table)  mysql_latest_id = fetch_latest_ids(mysql_connection, target_table)  # 2. 确定需要同步的数据范围  if pg_latest_id > mysql_latest_id:  # 3. 编写SQL查询,选择ID在mysql_latest_id到pg_latest_id之间的数据  # 4. 执行查询,获取数据  # 5. 编写SQL语句,将获取的数据插入MySQL  # ...  # 定时执行数据同步任务  
# 可以使用APScheduler等库来实现定时任务

结尾:星际旅行的意义 🌟
通过这场PostgreSQL到MySQL的数据同步冒险,我们不仅实现了数据的跨库流动,更深刻体会到了数据在不同系统间共享的重要性。正如星际旅行不仅仅是为了探索未知,更是为了促进不同文明之间的交流与融合。希望这次经历能激发你们对数据世界更多奇妙的想象和探索!

实际源码

"""
功能:
监测数据表是否更新
连接postgresql
连接mysql
比较-同步
"""
import sys
import pymysql
import psycopg2
import pandas as pd
import numpy as npdbname_mysql = 'followup'
table_name_list_mysql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']dbname_postgresql = 'lcmbigdata'
table_name_list_postgresql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']col_address_lists = [['create_time', 'update_time'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date']]modelname_postgresql = 'digitalelf'# 数据库连接参数
conn_params_mysql = {"database": dbname_mysql,"user": "root","password": "root","host": "localhost","port": 3306  # 端口号应该是整数
}
conn_params_postgresql = {"database": dbname_postgresql,"user": "postgres",  # 数据库用户"password": "root",  # 数据库密码"host": "localhost",  # 数据库服务器地址"port": 6666  # 数据库端口
}issue_columns = ['id' ,'model_no' ,'part_no' ,'issue_type', 'issue_from', 'start_date','end_date' ,'status', 'priority', 'fab', 'issue_description', 'root_cause','customer' ,'customer_operation' ,'phase', 'analysis', 'solution', 'progress','lesson_learnt' ,'create_by' ,'create_time', 'update_by', 'update_time','sys_org_code' ,'site', 'issue_owner', 'is_nudd' ,'attachment' ,'issue_dept','issue_update' ,'func', 'material_structure']
kickoff_columns = ['id', 'week', 'jiazhi', 'jishu', 'modelno', 'pn', 'customer', 'technology', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'odm', 'others_feiyong', 'renli_target', 'fy_target', 'mpiowner', 'mpi_bumen', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
rfq_columns = ['id', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
s02_kickoff_columns = ['id', 'week', 'customer', 'technology', 'odm', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'renli_target', 'fy_target', 'jiazhi', 'jishu', 'pn', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'others_feiyong', 'mpiowner', 'mpi_bumen', 'modelno']
s02_rfq_columns = ['id', 'customer', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'evaluation_date', 'rfq_result', 'fail_cause']
s06_kickoff_columns = ['id', 'modelno', 'technology', 'cellsite', 'fcst', 'jingzhengduishou', 'odm', 'mpiowner', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'week', 'jiazhi', 'jishu', 'kickoff_gopremium', 'pn', 'customer', 'kickoffdate', 'mpdate', 'dvtdate', 'yingyong', 'kehu_shiyong_fangshi', 'others_feiyong', 'renli_target', 'fy_target', 'design_processremark', 'mpi_bumen']
s06_rfq_columns = ['id', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause']columns_names = [issue_columns,kickoff_columns,rfq_columns,s02_kickoff_columns,s02_rfq_columns,s06_kickoff_columns,s06_rfq_columns]sql_host = 'localhost'
port = 3306
user = 'root'
password = 'root'
sql_db_name = 'followup'
# sql_table_name = 'np_issue'def replace_nan_with_none(value):"""将 numpy.nan 替换为 None,其他值保持不变。"""return None if np.isnan(value) else value# #数据库SQL上传函数
def sql_connect():try:conn = pymysql.connect(host = sql_host,port = port,user = user,password = password,db = sql_db_name,charset='utf8')return connexcept Exception as e:logging.error('SQL CONNECT' + str(e))def sql_upload(raw_data, table_name):"""将DataFrame raw_data写入MySQL数据库指定的table_name表中。"""# 连接数据库conn = sql_connect()cursor = conn.cursor()# 确保表存在,如果不存在则创建表# 注意:这里简化了表的创建过程,实际应用中可能需要根据raw_data的列名和数据类型创建合适的表结构create_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (-- 假设所有列都是VARCHAR类型,实际情况应根据raw_data的列调整flag VARCHAR(255),period VARCHAR(255),dept VARCHAR(255),-- 其他列...bd VARCHAR(255));"""cursor.execute(create_table_sql)# 构建插入语句insert_sql = f"""INSERT INTO {table_name} ({', '.join(raw_data.columns)})VALUES ({', '.join(['%s'] * len(raw_data.columns))});"""# 执行插入语句for index, row in raw_data.iterrows():try:# 处理列表中的每个元素# print("index:\n",index)row_list = list(row)for i in range(len(row_list)):if row_list[i] is np.nan:row_list[i] = replace_nan_with_none(row_list[i])# if np.nan in row:#     cleaned_row = [replace_nan_with_none(x) for x in list(row)]cursor.execute(insert_sql, row_list)conn.commit()except Exception as e:print("上传失败:", e)print(insert_sql,list(row))conn.rollback()cursor.close()conn.close()print("数据成功上传至数据库")def data_update_PostgreSQL_MySQL(data_type_tips):for i in range(0,len(table_name_list_mysql)):# i=1print("🦁"*30)print(f"第{data_type_tips}次更新{table_name_list_mysql[i]}")print("🐎" * 30)table_name_mysql = table_name_list_mysql[i]table_name_postgresql = table_name_list_postgresql[i]col_address_list = col_address_lists[i]data_columns = columns_names[i]id_list_mysql = []id_list_postgresql = []id_difference = []try:# 使用连接参数建立PostgreSQL连接try:conn_postgresql = psycopg2.connect(**conn_params_postgresql)# print("成功连接到PostgreSQL数据库")# 创建一个cursor对象来执行SQL命令cur_postgresql = conn_postgresql.cursor()# 执行SQL查询(例如:选择所有记录)# cur_postgresql.execute(f"SELECT * FROM {dbname}.{your_table_name};")cur_postgresql.execute(f"SELECT id FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql};")# 获取所有查询结果rows = cur_postgresql.fetchall()for row in rows:id_list_postgresql.append(row[0])# print(row)except (Exception, psycopg2.DatabaseError) as error:print(f"PostgreSQL数据库错误:{error}")# 使用连接参数建立MySQL连接try:conn_mysql = pymysql.connect(**conn_params_mysql)# print("成功连接到MySQL数据库")# 创建一个cursor对象来执行SQL命令cur_mysql = conn_mysql .cursor()# 执行SQL查询(例如:选择所有记录)# 修复 SQL 查询字符串中的表名引用#cur.execute(f"SELECT * FROM {your_table_name};")cur_mysql.execute(f"SELECT id FROM {table_name_mysql};")# 获取所有查询结果rows = cur_mysql.fetchall()for row in rows:id_list_mysql.append(row[0])# print(row)for id in id_list_postgresql:if id not in id_list_mysql:# print(id)id_difference.append(id)print(len(id_difference))# print(id_difference)# sys.exit()print("🐒"*20)if len(id_difference):if len(id_difference) == 1:cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id = '{id_difference[0]}';")else:id_difference = tuple(id_difference)cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id in {id_difference};")"""新入新的数据库"""# 获取所有查询结果rows_new = cur_postgresql.fetchall()# print(len(rows_new),len(data_columns))print("🐅" * 20)rows_pd = pd.DataFrame(rows_new, columns=data_columns)print("🐍"*20)# print(len(rows_pd.columns),len(rows_new),len(data_columns))print("🐉" * 20)# 处理 DataFrame 中的日期时间列for col in data_columns:data_type = rows_pd[col].dtypeif 'datetime64[ns, UTC+08:00]' in str(data_type):# 移除时区信息rows_pd[col] = rows_pd[col].dt.tz_localize(None)# 将日期时间转换为字符串格式rows_pd[col] = rows_pd[col].dt.strftime('%Y-%m-%d %H:%M:%S')sql_upload(rows_pd, table_name_mysql)print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")# print('★' * 10, '\n', 'successs')else:print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")except (Exception, pymysql.DatabaseError) as error:print(f"MySQL数据库错误:{error}")finally:# 关闭cursor和连接if cur_postgresql:cur_postgresql.close()if conn_postgresql:conn_postgresql.close()# print("PostgreSQL数据库连接已关闭")# 关闭cursor和连接if cur_mysql:cur_mysql.close()if conn_mysql :conn_mysql .close()# print("MySQL数据库连接已关闭")except Exception as e:print(f"数据库同步ERROR {e}")def data_update():for i in range(1,3):# print(f"第{i}次更新数据库")data_update_PostgreSQL_MySQL(i)data_update()

运行结果

在这里插入图片描述

增加定时功能

import schedule
import timedef data_update():print("Updating data...")# 每15分钟执行一次
schedule.every(15).minutes.do(data_update)while True:schedule.run_pending()time.sleep(1)

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Java面试宝典-java基础08
  • 构建Spring Boot应用的微服务服务网格Istio集成
  • Java后端服务端渲染与客户端渲染:SSR与CSR的权衡
  • 使用Axure打造智慧场景下的数据可视化大屏原型
  • Android 动态性能框架 (ADPF)
  • STM32F1+HAL库+FreeTOTS学习8——第一个任务,启动!
  • linux-基础知识2
  • 白盒测试及其测试方法
  • Linux高级编程-进程间通信(IPC)
  • 构建大师:深入理解Linux下的Make和Makefile
  • UE5学习笔记20-给游戏添加声音
  • 安装VC++Redist报错0x80070003的解决办法
  • 大连网站建设手机网页页面设计
  • STM32入门教程:SPI通信
  • RabbitMQ 集群与高可用性
  • [译] React v16.8: 含有Hooks的版本
  • 【mysql】环境安装、服务启动、密码设置
  • 【Redis学习笔记】2018-06-28 redis命令源码学习1
  • codis proxy处理流程
  • crontab执行失败的多种原因
  • git 常用命令
  • jdbc就是这么简单
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • RedisSerializer之JdkSerializationRedisSerializer分析
  • Shell编程
  • spring boot下thymeleaf全局静态变量配置
  • 力扣(LeetCode)965
  • 前端_面试
  • 让你成为前端,后端或全栈开发程序员的进阶指南,一门学到老的技术
  • 使用前端开发工具包WijmoJS - 创建自定义DropDownTree控件(包含源代码)
  • 小而合理的前端理论:rscss和rsjs
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • # 20155222 2016-2017-2 《Java程序设计》第5周学习总结
  • #git 撤消对文件的更改
  • (~_~)
  • (02)Unity使用在线AI大模型(调用Python)
  • (1)(1.13) SiK无线电高级配置(六)
  • (14)Hive调优——合并小文件
  • (30)数组元素和与数字和的绝对差
  • (Arcgis)Python编程批量将HDF5文件转换为TIFF格式并应用地理转换和投影信息
  • (JSP)EL——优化登录界面,获取对象,获取数据
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (vue)el-cascader级联选择器按勾选的顺序传值,摆脱层级约束
  • (四)js前端开发中设计模式之工厂方法模式
  • (算法)硬币问题
  • (自适应手机端)行业协会机构网站模板
  • ******IT公司面试题汇总+优秀技术博客汇总
  • .ai域名是什么后缀?
  • .gitignore文件---让git自动忽略指定文件
  • .NET Remoting学习笔记(三)信道
  • .net 程序发生了一个不可捕获的异常
  • .NET 设计一套高性能的弱事件机制
  • .Net+SQL Server企业应用性能优化笔记4——精确查找瓶颈
  • .netcore 获取appsettings
  • .net安装_还在用第三方安装.NET?Win10自带.NET3.5安装