前言
需要的工具请前往:https://www.ifnice.cn/archives/30
还需要阿里canal.client软件,这里提供1.1.5版本:https://wwba.lanzoum.com/ie2UG0prasbc
作用:数据库修改了某一个东西,立即同步到Redis,如果Redis原来没有这个数据,也可以同步,有一点延迟
所有路径都不能有中文
Mysql
打开Mysql的my.ini,划到最后一行,添加三行代码,注意看注解,添加完要重启Mysql
# Settings required by the Alibaba Canal component
log-bin=D:\BtSoft\mysql\MySQL5.7\data # use the MySQL path of your own machine
binlog-format=ROW
server_id=1
阿里canal.client软件
打开canal目录下的\conf\example\instance.properties文件,把其中的数据库账号和密码(canal.instance.dbUsername、canal.instance.dbPassword)修改成自己Mysql的账号密码
准备Lua文件
作用:数据库修改了某一个东西,立即同步到Redis,如果Redis原来没有这个数据,也可以同步,有一点延迟
注意看注释
-- Declare the response as JSON before any body is written with ngx.say.
ngx.header.content_type="application/json;charset=utf8"
-- Sync
--- Hand a Redis connection back to the keepalive connection pool.
-- A failure to pool the connection is logged at error level and otherwise
-- ignored.
-- @param red resty.redis connection object; nil is accepted and ignored
local function close_redis(red)
    if not red then
        return
    end
    -- Release the connection into the pool; idle timeout is in milliseconds.
    local pool_max_idle_time = 10000
    -- Connection pool size.
    local pool_size = 100
    local ok, err = red:set_keepalive(pool_max_idle_time, pool_size)
    if not ok then
        -- The original referenced the undefined globals ngx_log / ngx_ERR,
        -- which raised "attempt to call a nil value" on this path.
        ngx.log(ngx.ERR, "set redis keepalive error : ", err)
    end
end
-- Rebuild the Redis cache entry "content:<cid>" from the rows of
-- shop_content.tb_content whose category_id equals the ?cid= query argument.
local uri_args = ngx.req.get_uri_args()
local cid = uri_args['cid']
-- A repeated argument (?cid=1&cid=2) arrives as a table; use the first value.
if type(cid) == "table" then
    cid = cid[1]
end
-- cid is concatenated into the SQL text below, so accept decimal digits only.
-- This rejects a missing argument (nil concatenation error) and SQL injection.
if type(cid) ~= "string" or not cid:find("^%d+$") then
    ngx.say('参数cid无效')
    return
end

local cjson = require("cjson")
local mysqlModel = require("resty.mysql")
local db, new_err = mysqlModel:new()
if not db then
    ngx.log(ngx.ERR, "failed to create mysql object: ", new_err)
    ngx.say('链接失败')
    return
end
db:set_timeout(1000)
-- Database configuration
local connected, conn_err = db:connect{
    host="127.0.0.1",
    port=3306,
    database="shop_content",
    user="root",
    password="ok"
}
if not connected then
    -- Nothing to close: the connection was never established.
    ngx.log(ngx.ERR, "failed to connect to mysql: ", conn_err)
    ngx.say('链接失败')
    return
end
-- Query (kept local; the original leaked `res` as a global)
local res, query_err = db:query("SELECT * FROM `tb_content` WHERE category_id=" .. cid)
db:close()
if not res then
    -- Do not overwrite a good cache entry with the encoding of a failed query.
    ngx.log(ngx.ERR, "mysql query failed: ", query_err)
    ngx.say('查询失败')
    return
end

local redisModel = require("resty.redis")
local redis, redis_new_err = redisModel:new()
if not redis then
    ngx.log(ngx.ERR, "failed to create redis object: ", redis_new_err)
    ngx.say('链接redis失败')
    return
end
redis:set_timeout(1000)
-- Redis connection
local redis_ok, redis_err = redis:connect('127.0.0.1', 6379)
if not redis_ok then
    -- A connection that never opened cannot be returned to the pool.
    ngx.log(ngx.ERR, "failed to connect to redis: ", redis_err)
    ngx.say('链接redis失败')
    return
end
-- Switch to Redis database 0, then store the rows as one JSON string.
local stored, store_err = redis:select(0)
if stored then
    stored, store_err = redis:set("content:" .. cid, cjson.encode(res))
end
if not stored then
    ngx.log(ngx.ERR, "failed to write to redis: ", store_err)
    -- Close instead of pooling: the connection state is unknown after an error.
    redis:close()
    ngx.say('写入redis失败')
    return
