社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  DATABASE

MySQL同步ES的5种方案!

码小辫 • 昨天 • 10 次点击  

码小辫 丝认证

点击关注后,你不仅获得一个找资源的工具,更获得一个有趣的灵魂  ▶  ▶  ▶


前言

有些小伙伴在工作中可能遇到过数据库查询慢的问题,特别是模糊查询和复杂聚合查询,这时候引入ES(Elasticsearch)作为搜索引擎是个不错的选择。

今天我们来聊聊MySQL同步到ES(Elasticsearch)的5种常见方案。

希望对你会有所帮助。

一、为什么需要MySQL同步到ES?

在我们深入讨论方案之前,先明确一下为什么需要将MySQL数据同步到ES:

  1. 全文搜索能力:ES提供强大的全文搜索功能,远超MySQL的LIKE查询。
  2. 复杂聚合分析:ES支持复杂的聚合查询,适合大数据分析。
  3. 高性能查询:ES的倒排索引设计使查询速度极快。
  4. 水平扩展:ES天生支持分布式,易于水平扩展。

先来看一下整体的同步架构图:

接下来,我们详细分析每种方案的实现原理和优缺点。

二、方案一:双写方案

双写方案是最直接的同步方式,即在业务代码中同时向MySQL和ES写入数据。

示例代码:

@Service
public class UserService {
    @Autowired
    private UserMapper userMapper;
    
    @Autowired
    private ElasticsearchTemplate elasticsearchTemplate;
    
    @Transactional
    public void addUser(User user) {
        // 写入MySQL
        userMapper.insert(user);
        
        // 写入Elasticsearch
        IndexQuery indexQuery = new IndexQueryBuilder()
            .withObject(user)
            .withId(user.getId().toString())
            .build();
        elasticsearchTemplate.index(indexQuery);
    }
    
    @Transactional
    public void updateUser(User user) {
        // 更新MySQL
        userMapper.updateById(user);
        
        // 更新Elasticsearch
        IndexRequest request = new IndexRequest("user_index")
            .id(user.getId().toString())
            .source(JSON.toJSONString(user), XContentType.JSON);
        elasticsearchTemplate.getClient().index(request, RequestOptions.DEFAULT);
    }
}

优缺点分析

优点:

  • 实现简单,不需要引入额外组件
  • 实时性高,数据立即同步

缺点:

  • 数据一致性难保证,需要处理分布式事务问题
  • 代码侵入性强,业务逻辑复杂
  • 性能受影响,每次写操作都要等待ES响应

适用场景

适合数据量不大,对实时性要求高,且能够接受一定数据不一致的业务场景。

三、方案二:定时任务方案

定时任务方案通过定期扫描MySQL数据变化来同步到ES。

示例代码:

@Component
public class UserSyncTask {
    @Autowired
    private UserMapper userMapper;
    
    @Autowired
    private UserESRepository userESRepository;
    
    // 每5分钟执行一次
    @Scheduled(fixedRate = 5 * 60 * 1000)
    public void syncUserToES() {
        // 查询最近更新的数据
        Date lastSyncTime = getLastSyncTime();
        List updatedUsers = userMapper.selectUpdatedAfter(lastSyncTime);
        
        // 同步到ES
        for (User user : updatedUsers) {
            userESRepository.save(user);
        }
        
        // 更新最后同步时间
        updateLastSyncTime(new Date());
    }
    
    // 获取最后同步时间
    private Date getLastSyncTime() {
        // 从数据库或Redis中获取
        // ...
    }
}

数据更新追踪策略

为了提高同步效率,通常需要设计良好的数据变更追踪机制:

优缺点分析

优点:

  • 实现简单,不需要修改现有业务代码
  • 对数据库压力可控,可以调整同步频率

缺点:

  • 实时性差,数据同步有延迟
  • 可能遗漏数据,如果系统崩溃会丢失部分数据
  • 扫描全表可能对数据库造成压力

适用场景

适合对实时性要求不高,数据变更不频繁的场景。

