1: Find the Logstash image

https://hub.docker.com/_/logstash?tab=tags

2: Pull the Logstash image

docker pull logstash:6.8.12
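
If you want to confirm the image was pulled, you can list the local images for this repository:

docker images logstash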

3: Create the Logstash container

1. First start a temporary container so its default configuration files can be copied out

docker run -d --name=logstash logstash:6.8.12

Check the logs to confirm it started successfully:

docker logs -f logstash


2. Create the directories needed for the volume mounts

mkdir -p /opt/docker/logstash/config/conf.d

Copy the data out of the container:

docker cp logstash:/usr/share/logstash/config /opt/docker/logstash/
docker cp logstash:/usr/share/logstash/data /opt/docker/logstash/
docker cp logstash:/usr/share/logstash/pipeline /opt/docker/logstash/
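
To check that everything was copied out, list the mounted directories (paths as created above):

ls /opt/docker/logstash/config /opt/docker/logstash/data /opt/docker/logstash/pipeline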

Edit the configuration file logstash.yml:

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: [ "http://127.0.0.1:9200" ]
#path.config: /usr/share/logstash/config/conf.d/*.conf
path.logs: /usr/share/logstash/logs

3. Grant permissions on the directory

chmod -R 777 /opt/docker/logstash/

4. Start from the command line

Remove the previous container, then create a new one:
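
For example, the temporary container from step 1 can be removed with:

docker rm -f logstash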

docker run \
--name logstash \
--restart=always \
-p 5044:5044 \
-p 9600:9600 \
-e ES_JAVA_OPTS="-Duser.timezone=Asia/Shanghai" \
-v /opt/docker/logstash/config:/usr/share/logstash/config \
-v /opt/docker/logstash/data:/usr/share/logstash/data \
-v /opt/docker/logstash/pipeline:/usr/share/logstash/pipeline \
-d logstash:6.8.12

Parameter explanation

docker run \
--name logstash \             name the container logstash
--restart=always \            restart the container automatically
-p 5044:5044 \                map container port 5044 to host port 5044 (the Logstash input port)
-p 9600:9600 \                map container port 9600 to host port 9600 (the API port)
-e ES_JAVA_OPTS="-Duser.timezone=Asia/Shanghai" \   set the time zone
-v /opt/docker/logstash/config:/usr/share/logstash/config \
-v /opt/docker/logstash/data:/usr/share/logstash/data \
-v /opt/docker/logstash/pipeline:/usr/share/logstash/pipeline \   mount the config, data, and pipeline directories
-d logstash:6.8.12            run the container in the background and print the container ID

5. Check whether startup succeeded

docker ps -a
docker logs -f --tail 200 logstash 
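
With the port mapping above, the Logstash monitoring API on port 9600 should also respond; a quick check from the Docker host (assuming it is reachable on 127.0.0.1):

curl -s http://127.0.0.1:9600/?pretty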

4: MySQL sync configuration

  1. Enter the container
    docker exec -it logstash bash
    
  2. Check whether the logstash-input-jdbc plugin is installed
    ./bin/logstash-plugin list --verbose
    
  3. If it is not, install the plugin (it is usually installed by default)
    ./bin/logstash-plugin install logstash-input-jdbc
    
  4. Exit the container
    exit
    
  5. Create a jdbc.conf file in the /opt/docker/logstash/config/conf.d directory (the driver jar and lastrun path it references are set up in the sketch after this list)
     input{
         jdbc{
              # Database connection
              jdbc_connection_string => "jdbc:mysql://47.108.13.175:3306/edu?serverTimezone=Asia/Shanghai&characterEncoding=utf8&useSSL=false"
              jdbc_user => "root"
              jdbc_password => "HBcloud.1024"
              # JDBC driver jar used for the connection
              jdbc_driver_library => "/usr/share/logstash/config/jars/mysql-connector-java-8.0.20.jar"
              jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
              jdbc_paging_enabled => "true"
              jdbc_page_size => "50000"
              codec => plain { charset => "UTF-8" }

              # Incremental tracking
              # Column to track
              tracking_column => "update_time"
              # Where the metadata of the last run is stored
              last_run_metadata_path => "/usr/share/logstash/config/lastrun/logstash_jdbc_last_run"
              # Time zone
              jdbc_default_timezone => "Asia/Shanghai"
              # Path to a SQL file
              # statement_filepath => ""
              # SQL statement
              statement => "SELECT g.merchant_id AS id,g.nickname AS nickname,g.avatar_name AS avatarName,g.`desc` AS `desc`,g.contacts AS contacts,g.province AS province,g.city AS city,g.district AS district,g.address AS address, ST_AsText(g.coordinate) AS coordinate FROM wbc_merchant g WHERE g.update_time > :sql_last_value"
              # Whether to clear the last_run_metadata_path record; if true, every run starts over and reads all database records again
              clean_run => false
              # Schedule for the recurring import job (cron-style, the first field is minutes); if unset it runs once per minute
              schedule => "* * * * *"
         }
    }
    output{
          elasticsearch{
                       # Host of the Elasticsearch instance to import into
                       hosts => "132.232.41.245:9200"
                       # Name of the Elasticsearch index to import into
                       index => "merchant_index"
                       # Type name (similar to a database table name)
                       document_type => "merchanteso"
                       # Document ID (similar to the table's primary key)
                       document_id => "%{id}"
          }

          stdout{
                # Output as JSON lines
                codec => json_lines
          }
    }
    
  6. Uncomment the path.config: /usr/share/logstash/config/conf.d/*.conf line in logstash.yml,
    then restart the container
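
The pipeline above references a MySQL driver jar and a last-run metadata file under the mounted config directory, and these are not created by the earlier steps. A minimal sketch to put them in place, assuming the host paths from section 3 and the mysql-connector-java 8.0.20 jar published on Maven Central:

mkdir -p /opt/docker/logstash/config/jars /opt/docker/logstash/config/lastrun
# download the driver jar referenced by jdbc_driver_library (Maven Central URL assumed)
wget -P /opt/docker/logstash/config/jars \
  https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
chmod -R 777 /opt/docker/logstash/config/jars /opt/docker/logstash/config/lastrun
docker restart logstash

After a minute or so the scheduled job should have run; you can then check that documents reached the index (host and index name as configured in the output section):

curl -s "http://132.232.41.245:9200/merchant_index/_count?pretty"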

5: Common problems

An unknown error occurred sending a bulk request to Elasticsearch. We will retry indefinitely {:error_message=>"\"\xC2\" from ASCII-8BIT to UTF-8", :error_class=>"LogStash::Json::GeneratorError", :backtrace=>["/usr/share/logstash/logstash-core/lib/logsta

Problem: encoding error

Fix: set the character set on each string column, inside the jdbc block of jdbc.conf:

columns_charset => {
                "id"=> "UTF-8"
                "nickname"=> "UTF-8"
                "avatarName"=> "UTF-8"
                "desc"=> "UTF-8"
                "contacts"=> "UTF-8"
                "province"=> "UTF-8"
                "city"=> "UTF-8"
                "district"=> "UTF-8"
                "address"=> "UTF-8"
          }
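
After adding columns_charset, restart the container and watch the logs to confirm the bulk-request error is gone:

docker restart logstash
docker logs -f --tail 200 logstash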