当前位置: 首页 > news >正文

赞助网站怎么做360站长

赞助网站怎么做,360站长,网站建设 好公司,做汽车网站开题报告的意义使用前提: 一、mysql开启了logibin 在mysql的安装路径下的my.ini中 【mysqlid】下 添加 log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 参考gitee的项目…

使用前提:

一、mysql开启了logibin

在mysql的安装路径下的my.ini中 

【mysqlid】下

添加
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

参考gitee的项目,即拉即用。参考地址

zhanghl/spring-boot-debeziumicon-default.png?t=N7T8https://gitee.com/zhl001/spring-boot-debezium项目中一个三个文件,pom和两个类需要参考。

pom.xml

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.13.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>cn.felord</groupId><artifactId>spring-boot-debezium</artifactId><version>0.0.1-SNAPSHOT</version><name>spring-boot-debezium</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><debezium.version>1.5.2.Final</debezium.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>${debezium.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><!--springboot与mybatis的整合包--><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>1.3.0</version></dependency><!--mysql驱动包--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--springboot与JDBC整合包--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><!--sqlserver数据源--><dependency><groupId>com.microsoft.sqlserver</groupId><artifactId>sqljdbc4</artifactId><version>4.0</version></dependency></dependencies>

两个java类

DebeziumConfiguration.java
package cn.felord.debezium.debezium;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.*;
import static java.util.stream.Collectors.toMap;/*** The type Debezium configuration.** @author n1* @since 2021 /6/1 17:01*/
@Configuration
public class DebeziumConfiguration {/*** Debezium 配置.** @return configuration*/@Beanio.debezium.config.Configuration debeziumConfig() {return io.debezium.config.Configuration.create()
//            连接器的Java类名称.with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                偏移量持久化文件路径 默认/tmp/offsets.dat  如果路径配置不正确可能导致无法存储偏移量 可能会导致重复消费变更
//                如果连接器重新启动,它将使用最后记录的偏移量来知道它应该恢复读取源信息中的哪个位置。.with("offset.storage.file.filename", "/tmp/offsets.dat")
//                捕获偏移量的周期.with("offset.flush.interval.ms", "1")
//               连接器的唯一名称.with("name", "mysql-connector")
//                数据库的hostname.with("database.hostname", "10.1.1.1")
//                端口.with("database.port", "3306")
//                用户名.with("database.user", "canal")
//                密码.with("database.password", "canal")
//                 包含的数据库列表,你的数据库.with("database.include.list", "md_test")
//                是否包含数据库表结构层面的变更,建议使用默认值true.with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id.with("database.server.id", 1)
//                	MySQL 服务器或集群的逻辑名称.with("database.server.name", "customer-mysql-db-server")
//                历史变更记录.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
//                历史变更记录存储位置,存储DDL.with("database.history.file.filename", "/tmp/dbhistory.dat").build();}/*** Debezium server bootstrap debezium server bootstrap.** @param configuration the configuration* @return the debezium server bootstrap*/@BeanDebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {recordChangeEvents.forEach(r -> {SourceRecord sourceRecord = r.record();String topic = sourceRecord.topic();Struct sourceRecordChangeValue = (Struct) sourceRecord.value();if (sourceRecordChangeValue != null) {// 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if (operation != Envelope.Operation.READ) {String record = operation == Envelope.Operation.DELETE ? BEFORE : AFTER;// 获取增删改对应的结构体数据Struct struct = (Struct) sourceRecordChangeValue.get(record);// 将变更的行封装为MapMap<String, Object> payload = struct.schema().fields().stream().map(Field::name).filter(fieldName -> struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));// 这里简单打印一下System.out.println("operation = " + operation);System.out.println("data = " + payload);if(operation.toString().equals("CREATE")){System.out.println("新增记录一条");}//tabelNameif(topic.split("\\.").length > 2){String tableName = topic.split("\\.")[2];System.out.println("tabelName" + tableName);}}}});}}
DebeziumServerBootstrap.java
package cn.felord.debezium.debezium;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;import java.util.concurrent.Executor;
import java.util.concurrent.Executors;/*** @author n1* @since 2021/6/2 10:45*/
@Data
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Executor executor = Executors.newSingleThreadExecutor();private DebeziumEngine<?> debeziumEngine;@Overridepublic void start() {executor.execute(debeziumEngine);}@SneakyThrows@Overridepublic void stop() {debeziumEngine.close();}@Overridepublic boolean isRunning() {return false;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, "debeziumEngine must not be null");}
}

 结语:

太强了,比canal强10倍,非侵入性。对比可参考:

不想引入MQ?不妨试试 Debezium-CSDN博客

为什么是debezium

这么多技术框架,为什么选debezium?

看起来很多。但一一排除下来就debezium和canal。

sqoop,kettle,datax之类的工具,属于前大数据时代的产物,地位类似于web领域的structs2。而且,它们基于查询而非binlog日志,其实不属于CDC。首先排除。

flink cdc是大数据领域的框架,一般web项目的数据量属于大材小用了。

同时databus,maxwell相对比较冷门,用得比较少。

「最后不用canal的原因有以下几点:」

  • canal需要安装,这违背了“如非必要,勿增实体”的原则。

  • canal只能对MYSQL进行CDC监控。有很大的局限性。

  • 大数据领域非常流行的flink cdc(阿里团队主导)底层使用的也是debezium,而非同是阿里出品的canal。

  • debezium可借助kafka组件,将变动的数据发到kafka topic,后续的读取操作只需读取kafka,可有效减少数据库的读取压力。可保证一次语义,至少一次语义。

  • 同时,也可基于内嵌部署模式,无需我们手动部署kafka集群,可满足”如非必要,勿增实体“的原则。

  • 而且canal只支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

实时监视同步数据库变更,这个框架真是神器_Mysql

http://www.yidumall.com/news/3769.html

相关文章:

  • 原创网站源码免费建网站的平台
  • 做网站产品图片素材阿里巴巴国际站
  • 如何做交互式网站免费优化推广网站的软件
  • 网站欢迎页怎么做网络营销竞价推广
  • 做电影网站服务器seo优化排名营销
  • 邯郸专业做wap网站广东seo快速排名
  • 电子商务网站建设的基本过程小程序推广接单平台
  • 网站浏览历史记录恢复方法是什么手机百度app下载安装
  • 网站包括哪些主要内容抖音推广
  • 网络营销导向的企业网站建设的要求apple私人免费网站怎么下载
  • 大型网站开发基本流程如何找外链资源
  • 有什么网站帮做邀请函设计的qq推广平台
  • 网站建设 模板网站网络口碑营销名词解释
  • 一万元做网站关键词歌词打印
  • 国外展览设计网站友情链接搜读
  • 辽中网站建设seo优化博客
  • 自己做的网站能备案免费域名注册平台
  • 安徽省工程建设工程信息网站百度网站提交入口
  • 怎么去跟客户谈网站建设网站优化排名优化
  • 做平面设计的一般浏览什么网站国内专业seo公司
  • 哪里可以做网赚网站网络营销的5种方式
  • 网站建设 制作温州seo教程
  • wordpress分类目录keywordseo网络推广培训班
  • ps做网站边框新产品的推广销售方法
  • 网站建设服装市场分析报告品牌营销策划有限公司
  • 怎么填充网站内容百度seo搜索引擎优化
  • 智能响应式网站建设优化大师卸载不了
  • 维吾尔网站建设学术深圳百度首页优化
  • 香港网站建设展览爱链接网如何使用
  • 南京网站设计案例软文写作是什么