用Data-pipeline模式将nginx日志存储到elasticsearch中(续)

接上篇的内容,这篇我们要解决第二个问题,就是如何将我们转换完成的json数据发送到es中去,上篇提到了要存储到es中,我们要使用kafka来做消息队列,实现发布和订阅消息流模式,因为涉及kafka内容,所以我们先说一下Kafka的一些基本知识,然后再看代码不然一是一头雾水。

kafka介绍:

1、什么是kafka?

2、kafka特性

3、kafka解决了什么问题?

4、kafka的一些概念和名词

除了上面的一些内容外,我也谈一下我自己的一些理解,为什么kafka变得如此流行,很大程度上因为的特性决定的,

比如数据的聚合,怎么理解,就是我们可以把产生数据的系统就做生产者,有多少生产者无所谓,只要建立对应的topic,把数据发送到这个topic中就可以,如果有不同部门向要相同的数据,就去订阅这个topic即可,这样最大的好处是使我们系统结构清晰,同时能减少我们很多重复的工作。

另外一个重要的特性就是高并发性,并支持分布式部署,在一个繁忙的系统中,产生的日志或者其它数据是非常庞大的,要对这些数据进行处理,必须要一个高吞吐量、低延迟的处理系统,Kafka正好满足这个需求,每个topic可以建立一个或多个分区,每个分区你可以简单理解为一个公路上的多个车道,每个车就是数据,因为车道多所以它可以加速数据的传输。

经过上面的介绍,相信大家都对Kafka有了一个基本的了解,那接下来回到我们上面的问题,看如何在我们这个例子中使用:

1、安装kafka,这个因为比较简单就不在费篇幅写了。

2、安装完毕后建立一个topic, 命令:

bin/kafka-topics.sh –create –zookeeper 127.0.0.1:2181 –replication-factor 1 –partitions 1 –topic www_logs

3、安装:pip install kafka-python

4、实现生产者类:

代码不是很多,先定义了初始化类的__init__方法,需要两个参数,一个就是broker这个没什么可说的,value_serializer的意思是用于将用户提供的消息值转换为字节,send_page_data()方法是将数据发送给对应的topic。

5、修改下上篇中的脚本,使其实现转换后发送到kafka中,修改如下:

增加的行都已注释,不多解释了。

到这我再梳理下我们都干了什么。

第一,我们实现了access.log日志的实时读取,并传唤成json格式。

第二,我们讲转换完毕的数据发送到kafka中,topic名词是www_logs

以上我们都已完成,接下来的问题是要怎么把数据存储到es中,不过先别急,先让我们验证下我们之前的工作是否正确吧,要验证是否可以从topic中读取数据,我们还需要一个消费者程序,为简单验证,我这边实现一个最简单的消费者程序,如下:

就是从topic中读取数据,然后打印,如果没问题,就可以证明这条通路是通的,首先在一个终端运行,我们的日志分析程序,结果如下:

行太多,我只截取了2行,因为我们有打印,说明转换是没有问题的,但是否发送到了kafka呢,我们在另一个终端运行我们的消费程序,结果如下:

可以看到我们消费者成功的从kafka中取得了数据,说明这条通路我们已经打通了,这条数据流已经没有问题,那接下来就剩下最后存储到es中的问题了,我们后续再接续,这篇已经很长了。