比较数据迁移后MySQL数据库和ClickHouse数据仓库中的表

设计一个MySQL数据库和Clickhouse数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较两条两个数据库中,一个数据库有,另一个数据库没有的记录,以id是否存在作为标准,还需要比较两边都有记录的所有数据字段是否都相同,如果有不相同指出这条数据的id,以及不相同数据所在字段的名称和在两个数据库中的值,分批比较时需要考虑所有可能的情况。

以下是一个详细的设计方案,用于比较MySQL数据库和ClickHouse数据仓库中结构相同的表,支持并行数据获取和分批比较:

系统设计

并行处理架构

主线程:负责数据比较
工作线程:负责预取下一批数据
使用队列实现线程间通信

关键组件

数据获取器(DataFetcher)
数据比较器(DataComparator)
结果处理器(ResultHandler)
批处理控制器(BatchController)

详细程序流程

import queue
import threading
from typing import Dict, List, Tuple, Optional

# 假设已实现数据库连接池
from db_connectors import MySQLPool, ClickHousePool

class BatchData:
    def __init__(self):
        self.mysql_data: Dict[int, dict] = {
            }  # {id: {field1:val1,...}}
        self.ch_data: Dict[int, dict] = {
            }
        self.last_id: int = 0  # 本批次最大ID

class DataComparator:
    def __init__(self, result_handler):
        self.result_handler = result_handler
    
    def compare_batch(self, batch: BatchData):
        # 1. 检查ID存在性差异
        mysql_ids = set(batch.mysql_data.keys())
        ch_ids = set(batch.ch_data.keys())
        
        # 1.1 MySQL有但ClickHouse缺失
        for mid in mysql_ids - ch_ids:
            self.result_handler.record_missing(mid, source="clickhouse")
        
        # 1.2 ClickHouse有但MySQL缺失
        for cid in ch_ids - mysql_ids:
            self.result_handler.record_missing(cid, source="mysql")
        
        # 2. 比较共有ID的字段值
        common_ids = mysql_ids & ch_ids
        for cid in common_ids:
            mysql_row = batch.mysql_data[cid]
            ch_row = batch.ch_data[cid]
            self._compare_rows(cid, mysql_row, ch_row)
    
    def _compare_rows(self, id: int, mysql_row: dict, ch_row: dict):
        # 跳过主键字段
        fields = set(mysql_row.keys()) - {
            "id"}
        for field in fields:
            mysql_val = mysql_row[field]
            ch_val = ch_row[field]
            
            # 处理类型差异(如MySQL的Decimal转Float)
            if isinstance(mysql_val, Decimal):
                mysql_val = float(mysql_val)
            if isinstance(ch_val, Decimal):
                ch_val = float(ch_val)
            
            # 特殊处理浮点数精度
            if isinstance(mysql_val, float) and isinstance(ch_val, float):
                if abs(mysql_val - ch_val) > 1e-9:
                    self.result_handler.record_mismatch(
                        id, field, mysql_val, ch_val
                    )
            elif mysql_val != ch_val:
                self.result_handler.record_mismatch(
                    id, field, mysql_val, ch_val
                )

class DataFetcher:
    BATCH_SIZE = 2000
    
    def __init__(self, mysql_pool, ch_pool):
        self.mysql_pool = mysql_pool
        self.ch_pool = ch_pool
    
    def fetch_batch(self, last_id: int) -> Optional[BatchData]:
        batch = BatchData()
        
        # 从MySQL获取数据
        with self.mysql_pool.connection() as conn:
            cursor = conn.cursor(dictionary=True)
            cursor.execute(
                "SELECT * FROM target_table "
                "WHERE id > %s ORDER BY id LIMIT %s",
                (last_id, self.BATCH_SIZE)
            )
            for row in cursor:
                batch.mysql_data[row['id']] = row
                batch.last_id = max(batch.last_id, row['id'])
        
        if not batch.mysql_data:
            return None  # 无更多数据
        
        # 从ClickHouse获取对应数据
        id_list = list(batch.mysql_data.keys())
        with self.ch_pool.connection() as conn:
            cursor = conn.cursor()
            query = f"""
            SELECT * FROM target_table 
            WHERE id IN ({
              ','.join(map(str, id_list))})
            """
            cursor.execute(query)
            for row in cursor.fetchall():
                # 假设row转为字典格式,与MySQL结构对齐
                row_dict = self._map_row_to_dict(cursor.description, row)
                batch.ch_data[row_dict['id']] = row_dict
        
        return batch
    
    def _map_row_to_dict(self, description, row) -> dict:
        """将ClickHouse行数据映射为字段名字典"""
        return {
            desc[0]: val for desc, val in zip(description, row)}

