1. 首页
  2. >
  3. 编程技术
  4. >
  5. Java

FileBeat + Kafka + ELK搭建与简单示例

最近调研了一下ELK,自己也尝试搭建了一套环境,用于学习, 现将整个部署的过程记录下来

现 Filebeat 已经完全替代了 Logstash-Forwarder 成为新一代的日志采集器,越来越多人开始使用它,所以现在基于 Filebeat ELK 架构如下图

FileBeat + Kafka + ELK搭建与简单示例

看完了图,再来简单介绍一下ELK

E lasticsearch :简称 ES,是ELK的核心,是基于Apache Lucene的开源数据搜索引擎,可以实时快速的搜索和分析,性能强悍

L ogstash :一个具有实时传输能力的数据收集引擎,用来数据收集、分析、过滤日志的工具,支持多类型日志

K ibana :为Elasticsearch提供了分析和可视化的Web平台 ,可以生成各种维度的表格,图形, Kibana 还可以用于问题分析,可以很快的将异常事件或者事件范围缩小到秒级或者个位数,从TB级别的数据中搜到关键的错误信息

Filebeat : 一个轻量级日志采集器, 早期的 ELK 架构中使用 Logstash 收集、解析日志,但是 Logstash 对内存、cpu、io 等资源消耗比较高, 相比 Logstash,Filebeat 所占系统的 CPU 和内存几乎可以忽略不计

Kafka:一种高吞吐量的分布式发布订阅消息系统, 如果日志量巨大,还需要引入Kafka用以均衡网络传输,降低网络闭塞, 保证数据不丢失,还可以系统之间解耦,具有更好的灵活性和扩展性

版本:

Filebeat:7.8.1
LogStash:7.8.1
Kibana:7.8.1
Elasticsearch:7.8.1
Kafka:2.2.2 - 2.12
Java:11

在选择对应版本的时候需要注意,ES 7.8.1需要Java 11

Filebeat与Kafka对应版本的选择,官网也有说明,建议Kafka版本在 0.11 和 2.2.2 之间,所以选用了Kafka 2.2.2

Filebea t

1.解压

2.Filebeat配置起来很简单,修改filebeat.yml配置文件如下

定义Filebeat的输入:为目标日志

定义Filebeat的输出:为kakfa,指定topic

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

# Change to true to enable this input configuration.
enabled: true