四、方案三:Binlog同步方案

Binlog是MySQL的二进制日志,记录了所有数据变更操作。

通过解析Binlog可以实现数据同步。

示例代码:

public class BinlogSyncService {
    public void startSync() {
        BinaryLogClient client = new BinaryLogClient("localhost"3306"username""password");
        
        client.registerEventListener(new BinaryLogClient.EventListener() {
            @Override
            public void onEvent(Event event) {
                EventData eventData = event.getData();
                
                if (eventData instanceof WriteRowsEventData) {
                    // 插入操作
                    WriteRowsEventData writeData = (WriteRowsEventData) eventData;
                    processInsertEvent(writeData);
                } elseif (eventData instanceof UpdateRowsEventData) {
                    // 更新操作
                    UpdateRowsEventData updateData = (UpdateRowsEventData) eventData;
                    processUpdateEvent(updateData);
                } elseif (eventData instanceof DeleteRowsEventData) {
                    // 删除操作
                    DeleteRowsEventData deleteData = (DeleteRowsEventData) eventData;
                    processDeleteEvent(deleteData);
                }
            }
        });
        
        client.connect();
    }
    
    private void processInsertEvent(WriteRowsEventData eventData) {
        // 处理插入事件,同步到ES
        for (Serializable[] row : eventData.getRows()) {
            User user = convertRowToUser(row);
            syncToElasticsearch(user, "insert");
        }
    }
    
    private void syncToElasticsearch(User user, String operation) {
        // 同步到ES的实现
        // ...
    }
}

优缺点分析

优点:

  • 实时性高,几乎实时同步
  • 对业务代码无侵入,不需要修改现有代码
  • 性能好,不影响数据库性能

缺点:

  • 实现复杂,需要解析Binlog格式
  • 需要考虑Binlog格式变更的兼容性问题
  • 主从切换时可能需要重新同步

适用场景

适合对实时性要求高,数据量大的场景。

五、方案四:Canal方案

Canal是阿里巴巴开源的MySQL Binlog增量订阅&消费组件,简化了Binlog同步的复杂性。

示例代码:

