canal/flume + kafka在实时数据采集中的使用


Flume不会复制消息,因此即使使用可靠的文件渠道,当Flume进程宕机后,你就无法访问这些消息了(当然Flume进程重启,从磁盘上恢复之前状态后,可以继续对消息进行处理)。因此如果对 HA高可用性具有很高要求,我们建议Kafka;


Flume是一个海量日志采集、聚合和传输的系统,支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理,并写到各种数据接受方的能力。Flume以流方式处理数据,可作为代理持续运行。当新的数据可用时,Flume能够立即获取数据并输出至目标,这样就可以在很大程度上解决实时性问题。


利用Flume采集关系数据库表数据最大的优点是配置简单,不用编程。相比tungsten-replicator的复杂性,Flume只要在flume.conf文件中配置source、channel及sink的相关属性,已经没什么难度了。


  • 在源库上执行了查询,具有入侵性。
  • 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
  • 只能识别新增数据,检测不到删除与更新。
  • 要求源库必须有用于表示增量的字段。

Canel,Databus,Puma等,这些都是需要部署server和client的。其中server端是由这些工具实现,配置了就可以读binlog,而client端是需要我们动手编写程序的,远没有达到我即插即用的期望和懒人的标准。

再来看看flume,只需要写一个配置文件,就可以完成数据同步的操作。官网:http://flume.apache.org/FlumeUserGuide.html#flume-sources。它的数据源默认是没有读取binlog日志实现的,也没有读数据库表的官方实现,只能用开源的自定义source:https://github.com/keedio/flume-ng-sql-source


flume需要考虑同步的格式,原插件flume-ng-sql-source只支持csv的格式


如果使用到大数据的技术,flume更好。Flume 和 Kafka 可以很好地结合起来使用。如果你的设计需要从 Kafka 到 Hadoop 的流数据,使用 Flume 代理并配置 Kafka 的 Source 读取数据也是可行的:你没有必要实现自己的消费者。你可以直接利用Flume 与HDFS 及HBase 的结合的所有好处。


Flume 采集日志是通过流的方式直接将日志收集到存储层,而 kafka 是将缓存在 kafka集群,待后期可以采集到存储层。
Flume 采集中间停了,可以采用文件的方式记录之前的日志,而 kafka 是采用 offset 的方式记录之前的日志。


数据怎么采集到 Kafka,实现方式?
使用官方提供的 flumeKafka 插件,插件的实现方式是自定义了 flume 的 sink,将数据从channle 中取出,通过 kafka 的producer 写入到 kafka 中,可以自定义分区等。


Flume 采集数据会丢失吗?

不会,因为 channel 可以存储在 file 中,而且 flume 本身是有事务的。
可以做 sink 组,一个坏掉了,就用另一个。


flume 有哪些组件,flume 的 source、channel、sink 具体是做什么的?

1)source:用于采集数据,Source 是产生数据流的地方,同时 Source 会将产生的数据
流传输到 Channel,这个有点类似于 Java IO 部分的 Channel。
2)channel:用于桥接 Sources 和 Sinks,类似于一个队列。
3)sink:从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS或者 HBase)。

source :搜集数据
channel :数据缓存
sink :把数据发送到目的地
常用 source 类型 :
1、 监控文件 :exec
2、监控目录 :spooldir

使用canal做数据备份而不用mysql自带的主从备份的场景主要为:

跨数据库的数据备份,例如mysql => oracle
数据异构,即对同一份数据做不同的分库分表查询。例如卖家和买家各自分库索引

canal将自己伪装成mysql的从库,从主库那里消费并解析binlog,通过日志来保持数据的一致性。

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

增量同步和bai全量同步是数du据库同步的两zhi种方式。全量同步是一次dao性同步全部数据zhuan,增量同步则只shu同步两个数据库不同的部分。


1.Canal连接到A数据库,模拟slave

2.canal-client与Canal建立连接,并订阅对应的数据库表

3.A数据库发生变更写入到binlog,Canal向数据库发送dump请求,获取binlog并解析,发送解析后的数据给canal-client

4.canal-client收到数据,将数据同步到新的数据库


为了高可用和更高的性能,我们会创建多个canal-client构成一个集群,来进行解析并同步到新的数据库。为了保证canal-client集群解析消费binlog的顺序性,通常会选择canal + kafka 的方案


