一. 拉取logstash镜像

docker pull logstash:6.7.0

二. 编写docker-compose.yaml文件

version: '3'
services:
  logstash:
    restart: always
    image: logstash:6.7.0
    # NOTE: the `deploy` section only takes effect under swarm mode
    # (`docker stack deploy`); plain `docker-compose up` ignores it.
    deploy:
      replicas: 1
      update_config:
        parallelism: 2
        delay: 10s
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
        window: 120s
      resources:
        # limits must be >= reservations. The original had them
        # inverted (reserved 1 CPU / 2408M while capped at
        # 0.5 CPU / 1024M), which is invalid; 2408M also looks like a
        # typo for 2048M.
        limits:
          cpus: '1'
          memory: 2048M
        reservations:
          cpus: '0.5'
          memory: 1024M
    volumes:
      - /opt/data/logstash/:/opt/data/logstash/
    ports:
      - "9600:9600"
      - "5044:5044"
    container_name: "logstash"
    networks:
      - back-up
networks:
  back-up:
    driver: bridge

三. 构建logstash容器

docker-compose -f docker-compose.yaml up -d

四. 进入logstash容器

docker exec -it logstash /bin/bash

查看容器日志

docker logs -f 容器ID

五. 安装logstash-input-jdbc插件

logstash 安装目录在 /usr/share/logstash 下

在logstash的安装目录bin下运行

./logstash-plugin install logstash-input-jdbc

六. logstash配置文件配置

logstash.yml

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
node.name: logstash1
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /opt/data/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline.
# Left unset so that pipelines.yml (defined below, loading every file
# under config/conf.d/ — both jdbc.conf and redis.conf) takes effect.
# The original value (/usr/share/logstash/config/jdbc.conf) both
# bypassed pipelines.yml and did not match the conf.d/ path used for
# the pipeline files later in this document.
# path.config: "/usr/share/logstash/config/conf.d/*.conf"
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
config.reload.automatic: true
#
# How often to check if the pipeline configuration has changed (in seconds)
#
config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint.
# Bind to all interfaces so the published 9600 port mapping works:
# inside the container, binding to the host's LAN IP (the original
# value, 192.168.2.225) fails because the container does not own
# that address.
#
http.host: "0.0.0.0"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
# path.logs:
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

pipelines.yml

# One "main" pipeline that loads every .conf file under config/conf.d/
# (the jdbc.conf and redis.conf files defined below).
- pipeline.id: main
  path.config: "/usr/share/logstash/config/conf.d/*.conf"

 

/usr/share/logstash/config/conf.d/jdbc.conf

input {
    # stdin is only useful for interactive debugging; under
    # `docker-compose up -d` there is no TTY, so it sees EOF and shuts
    # down immediately — harmless here, but consider removing it.
    stdin {
    }
    jdbc {
      # MySQL connection string; pay_db is the database name
      jdbc_connection_string => "jdbc:mysql://192.168.2.243:3306/pay_db?useUnicode=true&useSSL=true&allowMultiQueries=true&verifyServerCertificate=false&serverTimezone=Asia/Shanghai"
      # credentials
      jdbc_user => "mengqi"
      jdbc_password => "123456"
      # JDBC driver jar
      jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar"
      # driver class name
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # avoid mojibake for non-ASCII (Chinese) column values
      codec => plain { charset => "UTF-8" }
      # track a column value instead of the last query time
      use_column_value => true
      # tracked column (quoted: keep values as explicit strings)
      tracking_column => "mid"
      record_last_run => true
      # file storing the previous sql_last_value; seed it with the
      # column's initial value
      last_run_metadata_path => "/usr/share/logstash/config/sql/station_parameter"
      # paging for large result sets
      jdbc_paging_enabled => true
      jdbc_page_size => 50000
      # time zone used when converting DB timestamps
      jdbc_default_timezone => "Asia/Shanghai"
      # SQL statement file (path + name)
      statement_filepath => "/usr/share/logstash/config/sql/jdbc.sql"
      # cron-style schedule: minute hour day-of-month month day-of-week
      # (the original comment listed the last field as "year"; it is
      # day-of-week). All `*` = run every minute.
      schedule => "* * * * *"
      # routing tag consumed by the output section
      type => "order_info"
      # true would discard last_run_metadata_path and re-read the whole
      # table on every run
      clean_run => false
    }
    jdbc {
      # MySQL connection string; user_info is the database name
      jdbc_connection_string => "jdbc:mysql://192.168.2.243:3306/user_info?useUnicode=true&useSSL=true&allowMultiQueries=true&verifyServerCertificate=false&serverTimezone=Asia/Shanghai"
      # credentials
      jdbc_user => "mengqi"
      jdbc_password => "123456"
      # JDBC driver jar
      jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/mysql-connector-java-8.0.18.jar"
      # driver class name
      jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
      # avoid mojibake for non-ASCII (Chinese) column values
      codec => plain { charset => "UTF-8" }
      # was commented out in the original while tracking_column and the
      # last-run file were still configured — without it the tracked
      # column is ignored and sql_last_value falls back to the last run
      # time; re-enabled to match the first jdbc block's pattern
      use_column_value => true
      # tracked column; create_time is a timestamp, so the type must be
      # declared (default is numeric)
      tracking_column => "create_time"
      tracking_column_type => "timestamp"
      record_last_run => true
      # file storing the previous sql_last_value; seed it with the
      # column's initial value
      last_run_metadata_path => "/usr/share/logstash/config/sql/user-user_score"
      # paging for large result sets
      jdbc_paging_enabled => true
      jdbc_page_size => 50000
      # time zone used when converting DB timestamps
      jdbc_default_timezone => "Asia/Shanghai"
      # SQL statement file (path + name)
      statement_filepath => "/usr/share/logstash/config/sql/user-user_score-jdbc.sql"
      # cron-style schedule: minute hour day-of-month month day-of-week;
      # all `*` = run every minute
      schedule => "* * * * *"
      # routing tag consumed by the output section
      type => "user_score"
      # true would discard last_run_metadata_path and re-read the whole
      # table on every run
      clean_run => false
    }

}