# canal.properties 配置
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=username
canal.instance.dbPassword=password
canal.instance.connectionCharset=UTF-8
canal.instance.filter.regex =.*\\..*
public class CanalClientExample {
    public static void main(String[] args) {
        // 创建Canal连接
        CanalConnector connector = CanalConnectors.newSingleConnector(
            new InetSocketAddress("127.0.0.1"11111), "example""""");
        
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                
                if (batchId != -1 && !message.getEntries().isEmpty()) {
                    processEntries(message.getEntries());
                    connector.ack(batchId); // 提交确认
                }
                
                Thread.sleep(1000);
            }
        } finally {
            connector.disconnect();
        }
    }
    
    private static void processEntries(List entries) {
        for (CanalEntry.Entry entry : entries) {
            if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                
                for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                    if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
                        processInsert(rowData);
                    } elseif (rowChange.getEventType() == CanalEntry.EventType.UPDATE) {
                        processUpdate(rowData);
                    } elseif (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
                        processDelete(rowData);
                    }
                }
            }
        }
    }
}

架构设计

Calan方案的架构如下:

优缺点分析

优点:

  • 实时性高,延迟低
  • 对业务系统无侵入
  • 阿里巴巴开源项目,社区活跃

缺点:

  • 需要部署维护Canal服务器
  • 需要处理网络分区和故障恢复
  • 可能产生数据重复同步问题

适用场景

适合大数据量、高实时性要求的场景,且有专门团队维护中间件。

六、方案五:MQ异步方案

MQ异步方案通过消息队列解耦MySQL和ES的同步过程,提高系统的可靠性和扩展性。

示例代码:

@Service
public class UserService {
    @Autowired
    private UserMapper userMapper;
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    
    @Transactional
    public void addUser(User user) {
        // 写入MySQL
        userMapper.insert(user);
        
        // 发送消息到MQ
        rabbitTemplate.convertAndSend("user.exchange""user.add", user);
    }
    
    @Transactional
    public void updateUser(User user) {
        // 更新MySQL
        userMapper.updateById(user);
        
        // 发送消息到MQ
        rabbitTemplate.convertAndSend("user.exchange""user.update", user);
    }
}

@Component
public class UserMQConsumer {
    @Autowired
    private UserESRepository userESRepository;
    
    @RabbitListener(queues = "user.queue")
    public void processUserAdd(User user) {
        userESRepository.save(user);
    }
    
    @RabbitListener(queues = "user.queue")
    public void  processUserUpdate(User user) {
        userESRepository.save(user);
    }
    
    @RabbitListener(queues = "user.queue")
    public void processUserDelete(Long userId) {
        userESRepository.deleteById(userId);
    }
}

消息队列选型对比

不同的消息队列产品有不同特点,下面是常见MQ的对比:

优缺点分析

优点:

  • 完全解耦,MySQL和ES同步过程相互独立
  • 高可用,MQ本身提供消息持久化和重试机制
  • 可扩展,可以方便地增加消费者处理消息

缺点:

  • 系统复杂度增加,需要维护MQ集群
  • 可能产生消息顺序问题,需要处理消息顺序性
  • 数据一致性延迟,依赖于消息消费速度

适用场景

适合大型分布式系统,对可靠性和扩展性要求高的场景。

七、5种方案对比

为了更直观地比较这5种方案,我们来看一个综合对比表格:

方案名称
实时性
数据一致性
系统复杂度
性能影响
适用场景
双写方案
难保证
小规模应用
定时任务
最终一致
非实时场景
Binlog方案
最终一致
大数据量高实时
Canal方案
最终一致
大数据量高实时
MQ异步方案
最终一致
分布式大型系统

选择建议

有些小伙伴在工作中可能会纠结选择哪种方案,这里给出一些建议:

  1. 初创项目或小规模系统:可以选择双写方案或定时任务方案,实现简单。
  2. 中大型系统:建议使用Canal方案或MQ异步方案,保证系统的可靠性和扩展性。
  3. 大数据量高实时要求:Binlog方案或Canal方案是最佳选择。
  4. 已有MQ基础设施:优先考虑MQ异步方案,充分利用现有资源。

注意事项

无论选择哪种方案,都需要注意以下几点:

  1. 幂等性处理:同步过程需要保证幂等性,防止重复数据。
  2. 监控告警:建立完善的监控体系,及时发现同步延迟或失败。
  3. 数据校验:定期校验MySQL和ES中的数据一致性。
  4. 容错机制:设计良好的故障恢复机制,避免数据丢失。

总结

MySQL同步到ES(Elasticsearch)是现代应用开发中常见的需求,选择合适的同步方案对系统性能和可靠性至关重要。

本文介绍了5种常见方案,各有优缺点,适用于不同场景。

在实际项目中,可能需要根据具体需求组合使用多种方案,或者对某种方案进行定制化改造。

重要的是要理解每种方案的原理和特点,才能做出合理的技术选型。

希望这篇文章对大家有所帮助,如果有任何问题或建议,欢迎在评论区留言讨论!


高质量原创回顾


中国最难入职的IT公司...


马斯克发私信让美女网红给他生娃,被拒以后切断了网红每两周2.1万美元广告收入?


外包同事参加聚餐,刚到包间门口,就听主管说:都自己人,以后有啥脏活累活都给外包,他们本就是服务咱的,不要不好意思


离职交接后,线上出现bug,接口是自己开发的,n+1补偿被追回了


听说互联网不卡35岁了?


程序员为保饭碗,开始“防御性编程”.....


字节这波操作,直接改变了大家的习惯?


华为员工爆料:OD大部分人是入职即巅峰!D4级,除了升D5时月薪涨了3k,其他时候从来没涨过

图片


别忘了点分享、收藏、在看、点赞哦!


图片

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/186904
 
10 次点击