我们配置了kafka的partitionHash,并且我们一个Topic就是一个表。这样的效果就是,一个表的数据只会推到一个固定的partition中,然后再推给consumer进行消费处理,同步到新的数据库。通过这种方式,解决了之前碰到的binlog日志顺序处理的问题。这样即使我们部署了多个kafka consumer端,构成一个集群,这样consumer从一个partition消费消息,就是消费处理同一个表的数据。这样对于一个表来说,牺牲掉了并行处理,不过个人觉得,凭借kafka的性能强大的处理架构,我们的业务在kafka这个节点产生瓶颈并不容易。并且我们的业务目的不是实时一致性,在一定延迟下,两个数据库保证最终一致性。


这篇文章大部分篇幅用于介绍其他中间件是怎么部署的,这个问题侧面说明了Canal本身部署并不复杂,它的配置文件属性项比较多,但是实际上需要自定义和改动的配置项是比较少的,也就是说明了它的运维成本和学习成本并不高。


数据落地

采集之后必然需要将数据落地,即存储层,常见的有:

  • MYSQL、Oracle
  • Hive、Hdfs
  • HBase
  • Redis
  • ElasticSearch
  • MongoDB

需要说明的是,数据采集之后往往会先发送给Kafka这种消息队列,然后才真正落地到各种存储层中。

数据汇聚设计原则

从中台的角度来考虑,数据汇聚层的设计需要考虑几个关键的因素:

  • 设计之初就应该考虑支持各类数据源 ,支持不同来源、不同类型的数据源。数据汇聚层不是为某一种数据而生的,应该做到通用化。
  • 需要支持不同时间窗口的数据采集,实时的、非实时的、历史的。
  • 操作友好简单,即使是不懂技术的人,也可以方便的操作,进行数据同步;举例mysql同步到hive,你不应该让用户去填写复杂的sqoop任务参数,而是只需要选择源表和目的表,其他事情都交给中台去完成。
  • 合理选择存储层,不同数据源应存储在不同的地方,比如日志数据肯定不适合mysql。

后端实时数据,实时接入mysql。为了不影响线上系统的正常使用,同时能够将数据发送到大数据平台,本项目使用Canal来解析实时数据,Flume收集数据并数据发送到实时计算业务流程和离线计算两个流程中。实时数据处理流程使用Canal+Flume+Kafka+SparkStreaming等技术。


一些文章

使用canal+Kafka进行数据库同步实践:https://zhuanlan.zhihu.com/p/154412917

基于Canal和Kafka实现MySQL的Binlog近实时同步:https://zhuanlan.zhihu.com/p/115586732

Canal+Kafka实现MySQL与Redis数据同步:https://zhuanlan.zhihu.com/p/186035586

大数据环境下该如何优雅地设计数据分层:https://zhuanlan.zhihu.com/p/27395332

日志采集系统flume和kafka有什么区别及联系,它们分别在什么时候使用,什么时候又可以结合?:https://www.zhihu.com/question/36688175

canal入门到实战及面试:https://www.cnblogs.com/huanghanyu/articles/12874376.html

canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka: https://segmentfault.com/a/1190000024443393

Flume+Kafka双剑合璧玩转大数据平台日志采集:https://zhuanlan.zhihu.com/p/52087481

Flume+Kafka+Storm+Redis构建大数据实时处理系统:https://zhuanlan.zhihu.com/p/50537056

Flume 日志采集、聚合和传输:https://juejin.cn/post/6844904199063339021

分布式日志收集框架 Flume:https://juejin.cn/post/6844903944141930510

为什么选择Canal + Flume + Kafka 架构而不是Canal + Kafka架构?:https://blog.csdn.net/u012965373/article/details/96314562


flume+kafka的架构:

  • 在数据处理方面:Flume能够对数据进行简单处理,并且具有写道各种数据接受方的能力,这能实现我们项目中——需要支持更多类型的数据库
    • 在一系列博客中,我发现flume+kafka的架构更多用在大数据平台上,比如将数据同步到hadoop生态圈,hdfs,hive,hbase等。所以如果考虑到大数据的应用,可以选择flume框架。
  • 在配置方面:Flume采集关系数据库表数据最大的优点是配置简单,不用编程。
    • flume只要在flume.conf文件中配置source、channel及sink的相关属性,相比canal更简单。
    • 不过flume的数据源默认是没有读取binlog日志实现的,也没有读数据库表的官方实现,只能用开源的自定义source:https://github.com/keedio/flume-ng-sql-source
  • 和kafka的配合方面:
    • Flume 采集日志是通过流的方式直接将日志收集到存储层,而 kafka 是将缓存在 kafka集群,待后期可以采集到存储层。
    • Flume 采集中间停了,可以采用文件的方式记录之前的日志,而 kafka 是采用 offset 的方式记录之前的日志。
    • 两者可以形成和好的互补。
    • flume和kafka的配合主要体现在他的sink组件
      • sink:从 Channel 收集数据,将数据写到目标源(可以是下一个 Source,也可以是 HDFS或者 HBase)。
      • 使用官方提供的 flumeKafka 插件,插件的实现方式是自定义了 flume 的 sink,将数据从channle 中取出,通过 kafka 的producer 写入到 kafka 中,可以自定义分区等。
  • 缺点:
    • 在源库上执行了查询,具有入侵性。
    • 通过轮询的方式实现增量,只能做到准实时,而且轮询间隔越短,对源库的影响越大。
    • 只能识别新增数据,检测不到删除与更新。
    • 要求源库必须有用于表示增量的字段。

