# Distributed Message Queue Kafka (VI): Integrating Kafka with the Data Synchronization Tool canal

Contents

- Data synchronization: overview
- Data synchronization: introducing canal
  - MySQL master-slave replication process
  - How canal synchronizes data
  - Advantages of canal
  - Limitations of canal
  - Canal architecture
  - Canal installation, deployment and components
- Data synchronization: environment setup
  - Enabling the MySQL binlog
  - Setting up canal
- Data synchronization: working with canal from Java
- Data synchronization: integrating canal with Kafka (configuration)
  - Preface
  - canal.properties
  - instance.properties
- Data synchronization: consuming canal data from Kafka (end-to-end test)

## Data synchronization: overview

Questions this part addresses:

- What is data synchronization?
- What are its typical application scenarios?
- How do you choose a data synchronization technology?
- Which data source should drive the synchronization?

A typical scenario: order data in MySQL is sharded by hash across multiple databases and tables. If you later need to query by some other dimension, say by city, you would have to traverse every database and table, which is cumbersome. With data synchronization you can instead materialize the data by that dimension, for example storing all orders of the same city in one table or in one Elasticsearch index.

The traditional approach is: the order is written to the database, a message is sent to MQ, and a consumer processes the message and writes it into Elasticsearch. This has a weakness: the initial database write may fail while the MQ message is still sent and applied to Elasticsearch, leaving the two stores inconsistent. Note: data synchronization must be driven only by data that is confirmed to be 100% persisted in the database.

## Data synchronization: introducing canal

canal is a middleware written in Java that parses a database's incremental log and provides incremental data subscription and consumption. It currently focuses on parsing the MySQL binlog; once an event is parsed, a canal client processes the resulting data. Database-to-database synchronization requires Alibaba's otter middleware, which is itself built on canal.

- Source: https://github.com/alibaba/canal
- Supported source MySQL versions: 5.1.x, 5.5.x, 5.6.x, 5.7.x, 8.0.x
- Typical businesses built on canal's incremental log subscription and consumption:
  - database mirroring
  - real-time database backup
  - index building and real-time maintenance (splitting out heterogeneous indexes, inverted indexes, and so on)
  - cache invalidation
  - incremental data processing that carries business logic

### MySQL master-slave replication process

The master records data changes in its binary log; the MySQL slave copies the master's binary log events into its relay log, and then applies the changes recorded in the relay log to its own database.

### How canal synchronizes data

canal disguises itself as a MySQL slave: it reads the master's binary log changes in real time and then processes them inside canal.

### Advantages of canal

- Good real-time behavior: because it rides on the binary log, every update, insert and delete on the master is reflected in the binlog immediately.
- Distributed: ZooKeeper is used as the coordination framework, giving high availability in the form of master/standby switchover.
- ACK mechanism: the binlog can be read in batches; a successfully parsed batch is committed, a failed one is rolled back, and the next read resumes from the last committed position.

### Limitations of canal

- Only incremental synchronization is supported, not full synchronization.
- It supports MySQL→MySQL, MySQL→ES and MySQL→RDB; if the binlog is incomplete, the data cannot be collected during synchronization.
- Monitoring of runtime metrics for the whole synchronization pipeline is still not very mature; the business side has to report metrics itself.
- One instance can only be consumed by a single client, so consumption cannot be parallelized, because there is only one binlog stream.
- Only a master/standby deployment is supported; neither side can be scaled out, so a single node can come under heavy pressure.

### Canal architecture

One canal server hosts 1..n instances. Inside an instance:

- EventParser: connects to the data source and parses the protocol.
- EventSink: an interceptor-like stage that post-processes, filters and enriches the parsed data.
- EventStore: stores the events.
- MetaManager: manages incremental subscription and consumption metadata.

### Canal installation, deployment and components

Releases: https://github.com/alibaba/canal/releases

- canal.adapter-1.1.4.tar.gz: synchronization adapters, e.g. for adapting to Elasticsearch; usually not chosen, because real business scenarios are more complex than what the adapters cover.
- canal.admin-1.1.4.tar.gz: the admin console.
- canal.deployer-1.1.4.tar.gz: the canal server itself.
- canal.example-1.1.4.tar.gz: examples.

A standard pipeline integrates canal with Kafka and finally sinks the data into Elasticsearch.

## Data synchronization: environment setup

### Enabling the MySQL binlog

For a self-hosted MySQL, first enable binlog writing and set `binlog-format` to ROW mode. In `my.cnf`:

```bash
## preparation: edit the MySQL config file
vim /etc/my.cnf

[mysqld]
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must be unique among the MySQL master/slave nodes and must not clash with canal's slaveId
```

```bash
## restart the service
systemctl stop mariadb
systemctl start mariadb

mysql -uroot -proot
show variables like '%log_bin%';
## log_bin = ON means the binlog is enabled
```

Grant the account that canal connects with the privileges required to act as a MySQL slave (if the account already exists, just grant):

```bash
CREATE USER 'root' IDENTIFIED BY 'root';
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY 'root' WITH GRANT OPTION;

## or create and grant a dedicated canal user
-- CREATE USER 'canal' IDENTIFIED BY 'canal';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal' WITH GRANT OPTION;
-- GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

## the mysql-bin log files live under
cd /var/lib/mysql
```

### Setting up canal

```bash
## create a directory and unpack canal
mkdir /usr/local/canal
tar -zxvf canal.deployer-1.1.4.tar.gz -C /usr/local/canal/

## server config
vim /usr/local/canal/conf/canal.properties
## port that Java clients connect to
canal.port = 11111

## instance config
vim /usr/local/canal/conf/example/instance.properties
## must not clash with the server-id of any existing MySQL node
canal.instance.mysql.slaveId=1001
## address of the MySQL master
canal.instance.master.address=192.168.11.31:3306
## credentials and charset
canal.instance.dbUsername=root          # MySQL user
canal.instance.dbPassword=root          # MySQL password
canal.instance.connectionCharset=UTF-8  # charset

## start canal
cd /usr/local/canal/bin
./startup.sh

## verify the server
cat /usr/local/canal/logs/canal/canal.log
## watch the instance log
tail -f -n100 /usr/local/canal/logs/example/example.log
```
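If the logs look healthy, a quick way to confirm that the deployer really accepts client connections is a minimal smoke test against the canal client API (the `com.alibaba.otter.canal:canal.client` dependency). This is only a sketch: the server address 192.168.11.221:11111, the destination `example`, and the empty credentials are assumptions based on the setup above and the full client example later in this article; adjust them to your deployment.

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

/**
 * Minimal smoke test: connect to the freshly deployed canal server,
 * pull one (possibly empty) batch and print how many entries came back.
 */
public class CanalSmokeTest {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.11.221", 11111), // canal server (assumed address)
                "example",                                      // destination = instance name
                "", "");                                        // user/password (none configured by default)
        try {
            connector.connect();
            connector.subscribe(".*\\..*");   // everything; narrow this in real use
            connector.rollback();             // start from the last acknowledged position
            Message message = connector.getWithoutAck(10);
            System.out.println("batchId=" + message.getId()
                    + ", entries=" + message.getEntries().size());
            connector.ack(message.getId());   // acknowledge, even for an empty batch
        } finally {
            connector.disconnect();
        }
    }
}
```

If the pull fails, `logs/example/example.log` from the previous step is usually the first place that shows why.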
### Canal instance configuration, annotated (instance.properties)

The example instance configured above can be tuned further; the annotated listing below explains the most relevant keys (the values come from a reference setup and differ from the example instance).

```properties
# position info
canal.instance.master.address=172.16.210.74:3306  # IP and port of the MySQL whose binlog is read
canal.instance.master.journal.name=               # binlog file to start reading from
canal.instance.master.position=                   # offset to start from; both will look familiar if you have set up replication
# tip: if the binlog file and offset are left empty, canal-server starts from the current position; I recommend not setting them
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

# high-availability settings: the MySQL standby/slave can be configured here
#canal.instance.standby.address=
#canal.instance.standby.journal.name=
#canal.instance.standby.position=
#canal.instance.standby.timestamp=
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal         # MySQL user
canal.instance.dbPassword=canal         # MySQL password
canal.instance.connectionCharset=UTF-8  # charset

# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=risk.canal,risk.cwx
# important: the schema/table whitelist; e.g. to capture only the incremental data
# of the user table in the test schema, write: test.user

# table black regex
canal.instance.filter.black.regex=

# table field filter (format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter (format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
```
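The whitelist format above (comma-separated `schema.table` Perl-style regexes) is the same format a TCP client can pass to `subscribe()`. Below is a small sketch, reusing the assumed server address from the client example that follows, that subscribes to just `test.user`; canal's documentation notes that a non-empty client-side filter takes precedence over `canal.instance.filter.regex`, so keep the two consistent (and verify that behaviour on your version).

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

/**
 * Sketch: subscribe with a narrow client-side filter that uses the same
 * schema.table regex format as canal.instance.filter.regex.
 */
public class FilteredSubscribeExample {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.11.221", 11111), "example", "root", "root");
        try {
            connector.connect();
            // only the user table of the test schema, same format as the server-side whitelist
            connector.subscribe("test\\.user");
            connector.rollback();
            // ... pull and process batches exactly as in the full example below ...
        } finally {
            connector.disconnect();
        }
    }
}
```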
## Data synchronization: working with canal from Java

```java
package com.bfxy.canal.base;

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * CanalTest
 * 1. Connect to the canal server
 * 2. Request the dump protocol from the master
 * 3. Parse the binlog entries pushed over
 * 4. Do the actual data handling: send to MQ, print, ...
 */
public class CanalTest {

    /**
     * @param args
     */
    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("192.168.11.221", 11111), "example", "root", "root");
        int batchSize = 1000;   // number of entries pulled per batch
        int emptyCount = 0;     // counter of consecutive empty pulls
        try {
            // connect to our canal server
            connector.connect();
            // what to subscribe to (which schemas/tables);
            // ".*\\..*" subscribes to every schema and table behind this canal instance
            connector.subscribe(".*\\..*");
            // roll back to the last acknowledged position in case of a previous failure
            connector.rollback();
            int totalEmptyCount = 1200;   // give up after this many consecutive empty pulls
            while (emptyCount < totalEmptyCount) {
                // getWithoutAck: ack manually after the data has been processed
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // nothing pulled this round
                    emptyCount++;
                    System.err.println("empty count: " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignore..
                    }
                } else {
                    emptyCount = 0;
                    System.err.printf("message[batchId=%s, size=%s] \n", batchId, size);
                    // parse and handle the entries
                    printEntry(message.getEntries());
                }
                // acknowledge the processed batch
                connector.ack(batchId);
            }
            System.err.println("empty too many times, exit");
        } finally {
            // close the connection
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        // each entry can be thought of as one event/record
        for (Entry entry : entries) {
            // skip entries that only mark a transaction boundary
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            // RowChange carries the schema, table and binlog details
            RowChange rc = null;
            try {
                // entry.getStoreValue() is the raw binary payload
                rc = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parser error!", e);
            }
            EventType eventType = rc.getEventType();
            System.err.println(String.format("binlog[%s:%s], name[%s,%s], eventType : %s",
                    entry.getHeader().getLogfileName(),   // binlog file name
                    entry.getHeader().getLogfileOffset(), // current binlog offset
                    entry.getHeader().getSchemaName(),    // schema name
                    entry.getHeader().getTableName(),     // table name
                    eventType));
            // now handle the actual row data
            for (RowData rd : rc.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    // DELETE: BeforeColumnsList holds the row as it was before deletion
                    List<Column> deleteList = rd.getBeforeColumnsList();
                    printColumn(deleteList);
                } else if (eventType == EventType.INSERT) {
                    // INSERT: AfterColumnsList holds the newly inserted row
                    List<Column> insertList = rd.getAfterColumnsList();
                    printColumn(insertList);
                } else {
                    // UPDATE: print both the before and after images
                    List<Column> updateBeforeList = rd.getBeforeColumnsList();
                    printColumn(updateBeforeList);
                    List<Column> updateAfterList = rd.getAfterColumnsList();
                    printColumn(updateAfterList);
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.err.println(column.getName() + " : " + column.getValue()
                    + ", updated=" + column.getUpdated());
        }
    }
}
```
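The example above talks to one fixed canal server. Because canal's high availability (listed under its advantages) relies on ZooKeeper for master/standby switchover, the client API also provides a cluster connector that discovers the active server through ZooKeeper. A minimal sketch with a hypothetical ZooKeeper address; everything after construction works exactly like `CanalTest`:

```java
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

/**
 * Sketch: let the client find the active canal server through ZooKeeper
 * instead of hard-coding a single address (useful with canal's master/standby HA).
 */
public class CanalClusterClient {

    public static void main(String[] args) {
        // hypothetical ZooKeeper ensemble; destination/user/password as in the single-server example
        CanalConnector connector = CanalConnectors.newClusterConnector(
                "192.168.11.111:2181", "example", "root", "root");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            // ... same getWithoutAck / ack loop as in CanalTest ...
        } finally {
            connector.disconnect();
        }
    }
}
```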
## Data synchronization: integrating canal with Kafka (configuration)

### Preface

Parsing canal data directly in Java does not perform particularly well: the client is single-threaded and has no ability to buffer a backlog of messages, so it is not suited to high-concurrency scenarios. Rather than wiring mysql-bin.log straight into a Java parser, the usual approach is to publish the data to Kafka first and let consumers parse it from there.

### canal.properties

For the Kafka integration the key changes are switching `canal.serverMode` from `tcp` to `kafka` and pointing `canal.mq.servers` at the broker list. The full file:

```properties
#################################################
#########       common argument     #############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size = 1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
#########       destinations        #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
#########             MQ            #############
##################################################
# switch the server mode from tcp to kafka for this integration
canal.serverMode = kafka
canal.mq.servers = 192.168.11.51:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to cloud, if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = ../conf/kerberos/krb5.conf
canal.mq.kafka.kerberos.jaasFilePath = ../conf/kerberos/jaas.conf
```
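After switching `canal.serverMode` to `kafka` and restarting canal, it helps to confirm that messages actually reach the broker. A small sketch using Kafka's `AdminClient` to list topics on the broker configured in `canal.mq.servers`; the `kafka-clients` dependency and the expectation that the instance's topic(s) appear once binlog events arrive are assumptions of this sketch.

```java
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;

/**
 * Sketch: after canal is restarted in kafka mode, list the topics on the
 * target broker and check that the topic(s) configured for the instance show up.
 */
public class CanalTopicCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // broker address taken from canal.mq.servers above
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.51:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Set<String> topics = admin.listTopics().names().get();
            topics.forEach(System.out::println);
        }
    }
}
```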
### instance.properties

In `conf/example/instance.properties`, the MQ-related keys decide how canal routes rows to topics and partitions:

```properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=1001

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=192.168.11.31:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address=
#canal.instance.standby.journal.name=
#canal.instance.standby.position=
#canal.instance.standby.timestamp=
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset=UTF-8

# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
# data filter; .*\\..* means nothing is filtered out (all schemas and tables)
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
# canal.mq.topic=example
# dynamic topic route by schema or table regex
# generate topics dynamically (one per schema.table)
canal.mq.dynamicTopic=.*\\..*
#canal.mq.partition=0
# hash partition config
# must match the number of partitions the topic is actually created with
canal.mq.partitionsNum=5
# rule deciding which partition a row is sent to: hash on the table's primary key
canal.mq.partitionHash=.*\\..*:$pk$
#################################################
```
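The comment on `canal.mq.partitionsNum` matters: the value must match the number of partitions the topic is really created with, otherwise the `$pk$` hash cannot spread rows across all partitions. Below is a sketch that pre-creates the per-table topic consumed in the test that follows (`test-db.demo`) with 5 partitions; the replication factor of 1 is an assumption for a single-broker setup.

```java
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

/**
 * Sketch: pre-create the dynamically routed topic with a partition count that
 * matches canal.mq.partitionsNum, so the $pk$ hash can use every partition.
 */
public class CreateCanalTopic {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.51:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // 5 partitions to match canal.mq.partitionsNum=5; replication factor 1 assumed
            NewTopic topic = new NewTopic("test-db.demo", 5, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
            System.out.println("topic created: " + topic.name());
        }
    }
}
```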
## Data synchronization: consuming canal data from Kafka (end-to-end test)

```java
package com.bfxy.canal.kafka;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

/**
 * Parsing canal data directly in Java does not perform particularly well: it is
 * single-threaded and cannot buffer a message backlog, so it is not suited to
 * high-concurrency scenarios. The usual approach is not to wire mysql-bin.log
 * straight into Java, but to publish the data to Kafka first and parse it in consumers.
 */
public class CollectKafkaConsumer {

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    public CollectKafkaConsumer(String topic) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.11.221:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group-id");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        this.topic = topic;
        consumer.subscribe(Collections.singletonList(topic));
    }

    private void receive(KafkaConsumer<String, String> consumer) {
        while (true) {
            // pull a batch of records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                String topic = partition.topic();
                int size = partitionRecords.size();
                // e.g. topic: test-db.demo, partition: 2, record count: 1
                System.err.println("topic: " + topic + ", partition: " + partition.partition()
                        + ", record count: " + size);
                for (int i = 0; i < size; i++) {
                    /*
                     * Sample canal flat message:
                     * {
                     *   "data":[{"id":"010","name":"z100","age":"35"}],
                     *   "database":"test-db",
                     *   "es":1605269364000,
                     *   "id":2,
                     *   "isDdl":false,
                     *   "mysqlType":{"id":"varchar(32)","name":"varchar(40)","age":"int(8)"},
                     *   "old":[{"name":"z10","age":"32"}],
                     *   "pkNames":["id"],
                     *   "sql":"",
                     *   "sqlType":{"id":12,"name":12,"age":4},
                     *   "table":"demo",
                     *   "ts":1605269365135,
                     *   "type":"UPDATE"
                     * }
                     */
                    System.err.println("----- value: " + partitionRecords.get(i).value());
                    long offset = partitionRecords.get(i).offset() + 1;
                    // consumer.commitSync();
                    // commit the offset of the record just processed, per partition
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset)));
                    System.err.println("commit succeeded, topic: " + topic + ", committed offset: " + offset);
                }
            }
        }
    }

    public static void main(String[] args) {
        String topic = "test-db.demo";
        CollectKafkaConsumer collectKafkaConsumer = new CollectKafkaConsumer(topic);
        collectKafkaConsumer.receive(collectKafkaConsumer.consumer);
    }
}
```
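Printing the raw record value is enough for this test, but a real consumer will want to pull the row images out of the flat message. A minimal sketch of parsing the JSON shown in the comment above, assuming Jackson (`com.fasterxml.jackson.databind`) is on the classpath; the field names follow that sample payload.

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

/**
 * Sketch: extract the interesting fields from a canal flat message
 * (canal.mq.flatMessage=true), e.g. inside the record loop of the consumer above.
 */
public class FlatMessageParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void handle(String value) throws Exception {
        JsonNode msg = MAPPER.readTree(value);
        String database = msg.path("database").asText();
        String table = msg.path("table").asText();
        String type = msg.path("type").asText();   // INSERT / UPDATE / DELETE
        boolean isDdl = msg.path("isDdl").asBoolean();
        if (isDdl) {
            return; // DDL statements carry no row data
        }
        // "data" holds the row images after the change; "old" holds the changed columns before an UPDATE
        for (JsonNode row : msg.path("data")) {
            System.out.println(database + "." + table + " " + type + " -> " + row);
        }
    }

    public static void main(String[] args) throws Exception {
        handle("{\"data\":[{\"id\":\"010\",\"name\":\"z100\",\"age\":\"35\"}],"
                + "\"database\":\"test-db\",\"table\":\"demo\",\"type\":\"UPDATE\","
                + "\"isDdl\":false,\"old\":[{\"name\":\"z10\",\"age\":\"32\"}]}");
    }
}
```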