# Paths that should be crawled and fetched. Glob based paths.
paths:
- /yw/log/*.log


# ------------------------------ Logstash Output -------------------------------
output.kafka:
enable: true
hosts: ["localhost:9092"]
topic: "test"

2.启动Filebeat,需要指定配置文件,即刚才配置好的filebeat.yml

nohup ./filebeat -c filebeat.yml &

Kafka

1.解压

2.修改配置文件 server.properties,修改如下配置

listeners=PLAINTEXT://localhost:9092

3.如果你没有zookeeper,则可以使用kakfa自带的zookeeper,配置zookeeper

# the directory where the snapshot is stored.
dataDir=/tmp/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0

4.启动zookeeper

cd bin
nohup ./zookeeper-server-start.sh ../config/zookeeper.properties &

查看进程是否正确启动

[root@localhost bin]# jps
9139 QuorumPeerMain

5.启动Kafka

nohup ./kafka-server-start.sh ../config/server.properties &

查看进程是否正确启动

[root@localhost bin]# jps
9139 QuorumPeerMain
9683 Kafka

6.查看 topic,在我们配置 Filebeat 的时候定义了topic,当 Kafka 启动后,就会创建 topic

[root@localhost bin]# ./kafka-topics.sh --list --bootstrap-server localhost:9092
test
[root@localhost bin]# ./kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:segment.bytes=1073741824
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

LogStash

1.解压

2.修改配置文件 config/logstash.yml

node.name: localhost

3.在使用Logstash时,可根据需求创建自己的配置文件

定义 input :Logstash 去接 Filebeat 发送到 Kafka 的数据,配置 Kafka 相关的信息,指定 topic,并指明类型为 json

定义 output : 需要向 ES 中存储,指定ES的地址,并创建索引: test – 年份.月份 ,配置文件如下

注意,需要保证格式的正确,要不然 LogStash会抛出异常

[root@localhost config]# cat test.conf 
input {
kafka {
bootstrap_servers => ["localhost:9092"]
group_id => "test"
topics => ["test"]
consumer_threads => 1
codec => json {
charset => "UTF-8"
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "test-%{+YYYY.MM}"
}
}

3. 启动 Logstash,需要指定我们自定义的配置文件

cd ../bin/
nohup ./logstash -f ../config/test.conf &

查看进程是否启动

[root@localhost bin]# jps
9139 QuorumPeerMain
9683 Kafka
3619 Logstash

Elasticsearch

1.解压

2.创建 elasticsearch 用户(root用户不能启动 elasticsearch )

useradd elasticsearch

3.修改文件拥有用户

chown -R elasticsearch elasticsearch/

切换用户

su - elasticsearch

4.修改配置文件,config/elasticsearch.yml

http.port: 9200

5.修改Jvm配置文件 jvm.options,资源有限,需要限定一下堆的大小

-Xms256m
-Xmx512m

6.启动 Elasticsearch

[root@localhost bin]# cd bin
[root@localhost bin]# ./elasticsearch

查看进程是否启动

[2020-07-29T21:54:17,410][INFO ][o.e.h.AbstractHttpServerTransport] [localhost.localdomain] publish_address {127.0.0.1:9200}, bound_addresses {[::1]:9200}, {127.0.0.1:9200}
[2020-07-29T21:54:17,411][INFO ][o.e.n.Node ] [localhost.localdomain] started
[root@localhost bin]# jps
3619 Logstash
2516 Kafka
2955 QuorumPeerMain
3725 Elasticsearch
3262 Kafka

7.测试es是否好用

[root@localhost bin]# curl localhost:9200
{
"name" : "localhost.localdomain",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "2dcAImNFTli_lGTHJYx7_A",
"version" : {
"number" : "7.8.1",
"build_flavor" : "default",
"build_type" : "tar",
"build_hash" : "b5ca9c58fb664ca8bf9e4057fc229b3396bf3a89",
"build_date" : "2020-07-21T16:40:44.668009Z",
"build_snapshot" : false,
"lucene_version" : "8.5.1",
"minimum_wire_compatibility_version" : "6.8.0",
"minimum_index_compatibility_version" : "6.0.0-beta1"
},
"tagline" : "You Know, for Search"
}

Kibana

1.解压

2.创建 kibana 用户

useradd kibana

3. 修改文件拥有用户

chown -R kibana kibana/

切换用户

su - kibana

4.修改配置文件 kibana.yml

elasticsearch.hosts: ["http://localhost:9200"]

5.启动 kibana

[root@localhost kibana]# cd bin/
[root@localhost bin]# ./kibana

查看是否启动成功,相关信息如下

[01:55:38.282] [info][listening] Server running at http://localhost:5601
[01:55:39.117] [info][server][Kibana][http] http server running at http://localhost:5601

所有组件都启动成功后,打开浏览器输入 localhost:5601 ,查看kibana

接下来就是见证奇迹的时刻,网页成功打开

FileBeat + Kafka + ELK搭建与简单示例

然后连接自己的ES

FileBeat + Kafka + ELK搭建与简单示例

下一步后,找到Logstash中设置好的索引

FileBeat + Kafka + ELK搭建与简单示例

接下来,就是验证整个流程了

让监控的日志中产生一些数据

[2020-07-29 22:12:49,689] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 37 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:22:49,645] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:32:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-07-29 22:42:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)

在 Kafka 中消费一下,是否有消息发送到 topic 上

./kafka-console-consumer.sh  --topic test --bootstrap-server localhost:9092
{"@timestamp":"2020-07-30T02:43:17.329Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"ecs":{"version":"1.5.0"},"host":{"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"name":"localhost.localdomain","hostname":"localhost.localdomain","architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false},"agent":{"version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat"},"message":"[2020-07-29 22:04:48,914] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-98001 for generation 1 (kafka.coordinator.group.GroupCoordinator)","log":{"offset":668,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"log":{"offset":846,"file":{"path":"/yw/log/1.log"}},"message":"[2020-07-29 22:12:49,689] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 37 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","input":{"type":"log"},"host":{"architecture":"x86_64","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"name":"localhost.localdomain","hostname":"localhost.localdomain"},"agent":{"id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"architecture":"x86_64","name":"localhost.localdomain","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain"},"message":"[2020-07-29 22:22:49,645] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1004,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"agent":{"id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"name":"localhost.localdomain","mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain","architecture":"x86_64","os":{"family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos","version":"7 (Core)"},"id":"17946cedccdd442b845d0cfa8693cc71"},"agent":{"ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1","hostname":"localhost.localdomain"},"message":"[2020-07-29 22:32:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1161,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"ecs":{"version":"1.5.0"}}
{"@timestamp":"2020-07-30T02:43:17.330Z","@metadata":{"beat":"filebeat","type":"_doc","version":"7.8.1"},"host":{"id":"17946cedccdd442b845d0cfa8693cc71","name":"localhost.localdomain","containerized":false,"ip":["10.46.2.112","fe80::2959:f9cc:2a94:6ddc","192.168.122.1"],"mac":["00:0c:29:4e:57:2e","52:54:00:56:ed:eb","52:54:00:56:ed:eb"],"hostname":"localhost.localdomain","architecture":"x86_64","os":{"version":"7 (Core)","family":"redhat","name":"CentOS Linux","kernel":"3.10.0-1062.el7.x86_64","codename":"Core","platform":"centos"}},"agent":{"hostname":"localhost.localdomain","ephemeral_id":"3a381097-c3fe-4339-b92b-fff537b9a9f2","id":"401bee7f-115f-447d-a1e5-a6b6c57e21a1","name":"localhost.localdomain","type":"filebeat","version":"7.8.1"},"message":"[2020-07-29 22:42:49,646] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)","log":{"offset":1318,"file":{"path":"/yw/log/1.log"}},"input":{"type":"log"},"ecs":{"version":"1.5.0"}}

发现已经消费到了,格式为json,日志具体内容在message中

接下来看一下 es 中有没有数据,在Discover中,找到之前设置好的索引,我们搜索message中带有 INFO 级别的日志

FileBeat + Kafka + ELK搭建与简单示例

我们想在 INFO 日志中找到那条 37 milliseconds 数据, 只需要添加相关条件即可,如下图所示

FileBeat + Kafka + ELK搭建与简单示例

至此,简单的一个示例以及完成了