
Elasticsearch: A deep dive into the Dissect ingest processor

Elastic • 3 years ago • 351 views

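Like the Grok processor, the dissect processor extracts structured fields from a single text field in a document. Unlike Grok, however, dissect does not use regular expressions. This makes its syntax simpler and, in some cases, makes it faster than the Grok processor.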

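Dissect matches a single text field against a defined pattern. My earlier article "Elastic Observability: using pipelines to structure data" already introduced the Grok and Dissect processors. This article takes a closer look at the dissect processor through a series of examples.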

 

Hands-on practice

A simple example

Let's start with a simple example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} [%{loglevel}] %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
      }
    }
  ]
}

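Above, the pattern extracts fields from message. With dissect, pay particular attention to spaces: the whitespace in the pattern must match the text exactly, otherwise parsing goes wrong. The result is: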

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29T00:39:02.912Z",
          "loglevel" : "Debug",
          "message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T04:40:40.894589Z"
        }
      }
    }
  ]
}

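It extracted @timestamp, loglevel, and status. Note that the [ and ] characters were dropped, because they are literal delimiters in the pattern rather than part of any key.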
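The matching idea can be sketched in a few lines of Python. This is only an illustration of delimiter-based matching without regular expressions, not how Elasticsearch implements dissect. The function names `parse_pattern` and `simple_dissect` are hypothetical, and key modifiers (?, +, ->, *, &) as well as patterns with two adjacent keys are not handled.

```python
def parse_pattern(pattern):
    """Split a dissect-style pattern into key names and literal delimiters.

    Returns (keys, delimiters), where delimiters[0] precedes the first key
    and delimiters[i + 1] follows keys[i].
    """
    keys, delimiters = [], []
    pos = 0
    while True:
        start = pattern.find("%{", pos)
        if start == -1:
            delimiters.append(pattern[pos:])
            break
        end = pattern.index("}", start)
        delimiters.append(pattern[pos:start])
        keys.append(pattern[start + 2:end])
        pos = end + 1
    return keys, delimiters


def simple_dissect(pattern, text):
    """Match text against the pattern using plain string searches only."""
    keys, delimiters = parse_pattern(pattern)
    if not text.startswith(delimiters[0]):
        raise ValueError("text does not start with the expected literal")
    pos = len(delimiters[0])
    result = {}
    for key, delimiter in zip(keys, delimiters[1:]):
        if delimiter == "":
            # No delimiter after this key: it takes the rest of the text.
            end = len(text)
        else:
            end = text.find(delimiter, pos)
            if end == -1:
                raise ValueError(f"delimiter {delimiter!r} not found")
        result[key] = text[pos:end]
        pos = end + len(delimiter)
    return result


print(simple_dissect(
    "%{@timestamp} [%{loglevel}] %{status}",
    "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
))
# {'@timestamp': '2019-09-29T00:39:02.912Z', 'loglevel': 'Debug', 'status': 'MyApp stopped'}
```

Because the delimiters " [" and "] " are consumed between keys, the brackets never end up in a field value.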

 

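Skipping fields

Dissect matches the pattern exactly, so every part of the text has to be accounted for. In practice, though, we may not want a particular field to appear in the document even though it can be extracted. Consider the following example: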

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} [%{?loglevel}] %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29T00:39:02.912Z [Debug] MyApp stopped"
      }
    }    
  ]
}

Here we used %{?loglevel}, which means loglevel should not appear in the result:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29T00:39:02.912Z",
          "message" : "2019-09-29T00:39:02.912Z [Debug] MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T04:47:24.7823Z"
        }
      }
    }
  ]
}

This output no longer contains the loglevel field.
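As a rough Python sketch of the skip behaviour: the value is still matched, it is just left out of the final document. The helper name `drop_skip_keys` and the list-of-pairs input format are assumptions made for this illustration, not part of Elasticsearch.

```python
def drop_skip_keys(matches):
    """Keep only the keys that should appear in the final document.

    `matches` is a list of (pattern_key, value) pairs in pattern order.
    Keys that are empty or start with "?" are treated as skip keys.
    """
    return {
        key: value
        for key, value in matches
        if key and not key.startswith("?")
    }


matches = [
    ("@timestamp", "2019-09-29T00:39:02.912Z"),
    ("?loglevel", "Debug"),  # matched, but not emitted
    ("status", "MyApp stopped"),
]
print(drop_skip_keys(matches))
# {'@timestamp': '2019-09-29T00:39:02.912Z', 'status': 'MyApp stopped'}
```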

 

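Handling multiple spaces

The dissect processor is very strict. Whitespace must match exactly, otherwise parsing does not give the expected result. For example: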

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29  MyApp stopped"
      }
    }    
  ]
}

Above, we deliberately added an extra space before MyApp stopped. The result is:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "2019-09-29  MyApp stopped",
          "status" : ""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:01:58.065065Z"
        }
      }
    }
  ]
}

As the result shows, the message was not parsed correctly: the status field is empty. How do we handle this?
We can use the right padding modifier -> to ignore the padding:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp->} %{status}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019-09-29  MyApp stopped"
      }
    }    
  ]
}

The result is:
    
{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "2019-09-29  MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:07:23.294188Z"
        }
      }
    }
  ]
}
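The effect of the right padding modifier can be sketched in Python as follows. This is an illustration only, assuming that -> simply swallows repeated occurrences of the delimiter that follows the key; `split_with_padding` is a hypothetical helper, not an Elasticsearch API.

```python
def split_with_padding(text, delimiter):
    """Split text at the first delimiter, then skip any repeated delimiters."""
    head, _, tail = text.partition(delimiter)
    # Emulate "->": ignore extra copies of the delimiter before the next value.
    while delimiter and tail.startswith(delimiter):
        tail = tail[len(delimiter):]
    return head, tail


print(split_with_padding("2019-09-29  MyApp stopped", " "))
# ('2019-09-29', 'MyApp stopped')
```

The same call gives the same answer whether the text contains one, two, or more spaces after the date, which is the point of the modifier.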

We can also use an empty key to skip unwanted spaces:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "[%{@timestamp}]%{->}[%{status}]"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "[2019-09-29] [MyApp stopped]"
      }
    },
    {
      "_source": {
        "message": "[2019-09-29]  [MyApp stopped]"
      }
    }    
  ]
}

Above, %{->} matches the unwanted spaces. We used two documents, one with a single space between the brackets and one with two spaces. The result is:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "[2019-09-29] [MyApp stopped]",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:21:14.752694Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019-09-29",
          "message" : "[2019-09-29]  [MyApp stopped]",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:21:14.752701Z"
        }
      }
    }
  ]
}

 

Appending fields

Often we want to append several parts of the text to a single field, for example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{+@timestamp} %{+@timestamp} %{loglevel} %{status}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "Oct 29 00:39:02 Debug MyApp stopped"
      }
    }    
  ]
}

Above, the timestamp is Oct 29 00:39:02, which consists of three strings. We use %{@timestamp} %{+@timestamp} %{+@timestamp} to combine them into a single @timestamp field. The result is:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "Oct 29 00:39:02",
          "loglevel" : "Debug",
          "message" : "Oct 29 00:39:02 Debug MyApp stopped",
          "status" : "MyApp stopped"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:27:29.785206Z"
        }
      }
    }
  ]
}

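Note that in this example we set append_separator to a single space. Without it, the separator defaults to an empty string and the three strings would be concatenated directly into Oct2900:39:02, which is probably not what we want in practice.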
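The role of the separator can be seen in a small Python sketch. It assumes the matched parts arrive as (pattern_key, value) pairs in pattern order; `apply_append` is a hypothetical name used only for this illustration.

```python
def apply_append(matches, append_separator=""):
    """Join values whose key carries the "+" modifier onto the base key."""
    parts = {}
    for key, value in matches:
        name = key[1:] if key.startswith("+") else key
        parts.setdefault(name, []).append(value)
    return {name: append_separator.join(values) for name, values in parts.items()}


matches = [
    ("@timestamp", "Oct"),
    ("+@timestamp", "29"),
    ("+@timestamp", "00:39:02"),
    ("loglevel", "Debug"),
    ("status", "MyApp stopped"),
]
print(apply_append(matches)["@timestamp"])
# Oct2900:39:02
print(apply_append(matches, append_separator=" ")["@timestamp"])
# Oct 29 00:39:02
```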

 

Extracting key-value pairs

We can use %{*field} as the key and %{&field} as the value:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Example using dissect processor key-value",
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{@timestamp} %{*field1}=%{&field1} %{*field2}=%{&field2}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "2019009-29T00:39:02.912Z host=AppServer status=STATUS_OK"
      }
    }
  ]
}

The result is:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "@timestamp" : "2019009-29T00:39:02.912Z",
          "host" : "AppServer",
          "message" : "2019009-29T00:39:02.912Z host=AppServer status=STATUS_OK",
          "status" : "STATUS_OK"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:34:30.47561Z"
        }
      }
    }
  ]
}
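How the * and & keys pair up can be sketched in Python. This is a simplified illustration, assuming the matched parts arrive as (pattern_key, value) pairs; `resolve_reference_keys` is a hypothetical helper and not part of Elasticsearch.

```python
def resolve_reference_keys(matches):
    """Pair each "*name" match (the field name) with its "&name" match (the value)."""
    names, values, result = {}, {}, {}
    for key, value in matches:
        if key.startswith("*"):
            names[key[1:]] = value      # text to use as the field name
        elif key.startswith("&"):
            values[key[1:]] = value     # text to use as the field value
        else:
            result[key] = value         # ordinary key
    for ref, field_name in names.items():
        result[field_name] = values[ref]
    return result


matches = [
    ("@timestamp", "2019009-29T00:39:02.912Z"),
    ("*field1", "host"),
    ("&field1", "AppServer"),
    ("*field2", "status"),
    ("&field2", "STATUS_OK"),
]
print(resolve_reference_keys(matches))
# {'@timestamp': '2019009-29T00:39:02.912Z', 'host': 'AppServer', 'status': 'STATUS_OK'}
```

The names field1 and field2 never appear in the output; they only link each key to its value.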

 

Challenge yourself

After the exercises above, you have probably found the dissect processor both useful and easy to use. Now let's work through a more realistic example:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

This is a haproxy log line, and it is long. How can we use processors to turn it into a structured document?

We can use the dissect processor. Based on what we have learned so far, we can start like this:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

Above, we join the first three strings into a single timestamp field. Running this gives:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : """Mar2201:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:38:44.674567Z"
        }
      }
    }
  ]
}

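The first three strings were joined into one, but the last key is greedy: it also captured all of the remaining text. And because no append_separator was set, the parts were joined without spaces. We need to revise the pattern: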

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

We added append_separator and used %{host} to capture all of the remaining text:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "host" : """localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:41:53.667182Z"
        }
      }
    }
  ]
}

This time the timestamp field is clean, but host is still one long string. Let's continue:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{rest}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

Above, we extract the process and its id, and put everything else into %{rest}:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """ Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "process" : "haproxy",
          "host" : "localhost",
          "id" : "14415",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:46:11.833548Z"
        }
      }
    }
  ]
}

In rest, the first part is a status, and the remainder is key-value data that we can handle with the kv processor.

First we extract the status:
    
POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

Running this gives:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "process" : "haproxy",
          "host" : "localhost",
          "id" : "14415",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39"
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:50:18.300969Z"
        }
      }
    }
  ]
}

We now have the status field. What remains in rest is clearly key-value data, so we can process it with the kv processor:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      },
      {
        "kv": {
          "field": "rest",
          "field_split": ", ",
          "value_split": ":"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

Above, we added a kv processor:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "rest" : """reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "reason" : " Layer7 check passed",
          "process" : "haproxy",
          "code" : "2000",
          "check duration" : "3ms.",
          "message" : """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.""",
          "host" : "localhost",
          "id" : "14415",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39",
          "info" : "\"OK\""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T06:00:37.990909Z"
        }
      }
    }
  ]
}
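What the kv step does to rest can be approximated in Python. This is a simplified sketch that treats field_split and value_split as plain strings, whereas the real kv processor accepts regular expressions for them; `simple_kv` is a hypothetical name.

```python
def simple_kv(text, field_split, value_split):
    """Split text into pairs on field_split, then each pair on the first value_split."""
    result = {}
    for pair in text.split(field_split):
        key, _, value = pair.partition(value_split)
        result[key] = value
    return result


rest = 'reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms.'
print(simple_kv(rest, field_split=", ", value_split=":"))
# {'reason': ' Layer7 check passed', 'code': '2000', 'info': '"OK"', 'check duration': '3ms.'}
```

The leading space in the reason value comes from the space after the colon in the original text, which is why it also shows up in the simulated document.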

The result now contains all the fields we want. Next, we remove the message and rest fields, which we no longer need:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "dissect": {
          "field": "message",
          "pattern": "%{timestamp} %{+timestamp} %{+timestamp} %{host} %{process}[%{id}]:%{status}, %{rest}",
          "append_separator": " "
        }
      },
      {
        "kv": {
          "field": "rest",
          "field_split": ", ",
          "value_split": ":"
        }
      },
      {
        "remove": {
          "field": "message"
        }
      },
      {
        "remove": {
          "field": "rest"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": """Mar 22 01:27:39 localhost haproxy[14415]: Server updates /appServer02 is UP, reason: Layer7 check passed, code:2000, info:"OK", check duration:3ms."""
      }
    }
  ]
}

Above, I used the remove processor to delete the message and rest fields:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_doc",
        "_id" : "_id",
        "_source" : {
          "reason" : " Layer7 check passed",
          "process" : "haproxy",
          "code" : "2000",
          "check duration" : "3ms.",
          "host" : "localhost",
          "id" : "14415",
          "status" : " Server updates /appServer02 is UP",
          "timestamp" : "Mar 22 01:27:39",
          "info" : "\"OK\""
        },
        "_ingest" : {
          "timestamp" : "2020-12-09T05:59:44.138394Z"
        }
      }
    }
  ]
}

Following these steps one at a time shows how to turn unstructured data into a structured document.

Article URL: http://www.python88.com/topic/109955
 