在我之前的文章,我详细地介绍了如何通过 Filebeat 来收集日志并写入到 Elasticsearch。你可以阅读我之前的文章:

  • Beats: 使用 Filebeat 进行日志结构化 – Python

  • Beats:使用 Elastic Stack 记录 Python 应用日志

在今天的文章中,我将分享如何使用 Logstash 把日志文件发送到 Elasticsearch。使用 Logstash 的好处是它可以很方便地使用它丰富的过滤器对数据进行清洗以便更好地对数据进行分析。我们使用如下的架构:

在今天的展示中,我将使用最新的 Elastic Stack 8.4.3 来进行展示。

安装

如果你还没有安装好自己的 Elasticsearch,Kibana 及 Logstash,你可以按照如下的文章来进行安装:

  • 如何在 Linux,MacOS 及 Windows 上进行安装 Elasticsearch

  • Kibana:如何在 Linux,MacOS 及 Windows 上安装 Elastic 栈中的 Kibana

  • 如何安装 Elastic 栈中的 Logstash

首先,我们参考文章 “Logstash:如何连接到带有 HTTPS 访问的集群” 来生成 truststore.p12 证书文件:

$ pwd/Users/liuxg/test/elasticsearch-8.4.3/config/certs$ ls http.p12      http_ca.crt   transport.p12$ keytool -import -file http_ca.crt -keystore truststore.p12 -storepass password -noprompt -storetype pkcs12Certificate was added to keystore$ lshttp.p12       http_ca.crt    transport.p12  truststore.p12

在上面,我们生产的 truststore.p12 的密码为 password。

我们针对 Logstash 配置如下的配置文件:

logstash.conf

input {  udp {    port => 5959    codec => json {          target => "[document]"    }  }}output {  stdout {    codec => rubydebug  }  elasticsearch {      index => "logdb"      hosts => ["https://192.168.0.3:9200"]      user => "elastic"      password => "6bTlJp388KkgJKWi+hQr"      ssl_certificate_verification => true      truststore => "/Users/liuxg/test/elasticsearch-8.4.3/config/certs/truststore.p12"      truststore_password => "password"  }}

在上面,我们需要根据自己的 Elasticsearch 账号及密码进行修改。另外你也需要根据自己的证书位置进行相应的调整。 上面的 hosts 是我的本地 Elasticsearch 集群的访问地址。你需要根据自己的进行配置。在上面,我们使用 udp input 来收集日志,并传入到 Elasticsearch。在本示例中,我们忽略了 filter 部分,以简化问题的描述。我们可以把这个 logstash.conf 置于 Logstash 的安装根目录中。

我们可以使用如下的命令来运行:

Python 日志应用

我们首先来安装一个叫做 python-logstash 的包:

 pip install python-logstash

我们设计如下的 Python 应用来通过 Logstash 写入日志:

app.py

import loggingimport logstashimport sysclass Logging(object):    def __init__(self, logger_name='python-logger',                 log_stash_host='localhost',                 log_stash_upd_port=5959                 ):        self.logger_name = logger_name        self.log_stash_host = log_stash_host        self.log_stash_upd_port = log_stash_upd_port    def get(self):        logging.basicConfig(            filename="logfile",            filemode="a",            format="%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s",            datefmt="%H:%M:%S",            level=logging.INFO,        )        self.stderrLogger = logging.StreamHandler()        logging.getLogger().addHandler(self.stderrLogger)        self.logger = logging.getLogger(self.logger_name)        self.logger.addHandler(logstash.LogstashHandler(self.log_stash_host,                                                        self.log_stash_upd_port,                                                        version=1))        return self.loggerinstance = Logging(log_stash_upd_port=5959, log_stash_host='localhost', logger_name='soumil')logger = instance.get()count = 0from time import sleepwhile True:    count = count + 1    if count % 2 == 0:        logger.error('Error Message Code Faield :{} '.format(count))    else:        logger.info('python-logstash: test logstash info message:{} '.format(count))

我们在和 Logstash 运行的同一个机器上运行上面的应用。我们使用如下的方法来运行:

python app.py

我们在 Logstash 的 terminal 中可以看到:

它表明 Logstash 运作正常。

我们再到 Kibana 中打入如下的命令:

GET _cat/indices

从上面的输出中,我们可以看到新生成的 logdb 索引。

我们可以对这个索引进行搜索:

我们可以看到日志被正常地解析并可以被搜索。