Logstash Kafka--->Influxdb

Logstash是一个类似于Flume的数据流采集工具,可以对接多种的数据输入源(如:各种服务器的日志文件,ES/KAFKA/HDFS等等,更多的输入插件请参照),以及将数据源数据到多种数据存储介质中(如:File/ES/Influxdb/HDFS/Mysql/Kafka等等,更多的输出插件请参照)

安装和使用:

可以从官网下载logstash,当前位置的logstash的版本是6.6.1,发布日期是2019年2月19日,可以根据自己的需求来下载适合自己需求的包,本次使用的是TAR.GZ文件

  1. 将下载的文件上传到linux的指定目录(自己喜欢的目录即可)

  2. tar -zxvf logstash-6.6.1.tar.gz -C targetpath (targetpath为解压到自己的软件安装的目录)

  3. vim xxx.conf(xxx是自己喜欢的名字 随意更换)

  4. 输入

    input { 
       stdin {} 
    } 
    output { 
       stdout{
           codec => json
             } 
    }

  5. $Logstash/bin/logstash -f xxx.conf 即可运行

Logstash的使用仅需玩家配置一个配置文件(如:xxx.conf)即可,logstash的安装也非常的简单

插件 : logstash-input-kafka (早期版本需要安装,当前版本已内置)

关于输入源kafka的相关配置,请参照,在logstash6.6.1中已经集成了logstash-input-kafka这个插件,所以不需要用户再自己去安装这个插件了,如果想查看已安装了哪些插件,可以使用命令:

$logstash/bin/logstash-plugin list

插件 : logstash-output-influxdb (没有内置,需要自己安装)

安装方式

一. 联网安装

$logstash/bin/logstash-plugin insatall logstash-output-influxdb

安装完成之后,执行

$logstash/bin/logstash-plugin list

即可查看是否安装成功

二. 离线安装

首先在一台能联网的电脑上安装logstash的需要的插件,然后在安装后的电脑上执行:

bin/logstash-plugin prepare-offline-pack logstash-output-influxdb

命令执行完后在logstash的根目录下会多出一个logstash-offline-plugins-6.6.1.zip文件

然后将logstash-offline-plugins-6.6.1.zip文件上传到需要离线安装插件的机子上,之后 执行命令:

$logstash/bin/logstash-plugin install file:///../../logstash-offline-plugins-6.6.1.zip

然后执行命令:

$logstash/bin/logstash-plugin list

查看是否已经成功安装

对于influxdb的插件的参数介绍,请以此插件github上项目下的目录 : logstash-output-influxdb/docs/index.asciidoc 文档为准,这里面的参数及使用介绍 ,是最完整的,官网及其他地方的版本可能不一样

还有一点需要注意的是,不同版本的插件参数也是不一样的,使用之前请严格查看自己插件版本对应的参数文档介绍

kafka及其他插件同理!

下面是示例配置文件及解释:

input{
    kafka { #下面的这些配置信息谁在前谁在后无所谓 不同版本的插件这些配置的名字也不同  请注意
        bootstrap_servers => "node01:9092,node02:9092,node03:9092"  # kafka集群信息
        codec => "json" # 将kafka的消息以json格式输入 这样的话  就可以直接在output和filter的部分使用"%{key}"来引用kafka消息中的对应字段的值
        group_id => "logstash1" #消费者组id  为了记录消费的offset
        consumer_threads => 8  # 消费者组中多少个消费者来消费这些topic中的消息  一般情况线程数对应topic的分区个数为最佳
        topics => ["test"]  # 需要消费的topic  可以传入多个topic
        auto_offset_reset => "latest" #如果消费者组的offset丢失或者第一次加入 从什么位置开始消费   
   }

}
filter{
   if ![host]{ #判断给定的消息中是否有host这个字段
        drop {} #如果没有的话就跳过这条消息
   } else {
        date {#如果只有match  没有target的话 默认把匹配到的时间赋值给event的@timestamp
            match => ["logdate","ISO8601"] #匹配 2019-02-26T09:30:51.55+8:00 这种带时区的日期格式
            timezone => "Asia/Shanghai" #如果加时区的话  @timestamp的时间会是UTC时间  并不是该时区的时间
    }
   }
}
output {
  influxdb {
    host => "node01" # influx所在的ip   port的话默认就是8086  如果没有修改默认端口的话 这个位置可以不用配置port
    db => "demo" # 使用的数据库
    measurement => "demo1" # 将数据插入到哪个表中
    retention_policy => "autogen" # 使用哪个保留规则
    send_as_tags => ['aaa'] # 这里面是标示着把哪些字段作为tag  datapoints中的字段必须大于此集合中的字段数
    allow_time_override => true  #是否允许覆盖time字段  logstash在向influx插入消息时默认是将@timestamp作为time字段插入 ,所以上面的date过滤器中直接匹配赋值即可
    flush_size => 100 #多少条消息提交到influx
    idle_flush_time => 1 #多久提交一次消息到influx  这个和上面那个条件  满足其一即可
    codec => json # 将消息以json格式输出
    coerce_values => {"value" => "float"} #将某些字段的值类型强转
    data_points => { # 字段之间不用加逗号 加了的话 报错
        "aaa" => "%{aaa}"#获取tag字段的值
        "value" => "%{value}"#获取field的值
        }    
  }
  stdout {codec => rubydebug}#将消息打印出来
}

如果参照上面的配置不能写入influx的话 请尝试将influx的版本降级

标签: none

相关文章推荐

添加新评论,含*的栏目为必填