这篇文章主要介绍了php使用Canal监听msyql的全过程,文中通过图文和代码示例讲解的非常详细,对大家的学习或工资有一定的帮助,需要的朋友可以参考下
目录
安装Java设置mysql用户权限修改mysql配置安装canal修改实例配置启动和停止php测试canal需要java8去官网下载java8安装Java
#创建目录mkdir -p /usr/local/java/#解压到目录tar zxvf jdk-8u411-linux-x64.tar.gz -C /usr/local/java/
配置环境变量在 /etc/profile 最后加入
export JAVA_HOME=/usr/local/java/jdk1.8.0_411export CLASSPATH=.:$JAVA_HOME/jre/lib/rt.jar:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jarexport PATH=$PATH:$JAVA_HOME/bin
使之生效
source /etc/profile
查看是否安装成功
java -version
设置mysql用户权限
#创建用户名和密码都为 canal 的用户create user 'canal'@'%' identified by 'canal';#授予该用户对所有数据库和表的查询、复制主节点数据的操作权限GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';FLUSH PRIVILEGES; #重新加载权限
修改mysql配置
vim /etc/my.cnf
修改部分内容如下
# 开启 binloglog-bin=mysql-bin#master端的ID号,不能和 canal 的 slaveId 重复;server-id=1#行级,记录每次操作后每行记录的变化。binlog-format=row#指定库,缩小监控的范围。binlog-do-db=test
查看是否开启主从
show master status;
安装canal
下载最新canal
下载wget https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz#创建canal目录mkdir -p /usr/local/canal/#解压到canal目录tar -zxvf canal.deployer-1.1.7.tar.gz -C /usr/local/canal/
查看canal主配置文件
cat /usr/local/canal/conf/canal.properties
查看会看到暴露了三个端口和指定了一个实例
canal.admin.port = 11110canal.port = 11111canal.metrics.pull.port = 11112#多个实例使用逗号分隔: canal.destinations = example1,example2,#这里对应/usr/local/canal/conf/example/canal.destinations = example
canal.destinations:canal能可以收集多个MySQL数据库数据,每个MySQL数据库都有独立的配置文件控制。具体配置规则: conf/目录下,使用文件夹放置,文件夹名代表一个MySQL实例。canal.destinations用于配置需要监控数据的数据库。如果是多个,使用,隔开。
修改实例配置文件
修改实例配置
vim /usr/local/canal/conf/example/instance.properties
在文件最后加入
#配置 slaveId ,不能等于 mysql 配置里的 server Id 即可canal.instance.mysql.slaveId=10 #数据库连接canal.instance.master.address=127.0.0.1:3306 #数据库账号密码canal.instance.dbUsername=canal canal.instance.dbPassword=canal#代表数据库的编码方式canal.instance.connectionCharset = UTF-8#设置白名单,这里是监控test库下所有表canal.instance.filter.regex=test\\..*#设置黑名单,这里设置排除test库下以_noc结尾的表canal.instance.filter.black.regex=test\\..*_noc
这个正则表达式 test\.*_noc 的含义是:
test:精确匹配字符串 "test"。\\.:匹配一个点(.),因为点在正则表达式中有特殊含义,所以需要使用 \\ 进行转义。*:匹配零个或多个任意字符。_noc:精确匹配字符串 "_noc"。因此,这个正则表达式可以用来匹配所有以 test. 开头且以 _noc 结尾的字符串。在 Canal 的配置中使用这个正则表达式,就可以实现排除以 test. 开头且以 _noc 结尾的数据库和表,而监控其他数据库和表。
比如我们要查询 test库下users表id为3的用户信息,sql语句可以这么写
SELECT * FROM test.users WHERE id = 3;
这里的test.users就是正则要匹配的地方
如果系统是1个 cpu,需要将 canal.instance.parser.parallel 设置为 false
启动和停止
#启动
/usr/local/canal/bin/startup.sh
#停止
/usr/local/canal/bin/stop.sh
查看是否启动成功
ps -ef | grep canal
php测试
使用 canal-php
composer require xingwenge/canal_php
新建index.php文件
<?phprequire __DIR__.'/vendor/autoload.php';use xingwenge\canal_php\CanalClient;use xingwenge\canal_php\CanalConnectorFactory;use xingwenge\canal_php\Fmt;try { $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE); # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE); $client->connect("127.0.0.1", 11111); $client->checkValid(); //设置过滤tes库t下的所有表 $client->subscribe("1001", "example", "test.*"); while (true) { $message = $client->get(100); if ($entries = $message->getEntries()) { foreach ($entries as $entry) { Fmt::println($entry); } } sleep(1); } $client->disConnect();} catch (\Exception $e) { echo $e->getMessage(), PHP_EOL;}
运行php文件
php index.php
eventType:1、是新增行,2、修改行,3、删除行、4、新增表
也可以直接将数据写入rabbitmq
修改conf/canal.properties
vim conf/canal.properties
修改RabbitMQ配置如下
rabbitmq.host = 127.0.0.1rabbitmq.virtual.host = /#交换机名称rabbitmq.exchange =exchange.canal#队列名称rabbitmq.queue = canal_queue#路由键名rabbitmq.routingKey = canal-routing-key#账号密码rabbitmq.username =adminrabbitmq.password =admin#路由模式,这里是严格模式rabbitmq.deliveryMode =direct
修改实例配置conf/example/instance.properties
vim conf/example/instance.properties
修改部分内容如下
#rabbitmq的路由键配置#这里与canal.properties里的rabbitmq.routingKey一样canal.mq.topic=canal-routing-keycanal.mq.partition=0canal.instance.multi.stream.on=false#rabbitmq 的 routing key#dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*#hash partition config#canal.mq.enableDynamicQueuePartition=false#canal.mq.partitionsNum=3#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6#canal.mq.partitionHash=test.table:id^name,.*\\..*