canal同步es

    22

基础

安装好es 本文使用版本canal1.1.15 https://github.com/alibaba/canal

server端安装

docker安装

sh run.sh -e canal.auto.scan=false \
          -e canal.destinations=blog \
          -e canal.instance.master.address=106.14.239.36:3306  \
          -e canal.instance.dbUsername=zzf-dev  \
          -e canal.instance.dbPassword=YaThM7reZJjXt4pj  \
          -e canal.instance.connectionCharset=UTF-8 \
          -e canal.instance.tsdb.enable=true \
          -e canal.instance.gtidon=false

安装包安装

  1. 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
  1. 修改配置
#conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 #修改数据库地址
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  #修改数据库账号
canal.instance.dbPassword = canal  #修改数据库密码
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
  1. 启动
sh bin/startup.sh 
cat logs/canal/canal.log
cat logs/example/example.log
sh bin/stop.sh

es适配器

  1. 下载适配器,不要下载正式版,有bug
  2. 修改启动配置
#application.yml
canal.conf:
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  mode: tcp 
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true  #修改数据库链接
      username: root #修改数据库账号
      password: 121212 #修改数据库密码
  canalAdapters:
  - instance: example 
    groups:
    - groupId: g1
      outerAdapters:
      - 
        key: exampleKey
        name: es7                           #修改为es7
        hosts: 127.0.0.1:9300               #es 集群地址, 逗号分隔
        properties:
          mode: transport # or rest         #可指定transport模式或者rest模式
          cluster.name: elasticsearch       #es cluster name
  1. 修改映射配置
dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId:                        # 对应MQ模式下的groupId, 只会同步对应groupId的数据
esMapping:
  _index: mytest_user           # es 的索引名称
  _type: _doc                   # es 的type名称, es7下无需配置此项
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
#  pk: id                       # 如果不需要_id, 则需要指定一个属性为主键属性
  # sql映射
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
#    _obj: object               # json对象
  etlCondition: "where a.c_time>='{0}'"     # etl 的条件参数
  commitBatch: 3000                         # 提交批大小
  1. 启动
sh bin/startup.sh 
sh bin/stop.sh

es同步

  1. 初始化索引
put
http://127.0.0.1:9200/blog
{
    "settings": {
        "analysis.analyzer.default.type": "ik_max_word",
        "number_of_shards": 5,
        "number_of_replicas": 0
    },
    "mappings": {
        "properties": {
            "id": {
                "type": "long"
            },
            "title": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "tag": {
                "type": "keyword"
            },
            "tag_desc": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            },
            "is_delete": {
                "type": "short"
            },
            "is_release": {
                "type": "short"
            },
            "content": {
                "type": "text",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_smart"
            }
        }
    }
}
  1. 全量同步
post
curl -X POST http://127.0.0.1:8081/etl/es7/mytest_user.yml

F&Q

  • 如果系统是1个cpu,需要将canal.instance.parser.parallel设置为false
  • 启动失败一般来说只需调整startup里的内存配置即可
评论区
共有评论 0
暂无评论