filter {
    # Only parse events that actually carry a JSON "message" field.
    # jdbc rows have no "message", so running the json filter on them
    # unconditionally (as the original did) tags every database event
    # with _jsonparsefailure.
    if [message] {
        json {
            source => "message"
            remove_field => ["message"]
        }
    }
}


output {
    # Route by the `type` tag set on each jdbc input above.
    if [type] == "user_score" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            # target index
            index => "user_info"
            # document type (deprecated in ES 6.x, removed in 7.x)
            document_type => "user_score"
            # document id taken from the `mid` column.
            # NOTE(review): the user-score SQL selects s.* plus three
            # user_info columns — confirm user_score actually exposes
            # `mid`, otherwise every id becomes the literal "%{mid}".
            document_id => "%{mid}"
        }
    }
    if [type] == "order_info" {
        elasticsearch {
            # destination Elasticsearch host(s)
            hosts => ["192.168.2.248:9200"]
            # target index
            index => "pay_index"
            # document type (deprecated in ES 6.x, removed in 7.x)
            document_type => "order_info"
            # document id (primary-key-like field)
            document_id => "%{mid}"
        }
    }

    # debug output in JSON-lines format; remove in production
    stdout {
        codec => json_lines
    }
}

/usr/share/logstash/config/conf.d/redis.conf

input {

    # Pop events that filebeat pushed onto the redis list "filebeat",
    # one event per fetch (batch_count 1), from database 1.
    redis {
           host => "192.168.2.248"
           # NOTE(review): 63796 is not the default redis port (6379) —
           # confirm this is intentional and not a transposition.
           port => "63796"
           db => "1"
           batch_count => "1"
           data_type => "list"
           key => "filebeat"
    }
}

filter {
    # Only nginx access logs get geo enrichment.
    if [fields][service] == "nginx-access" {
        geoip {
            # client IP taken from the X-Forwarded-For value.
            # NOTE(review): assumes the field holds a single IP; a
            # multi-hop XFF list would need splitting first — confirm
            # against the nginx log format.
            source => "http_x_forwarded_for"
            # Build [geoip][coordinates] as a two-element array,
            # longitude first, then latitude (the order add_field is
            # applied in) — i.e. the [lon, lat] order expected for
            # array-form geo_point values.
            add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
            add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
        }
        mutate {
            # coordinates are string-interpolated above; cast to float
            convert => [ "[geoip][coordinates]", "float"]
        }
    }
}

output {
  # Route each event to a per-service daily index based on the
  # `service` field set by filebeat.
  # Joda-time sprintf fix: `dd` is day-of-month; the original `DD` is
  # day-of-YEAR, producing indices like "secure-2019.03.365".
  if [fields][service] == "secure" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "secure-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "messages" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "messages-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "app" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "app-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "config" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "config-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "eureka" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "eureka-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "zuul" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "zuul-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "tx-manager" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "tx-manager-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "nginx-access" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "logstash-nginx-access-%{+YYYY.MM.dd}"
        }
  } else if [fields][service] == "error" {
        elasticsearch {
            hosts => ["192.168.2.248:9200"]
            index => "error-%{+YYYY.MM.dd}"
        }
  }
}


/usr/share/logstash/config/sql/user-user_score-jdbc.sql

-- Incremental export: user scores joined with user profile columns.
-- :sql_last_value is substituted by the logstash jdbc input from
-- last_run_metadata_path; rows changed on either side since the last
-- run are re-read (LEFT JOIN keeps scores with no matching user).
SELECT s.*, c.user_name, c.age, c.gender FROM user_score s
LEFT JOIN user_info c ON s.user_id = c.mid
WHERE s.create_time> :sql_last_value or c.create_time > :sql_last_value

 

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