end
close_redis(redis)
-- NOTE(review): this payload uses single quotes, so it is not strict JSON even
-- though the response is declared application/json; kept unchanged for
-- existing consumers.
ngx.say("{'flag':'success'}")
Java文件
单线程版:
没说要动的,就不要动
结构图
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client-adapter</artifactId>
<version>1.1.2</version>
<type>pom</type>
</dependency>
代码,57-72行(即updateContentSync方法)注意看,67行(拼接url的那一行)要换成自己的lua接口地址
package com.zb.util;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Single-threaded Canal client.
 *
 * <p>Polls a Canal server for binlog entries and, for row changes in
 * {@code shop_content.tb_content}, calls the HTTP endpoint that rebuilds the cache
 * for the affected {@code category_id}.
 */
@Component
public class ClientSample {
    @Autowired
    private RestTemplate restTemplate;

    /**
     * Connects to the Canal server on this host (port 11111, destination "example") and
     * processes binlog batches until the calling thread is interrupted.
     */
    public void main() {
        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            // Open the connection
            connector.connect();
            // Listen to every schema and table of MySQL
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                // Nothing changed in the database
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop polling instead of swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    // Handle every changed record
                    printEntry(message.getEntries());
                    if (Thread.currentThread().isInterrupted()) {
                        // Interrupted mid-batch: leave it un-acked so it is fetched again
                        // after the rollback() of the next run.
                        break;
                    }
                }
                connector.ack(batchId); // acknowledge the batch
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Calls the cache-refresh endpoint for the {@code category_id} found in the given columns.
     *
     * @param columns column image of one changed row
     */
    public void updateContentSync(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (!"category_id".equals(column.getName())) {
                continue;
            }
            String cid = column.getValue();
            if (cid == null || cid.isEmpty()) {
                // No usable id: a request with an empty cid would be malformed.
                continue;
            }
            System.out.println("数据同步准备完毕,请开始操作");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            String url = "http://localhost:9098/mysave?cid=" + cid;
            String data = restTemplate.getForObject(url, String.class);
            System.out.println(data);
        }
    }

    /**
     * Logs every entry and triggers a cache refresh for row changes in
     * {@code shop_content.tb_content}.
     *
     * @param entrys binlog entries of one batch
     */
    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Transaction begin/end markers carry no row data; skip them
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            // Boilerplate: decode the RowChange payload
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            // Event type of this change
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        // Before-image: the row as it was prior to deletion
                        printColumn(rowData.getBeforeColumnsList());
                        // Refresh so the deleted row disappears from the cached category.
                        updateContentSync(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        // An inserted row only has an after-image; the before-image is empty.
                        updateContentSync(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("1---");
                        // Refresh the category the row belongs to now ...
                        updateContentSync(rowData.getAfterColumnsList());
                        if (categoryChanged(rowData.getAfterColumnsList())) {
                            // ... and the one it was moved out of.
                            updateContentSync(rowData.getBeforeColumnsList());
                        }
                        System.out.println("3---");
                    }
                }
            }
        }
    }

    /** Returns true when the after-image marks {@code category_id} as updated. */
    private boolean categoryChanged(List<Column> afterColumns) {
        for (Column column : afterColumns) {
            if ("category_id".equals(column.getName())) {
                return column.getUpdated();
            }
        }
        return false;
    }

    /** Prints name, value and updated flag of every column. */
    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
启动类
package com.zb;
import com.zb.util.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;
/**
 * Spring Boot entry point: starts the application context, then runs the
 * {@link ClientSample} bean on the main thread.
 */
@SpringBootApplication
@EnableAsync // enable asynchronous method execution
public class ContentApplication {

    /**
     * Boots the context and calls {@code main()} on the ClientSample bean.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(ContentApplication.class, args);
        context.getBean(ClientSample.class).main();
    }

    /** Registers the RestTemplate bean that is injected elsewhere in the application. */
    @Bean
    public RestTemplate createRestTemplate() {
        RestTemplate template = new RestTemplate();
        return template;
    }
}
Java多线程版:
没说要动,不要动
结构:
新增依赖
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client-adapter</artifactId>
<version>1.1.2</version>
<type>pom</type>
</dependency>
application.yml
无需额外新增
AsyncProccess工具类
28行需要改动(应为拼接url的那一行,换成自己的lua接口地址)
package com.zb.util;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.util.List;
/**
 * Asynchronous helper that calls the cache-refresh HTTP endpoint for a changed row.
 */
