Kibana上設定Ingest pipeline
在不改動到 Fluentd config 的情境下,透過 Kibana介面設定 Ingest Pipeline來做做欄位處理。等於是在資料進到 elasticsearch才進行處理,而不是在 log shipper 端處理,缺點是這樣做會加重 elasticsearch cluster 的 loading。
Ingest pipeline 有約 40 種內建的 processor,包含 Grok processor,processor之間也能疊加多種使用。
Case 1: 收進的 log 為 json,parse 出各欄位
Steps :
1. Kibana -> Stack Management -> Ingest Node Pipeline 新增一個 Pipeline,套用內建的 json Processor。field為要處理的現有欄位、target_field為轉出來的新欄位名稱(可自訂)。
有需要也可疊加多個processor,約40種內建 Processor 參考官網用法 :https://www.elastic.co/guide/en/elasticsearch/reference/current/processors.html
2. 至 Dev Tool 透過PUT _index_template語法將需要 parse json 的 index pattern 綁上剛剛建立的pipeline
PUT _index_template/name_your_index_template
{
"index_patterns" : [ "myindex-*"],
"priority" : 1,
"template" : {
"settings" : {
"index" : {
"default_pipeline" : "json"
}
}
}
}
3. GET _index/template
可檢查看到設定成功
4. 成功後 Discover 中 data 可看到多了 parse 後產生的欄位
5. Dev Tool 中下 GET _nodes/stats/ingest
檢查pipeline處理的 data 量與failed 筆數
6. 若需要對”現有”的 index 套用此pipeline,需做 Reindex
POST /_reindex?wait_for_completion=true
{
"source" : {
"index":"old_index_xxx"
},
"dest" : {
"index":"new_index_xxx",
"pipeline":"json"
}
}
Case 2 : Grok Processor 取出 string 中的 pattern
Create pipeline 中選擇 Grok , Field 為要處理的欄位,此例 Pattern 為取出 level= (格式為WORD) & ts= (格式為TIMESTAMP_ISO8601)
原始 Message :
"message": ["level=warn ts=2021-09-15T07:31:59.589Z caller=cluster.go:438 component=cluster msg=refresh result=failure"],
Processors
[
{ "grok":
{ "field": "message",
"patterns":
[ "level=%{WORD:level} ts=%{TIMESTAMP_ISO8601:alert_time}" ]
} }
]
經過此 pattern 處理後的Log 新增了兩個欄位 : Output
{ “docs”:
[ { “doc”:
{ “_index”: “index”, “_type”: “_doc”, “_id”: “id”,
“_source”:
{ “alert_time”: “2021–09–15T07:31:59.589Z”,
“message”: “level=warn ts=2021–09–15T07:31:59.589Z caller=cluster.go:438 component=cluster msg=refresh result=failure”,
“level”: “warn” },
“_ingest”: { “timestamp”: “2021–09–15T07:47:13.934322401Z” } } } ] }
Case 3 : Drop Processor 過濾不要收進 ES 的 Log
Step 1 : Add Ingest Pipeline with “drop”
Add if condition with Painless script
[
{ "drop":
{ "if": "ctx.kubernetes.container_name == 'mariadb'","ignore_failure": true }
},
{ "drop":
{ "if": "ctx.kubernetes.container_name == 'kibana'","ignore_failure": true }
}
]
or
!(ctx['kubernetes']['container_name'].contains('dontDropThis'))
Reference :