对于flume+kafka我的个人观点:

  • flume+kafka的架构我在博客中更多的看到是在大数据中的应用,因为flume本身是大数据的框架。如果考虑到大数据技术的应用或决定使用大数据生态圈的架构,可以使用flume+kafka来实时采集mysql数据。但是flume是做不到实时增量同步的,只能说是准实时,不过也能满足大部分的需求了。
  • 同时flume相比起canal来说配置简单,对比canal不用搭建主库从库,还可以将应用产生的数据存储到任何集中存储器中。Flume的管道是基于事务,保证了数据在传送和接收时的一致性。
  • 但是如果字段删除或者更新频繁,flume+kafka的效果可能不好。而且在对mysql等关系型数据库、redis、es等,canal似乎更被广泛接受(目前主流)

canal+kafka的架构:

Canal+Kafka实现数据增量同步是目前比较常见的做法。

  • 在数据处理方面:canal主要用途是基于MySQL数据库增量日志解析,提供增量数据订阅和消费。比如,B服务数据库的数据来源于A服务的数据库;A服务的数据有变更操作时,需要同步到B服务中。使用canal+kafka目的就是通过数据库的binlog进行同步。这种解决方案,与A服务是独立的,不会和A服务有代码上的耦合。可以直接TCP连接进行传输数据,优于接口调用的方式。
  • 在结构化数据和非结构化数据的数据同步工作中canal有很好的表现,项目的改进需求中需要支持更多类型的数据库需要支持非结构化的数据同步这两项canal都可以很好的适应。
  • 在配置方面:为了高可用和更高的性能,我们会创建多个canal-client构成一个集群,来进行解析并同步到新的数据库。为了保证canal-client集群解析消费binlog的顺序性,通常会选择canal + kafka 的方案,canal同步数据库的步骤有:
    1. Canal连接到A数据库,模拟slave
    2. canal-client与Canal建立连接,并订阅对应的数据库表
    3. A数据库发生变更写入到binlog,Canal向数据库发送dump请求,获取binlog并解析,发送解析后的数据给canal-client
    4. canal-client收到数据,将数据同步到新的数据库
    • 所以说:canal将自己伪装成mysql的从库,从主库那里消费并解析binlog,通过日志来保持数据的一致性。
  • 和kafka的配合方面:canal动态监控Mysql,将binlog日志解析后,把采集到的数据发送到Kafka: https://segmentfault.com/a/1190000024443393是一个很好的例子
    • 通过不算复杂的配置可以直接将binlog投递到Kafka,无需再自己写producer程序
    • 需要注意kafka和canal的版本
    • 比如在采集msyql数据到es的应用中:canal负责解析MySQL的binlog日志,并将其解析后的数据封装成特定的对象放到Kafka中; kafka通过编写Kafka消费者,消费对应的业务数据,将消费的数据通过ES存储API,通过创建对应的索引的,存储到ES中,这部分负责消费存放在Kafka中的消息,当消费方拿到具体的用户表变更消息时,将最新的用户信息存放到ES数据仓库中。

对于canal+kafka我的个人观点:

canal目前主要支持了mysql,如果同时也支持将一些数据写入到大数据生态圈的数据库中。他主要是将自己伪装成mysql的从库,从主库那里消费并解析binlog,通过日志来保持数据的一致性。这也说明我们在使用canal+kafka的过程中很可能需要搭建集群。因为canal也是分布式集群项目中同步DB数据的解决方案。相比于flume来说配置和搭建可能显得更加复杂,不过也比flume灵活很多。如果我们同步的数据主要在关系型数据库、es、redis、hbase、mongo等,canal有更多经验,我觉得选择canal更好。同样,和flume一样也是近实时不是完全实时(大概1-2s的误差)。

我觉得如果两个技术都可以采用的话,canal+flume+kafka可以提高扩展性,canal 只是模拟 MySQL 中的 从库“骗”日志,其他数据库源的采集可能要用到YuGong这个框架,所以数据源太多种的话可以使用flume来统一数据源,也不失canal的灵活性。


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM