-
Notifications
You must be signed in to change notification settings - Fork 93
/
Copy pathelasticsearch_dataset.py
58 lines (47 loc) · 1.76 KB
/
elasticsearch_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Create a datasets off a scroll search query from ElasticSearch"""
import datatable as dt
from elasticsearch import Elasticsearch
from h2oaicore.data import CustomData
_global_modules_needed_by_name = ["elasticsearch"]
"""
Refs:
https://elasticsearch-py.readthedocs.io/en/latest/api.html
https://gist.github.com/hmldd/44d12d3a61a8d8077a3091c4ff7b9307
"""
class ElasticsearchData(CustomData):
INDEX = "some_index"
QUERY = {
"query": {
"match_all": {}
}
}
COL_NAMES = ["id", "event", "timestamp"]
@staticmethod
def es_frame_generator(elastic_client, index, query, col_names):
data = elastic_client.search(index=index, body=query, scroll="10m", size=10)
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])
while scroll_size > 0:
for hit in data['hits']['hits']:
yield dt.Frame(
[
[hit["_id"]],
[hit["_source"]["event"]["dataset"]],
[hit["_source"]["@timestamp"]]],
names=col_names,
)
data = elastic_client.scroll(scroll_id=sid, scroll='10m')
sid = data['_scroll_id']
scroll_size = len(data['hits']['hits'])
@staticmethod
def create_data(X: dt.Frame = None):
elastic_client = Elasticsearch() # fill up with auth details
dt_frame = dt.Frame(names=ElasticsearchData.COL_NAMES)
for frame in ElasticsearchData.es_frame_generator(
elastic_client,
ElasticsearchData.INDEX,
ElasticsearchData.QUERY,
ElasticsearchData.COL_NAMES,
):
dt_frame.rbind(frame)
return {"ES_Sample": dt_frame}