Elasticsearch Ingest Pipeline Simple Setup

Jasmine H
5 min readSep 15, 2021

Kibana上設定Ingest pipeline

在不改動到 Fluentd config 的情境下,透過 Kibana介面設定 Ingest Pipeline來做做欄位處理。等於是在資料進到 elasticsearch才進行處理,而不是在 log shipper 端處理,缺點是這樣做會加重 elasticsearch cluster 的 loading。

Ingest pipeline 有約 40 種內建的 processor,包含 Grok processor,processor之間也能疊加多種使用。

Case 1: 收進的 log 為 json,parse 出各欄位

Steps :

1. Kibana -> Stack Management -> Ingest Node Pipeline 新增一個 Pipeline,套用內建的 json Processor。field為要處理的現有欄位、target_field為轉出來的新欄位名稱(可自訂)。

有需要也可疊加多個processor,約40種內建 Processor 參考官網用法 :https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html

2. 至 Dev Tool 透過PUT _index_template語法將需要 parse json 的 index pattern 綁上剛剛建立的pipeline

PUT _index_template/name_your_index_template
{
"index_patterns" : [ "myindex-*"],
"priority" : 1,
"template" : {
"settings" : {
"index" : {
"default_pipeline" : "json"
}
}
}
}

3. GET _index/template 可檢查看到設定成功

4. 成功後 Discover 中 data 可看到多了 parse 後產生的欄位

5. Dev Tool 中下 GET _nodes/stats/ingest檢查pipeline處理的 data 量與failed 筆數

6. 若需要對”現有”的 index 套用此pipeline,需做 Reindex

POST /_reindex?wait_for_completion=true
{
"source" : {
"index":"old_index_xxx"
},
"dest" : {
"index":"new_index_xxx",
"pipeline":"json"
}
}

Case 2 : Grok Processor 取出 string 中的 pattern

Create pipeline 中選擇 Grok , Field 為要處理的欄位,此例 Pattern 為取出 level= (格式為WORD) & ts= (格式為TIMESTAMP_ISO8601)

原始 Message :

"message": ["level=warn ts=2021-09-15T07:31:59.589Z caller=cluster.go:438 component=cluster msg=refresh result=failure"],

Processors

[ 
{ "grok":
{ "field": "message",
"patterns":
[ "level=%{WORD:level} ts=%{TIMESTAMP_ISO8601:alert_time}" ]
} }
]

經過此 pattern 處理後的Log 新增了兩個欄位 : Output

{ “docs”: 
[ { “doc”:
{ “_index”: “index”, “_type”: “_doc”, “_id”: “id”,
“_source”:
{ “alert_time”: “2021–09–15T07:31:59.589Z”,
“message”: “level=warn ts=2021–09–15T07:31:59.589Z caller=cluster.go:438 component=cluster msg=refresh result=failure”,
“level”: “warn” },
“_ingest”: { “timestamp”: “2021–09–15T07:47:13.934322401Z” } } } ] }

Case 3 : Drop Processor 過濾不要收進 ES 的 Log

Step 1 : Add Ingest Pipeline with “drop”

Add if condition with Painless script

[ 
{ "drop":
{ "if": "ctx.kubernetes.container_name == 'mariadb'","ignore_failure": true }
},
{ "drop":
{ "if": "ctx.kubernetes.container_name == 'kibana'","ignore_failure": true }
}
]

or

!(ctx['kubernetes']['container_name'].contains('dontDropThis'))

Reference :

--

--

Jasmine H

Data Engineer from Taiwan, recently working on EFK and Kubernetes projects.