class ResultHandler:
    def record_missing(self, id: int, source: str):
        print(f"[MISSING] ID {
              id} missing in {
              source.upper()}")
        # 实际实现可写入文件/数据库
    
    def record_mismatch(self, id: int, field: str, mysql_val, ch_val):
        print(
            f"[MISMATCH] ID {
              id} Field {
              field}: "
            f"MySQL={
              mysql_val} vs CH={
              ch_val}"
        )
        # 实际实现可写入文件/数据库

class BatchController:
    def __init__(self, fetcher: DataFetcher, comparator: DataComparator):
        self.fetcher = fetcher
        self.comparator = comparator
        self.data_queue = queue.Queue(maxsize=2)  # 预存最多2个批次
        self.stop_event = threading.Event()
    
    def _prefetch_worker(self):
        """工作线程:预取数据到队列"""
        last_id = 0
        while not self.stop_event.is_set():
            batch = self.fetcher.fetch_batch(last_id)
            if batch is None:
                self.data_queue.put(None)  # 结束信号
                break
            
            self.data_queue.put(batch)
            last_id = batch.last_id
    
    def run_comparison(self):
        """主比较流程"""
        # 启动预取线程
        prefetch_thread = threading.Thread(target=self._prefetch_worker)
        prefetch_thread.start()
        
        processed_batches = 0
        try:
            while True:
                batch = self.data_queue.get()
                
                # 终止条件
                if batch is None:
                    print(f"Comparison completed. Total batches: {
              processed_batches}")
                    break
                
                # 执行比较
                self.comparator.compare_batch(batch)
                processed_batches += 1
                print(f"Processed batch #{
              processed_batches}, last_id={
              batch.last_id}")
        finally:
            self.stop_event.set()
            prefetch_thread.join()

# 主程序
if __name__ == "__main__":
    # 初始化组件
    mysql_pool = MySQLPool(config=mysql_config)
    ch_pool = ClickHousePool(config=ch_config)
    
    fetcher = DataFetcher(mysql_pool, ch_pool)
    result_handler = ResultHandler()
    comparator = DataComparator(result_handler)
    controller = BatchController(fetcher, comparator)
    
    # 执行比较
    controller.run_comparison()

关键处理逻辑说明

数据获取优化

MySQL:使用游标方式WHERE id > last_id ORDER BY id LIMIT 2000高效分页
ClickHouse:使用IN查询精确获取对应ID数据
类型转换:自动处理Decimal->Float等类型差异
浮点数比较:使用精度容差(1e-9)避免精度误差

并行处理流程

异常情况处理

数据缺失:精确记录缺失ID和所在系统
字段不匹配:记录ID、字段名和双方值
连接故障:数据库连接池自动重连
队列阻塞:设置超时机制防止死锁
线程安全:使用线程安全队列和事件通知

内存控制

每批最多4000条记录(MySQL+CH各2000)
队列最多缓存2个批次
及时释放已处理批次内存

特殊场景处理

ID不连续场景

使用游标追踪(last_id)而非传统分页
确保不遗漏任何ID区间

字段类型差异

在比较前统一类型(如Decimal转Float)
特殊处理日期/时间格式

大数据量优化

# 可配置的批处理大小
class DataFetcher:
    def __init__(self, batch_size=2000):
        self.BATCH_SIZE = batch_size

# 按需调整
if __name__ == "__main__":
    fetcher = DataFetcher(batch_size=5000)  # 大内存机器可增大批次

性能监控点

每批次处理时间
数据库查询耗时
队列等待时间
内存使用量

该设计通过双线程并行实现了:

数据获取与比较操作重叠执行
精确的差异检测(存在性和字段级)
内存高效使用
全量数据无遗漏比较
完善的异常处理机制

实际部署时建议添加:

断点续查功能(记录last_id到文件)
详细日志系统
邮件/短信报警机制
性能监控仪表盘

© 版权声明
THE END
如果内容对您有所帮助,就支持一下吧!
点赞0 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容