Fetching index data from an ELK stack with Python's elasticsearch module
We want to build a user-behavior analysis system, which means importing the website access logs into a database. Since we already have an ELK logging stack deployed, I wrote a Python script that uses the elasticsearch module to pull the required data directly from ELK.
The example below differs somewhat from the real requirements; it is simplified so the script is easier to follow.
In ELK, the index names follow a pattern like logstash-nginx-2017-01-20,
and from elasticsearch we only need the timestamp, the client IP, the requested path, and so on (the real scenario involves more fields).
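For example, yesterday's index name can be derived from the date. A minimal sketch, assuming the logstash-nginx-YYYY-MM-DD naming above (the full script below targets a different, site-specific index pattern):

import datetime

# yesterday's date, formatted to match the logstash-nginx-2017-01-20 style of index name
yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
index_name = "logstash-nginx-{date}".format(date=yesterday)
print index_name  # e.g. logstash-nginx-2017-01-20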
The code is as follows:
#!/usr/bin/env python
#coding=utf-8
"""
__author__ = '戴儒锋'

Use the elasticsearch module to fetch all of yesterday's access-log data for a site.
The scroll parameter is used to avoid the performance cost of deep pagination.
The scan search type, combined with the scroll API, efficiently retrieves huge result sets
from Elasticsearch without paying the price of deep paging.
size is applied per shard, so each batch returns at most size * number_of_primary_shards documents.
scroll="1m" keeps the search snapshot alive for 1 minute.
"""

import datetime
from elasticsearch import Elasticsearch

# yesterday's date in the format 2016.07.19
yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y.%m.%d")
# yesterday's date in the format 2016-07-19
filter_yesterday = (datetime.datetime.now() + datetime.timedelta(days=-1)).strftime("%Y-%m-%d")
# the day before yesterday, in the format 2016.07.18
before_yesterday = (datetime.datetime.now() + datetime.timedelta(days=-2)).strftime("%Y.%m.%d")

# URL of the Elasticsearch node to query
url = "http://192.168.1.41:9200/"

# indices to search; because of the time-zone offset, both yesterday's and the day before yesterday's indices are needed
index_name = "logstash-apache-www.linuxyw.com-{date},logstash-apache-www.linuxyw.com-{b_date}".format(date=yesterday, b_date=before_yesterday)

# instantiate the Elasticsearch client with a 120-second timeout (the default is 10 seconds; raise it for large data volumes)
es = Elasticsearch(url, timeout=120)

# DSL query body, used by es.search below
data = {
    "size": 10000000,  # maximum number of documents returned per shard; adjust to your log volume
    "query": {
        "bool": {
            # match_all: return every document
            "must": {"match_all": {}},
            # filter on the time range: yesterday 00:00 to 24:00. The ||-8h shifts the boundaries by 8 hours,
            # because ELK stores @timestamp in UTC, 8 hours behind Beijing time, so the result matches the log's local time.
            "filter": {
                "range": {
                    "@timestamp": {
                        "gt": "{date}T00:00:00||-8h".format(date=filter_yesterday),
                        "lt": "{date}T23:59:59||-8h".format(date=filter_yesterday),
                    }
                }
            }
        }
    }
}

# fields to keep in the response; listing only what you need saves bandwidth and memory
return_fields = [
    '_scroll_id',
    'hits.hits._source.timestamp',
    'hits.hits._source.@timestamp',
    'hits.hits._source.clientip',
    'hits.hits._source.request',
]


def main():
    # use search_type="scan" and get back a _scroll_id for es.scroll to fetch the data with
    res = es.search(
        index=index_name,
        body=data,
        search_type="scan",
        scroll="1m"
    )
    scrollId = res["_scroll_id"]  # get the scroll ID
    response = es.scroll(scroll_id=scrollId, scroll="1m", filter_path=return_fields)
    print len(response['hits']['hits'])  # print the number of log records retrieved
    # for hit in response['hits']['hits']:
    #     print hit['_source']


if __name__ == "__main__":
    main()
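Note that with search_type="scan" the initial search returns no documents, and each es.scroll call returns one batch, so the script above only fetches the first batch. To pull everything, the scroll call can be repeated until an empty batch comes back. A minimal sketch, reusing the es, index_name and data objects defined above (filter_path is omitted here for brevity):

def fetch_all():
    # initial scan request: returns a scroll ID but no hits yet
    res = es.search(index=index_name, body=data, search_type="scan", scroll="1m")
    scroll_id = res["_scroll_id"]
    all_hits = []
    while True:
        page = es.scroll(scroll_id=scroll_id, scroll="1m")
        hits = page['hits']['hits']
        if not hits:                      # an empty batch means the scroll is exhausted
            break
        all_hits.extend(hits)
        scroll_id = page['_scroll_id']    # always continue with the scroll ID from the latest response
    return all_hits

In later Elasticsearch versions the scan search type was removed; there, the elasticsearch.helpers.scan() generator in the same Python module wraps this scroll loop for you.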
To see the raw records, iterate over the hits and print each document's _source:

for hit in response['hits']['hits']:
    print hit['_source']
The output looks like this:
{u'timestamp': u'19/Jul/2016:08:00:34 +0800', u'@timestamp': u'2016-07-19T00:00:35.380Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'60.221.255.44'}
{u'timestamp': u'19/Jul/2016:08:00:34 +0800', u'@timestamp': u'2016-07-19T00:00:36.507Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'112.5.16.13'}
{u'timestamp': u'19/Jul/2016:08:00:36 +0800', u'@timestamp': u'2016-07-19T00:00:36.508Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.238.250.237'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.231.218.18'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'123.134.186.178'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.61.8.10'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.934Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'118.212.147.71'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'210.32.125.68'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.163.80.18'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.186.33.170'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.199.110.39'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'114.80.201.18'}
{u'timestamp': u'19/Jul/2016:00:02:34 +0800', u'@timestamp': u'2016-07-18T16:02:34.935Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'115.238.250.237'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'58.215.186.208'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'122.227.164.103'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'222.186.33.170'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.208Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'223.100.7.69'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.209Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'218.6.9.4'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'183.136.232.136'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.114Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'221.203.236.213'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'124.119.87.204'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'43.254.144.227'}
{u'timestamp': u'19/Jul/2016:08:04:35 +0800', u'@timestamp': u'2016-07-19T00:04:36.115Z', u'request': u'/apk/dwuliu.apk', u'clientip': u'59.173.18.243'}
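Since the end goal is to load these records into a database for the behavior-analysis system, the hits can then be written out as rows. A minimal sketch using the standard-library sqlite3; the database file, table name, and columns here are hypothetical, so substitute your real driver and schema:

import sqlite3

def save_hits(hits):
    # hypothetical database file and table schema
    conn = sqlite3.connect('access_log.db')
    conn.execute("""CREATE TABLE IF NOT EXISTS access_log (
                        log_time TEXT,
                        clientip TEXT,
                        request  TEXT)""")
    # each hit is one document from response['hits']['hits']
    rows = [(h['_source']['@timestamp'],
             h['_source']['clientip'],
             h['_source']['request']) for h in hits]
    conn.executemany("INSERT INTO access_log VALUES (?, ?, ?)", rows)
    conn.commit()
    conn.close()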
Finally, some recommended Elasticsearch learning resources:
Elasticsearch: The Definitive Guide (Chinese edition): http://es.xiaoleilu.com/index.html
Python Elasticsearch Client:http://elasticsearch-py.readthedocs.io/en/master/