@Component
public class AsyncProccess {
    @Autowired
    private RestTemplate restTemplate;

    /**
     * Calls the cache-refresh endpoint for the {@code category_id} found in the given columns.
     * Annotated {@code @Async}, so the caller is not meant to wait for the delay or the HTTP call.
     *
     * @param columns column image of one changed row
     */
    @Async
    public void updateContentSync(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            if (!"category_id".equals(column.getName())) {
                continue;
            }
            String cid = column.getValue();
            if (cid == null || cid.isEmpty()) {
                // No usable id: a request with an empty cid would be malformed.
                continue;
            }
            System.out.println("数据同步准备完毕,请开始操作");
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the flag and give up instead of swallowing the interrupt.
                Thread.currentThread().interrupt();
                return;
            }
            String url = "http://localhost:9090/mysave?cid=" + cid;
            String data = restTemplate.getForObject(url, String.class);
            System.out.println(data);
        }
    }
}
ClientSample工具类
package com.zb.util;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;
import java.net.InetSocketAddress;
import java.util.List;
/**
 * Canal client for the multi-threaded variant.
 *
 * <p>Polls a Canal server for binlog entries and, for row changes in
 * {@code shop_content.tb_content}, hands the cache refresh to {@link AsyncProccess}.
 */
@Component
public class ClientSample {
    @Autowired
    private AsyncProccess asyncProccess;

    /**
     * Connects to the Canal server on this host (port 11111, destination "example") and
     * processes binlog batches until the calling thread is interrupted.
     */
    public void main() {
        System.out.println("开启同步");
        // Create the connector
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                11111), "example", "", "");
        int batchSize = 1000;
        try {
            // Open the connection
            connector.connect();
            // Listen to every schema and table of MySQL
            connector.subscribe(".*\\..*");
            // Roll back to the last un-acked position so the next fetch resumes from there
            connector.rollback();
            while (!Thread.currentThread().isInterrupted()) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                // Nothing changed in the database
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the flag and stop polling instead of swallowing the interrupt.
                        Thread.currentThread().interrupt();
                        break;
                    }
                } else {
                    // Handle every changed record
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
            }
        } finally {
            connector.disconnect();
        }
    }

    /**
     * Logs every entry and triggers an asynchronous cache refresh for row changes in
     * {@code shop_content.tb_content}.
     *
     * @param entrys binlog entries of one batch
     */
    private void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Transaction begin/end markers carry no row data; skip them
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            // Boilerplate: decode the RowChange payload
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            // Event type of this change
            EventType eventType = rowChage.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            if (entry.getHeader().getSchemaName().equals("shop_content") && entry.getHeader().getTableName().equals("tb_content")) {
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        // Before-image: the row as it was prior to deletion
                        printColumn(rowData.getBeforeColumnsList());
                        // Refresh so the deleted row disappears from the cached category.
                        asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        // After-image: the row as it is after the insert
                        printColumn(rowData.getAfterColumnsList());
                        // Refresh so the new row shows up in the cached category.
                        asyncProccess.updateContentSync(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("1---");
                        // Refresh the category the row belongs to now ...
                        asyncProccess.updateContentSync(rowData.getAfterColumnsList());
                        if (categoryChanged(rowData.getAfterColumnsList())) {
                            // ... and the one it was moved out of.
                            asyncProccess.updateContentSync(rowData.getBeforeColumnsList());
                        }
                        System.out.println("3---");
                    }
                }
            }
        }
    }

    /** Returns true when the after-image marks {@code category_id} as updated. */
    private boolean categoryChanged(List<Column> afterColumns) {
        for (Column column : afterColumns) {
            if ("category_id".equals(column.getName())) {
                return column.getUpdated();
            }
        }
        return false;
    }

    /** Prints name, value and updated flag of every column. */
    private void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
启动类
package com.zb;
import com.zb.util.ClientSample;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;
/**
 * Spring Boot entry point of the multi-threaded variant: starts the application
 * context, then runs the {@link ClientSample} bean on the main thread.
 */
@SpringBootApplication
@EnableDiscoveryClient
@EnableAsync // enable asynchronous method execution
public class ContentApplication {

    /**
     * Boots the context and calls {@code main()} on the ClientSample bean.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(ContentApplication.class, args);
        context.getBean(ClientSample.class).main();
    }

    /** Registers the RestTemplate bean that is injected elsewhere in the application. */
    @Bean
    public RestTemplate createRestTemplate() {
        RestTemplate template = new RestTemplate();
        return template;
    }
}