社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Elasticsearch

Tshark + Elasticsearch 打造流量回溯分析系统

puyu • 4 年前 • 489 次点击  
阅读 36

Tshark + Elasticsearch 打造流量回溯分析系统

tshark是网络分析工具wireshark下的一个工具,主要用于命令行环境进行抓包、分析,尤其对协议深层解析时,tcpdump难以胜任的场景中。本文将介绍与tshark相关的流量解决方案。

img
Photo by hao wang on Unsplash

tshark + elastic stack

利用tshark,不仅可以对现有的pcap文件进行分析,由于可以输出其他格式,也就可以结合ES的强大搜索能力,达到对数据报文进行记录、分析处理的能力,可以实现回溯分析,结合kibana可视化工具,甚至达到实时可视化监控。

elastic stack全家桶

最早ELK全家桶,logstash性能一直被诟病,后来另起炉灶,针对采集使用golang构建出一套beats,用于不同的采集场景。其中针对网络流量,开发出packetbeat

packetbeat的优势是定制了elasticsearchmappingkibana一系列可视化图表,可以满足一般对tcp、dns、udp等常规报文的分析。基本达到开箱即用程度。

packetbeat也有不足,对报文的分析采用会话分析,没有一个个报文单独分析,倾向于应用层,对网络层面分析不足(尤其是故障排查时),此外,支持的协议有限,仅常见协议与tshark的2000多种存在明显差距,当遇到不支持时,需要等待支持或手动写插件,难度极高。

离线导入elasticsearch

tshark支持将pcap报文分析后生成json文件导入elasticsearch,同时支持elasticsearch的批量导入接口_bulk的格式,命令如下:

tshark -r test_trace.pcap -T ek > test_trace.pcap.json

之后可以将json文件通过curl导入。

curl -s -H “Content-Type: application/x-ndjson” -XPOST “localhost:9200/foo/_bulk” — data-binary “@/Users/test-elastic/test_trace.pcap.json”

注:

  1. 导入时可能存在导入失败,由于_bulk接口对post的文件大小有限制,尽量不要超过15MB,最好在10MB以内。如果超过,建议使用tshark的输出条件生成多个json文件,使用curl依次导入。
  2. 如果导入失败,可以在curl-v查看提示信息。
  3. 默认索引名为类似 packets-2019-04-23(报文记录的日期),可以导入后重新索引

可以使用类似以下命令查看导入情况:

curl ‘http://127.0.0.1:9200/packets-2019-04-23/_search/?size=10&pretty=true'

实时监控方案

主要思路

  • 使用tshark实时抓取报文,并启用过滤策略
  • 使用tshark解析捕获的报文,提取指定的字段并写入csv文件中,或者使用json格式在下一步进行ETL
  • 使用filebeat持续检测csv文件,并发个logstash用于字段过滤之类(如无需过滤可以跳过logstash直接发给elasticsearch
  • logstash对字段进行过滤,格式转化等,之后发到elasticsearch
  • 使用kibana进行数据可视化,对报文统计分析

简单实现

1. tshark部分

tshark -i phy0.mon -t ad -t ad -lT fields -E separator=, -E quote=d   -e _ws.col.Time  -e wlan.fc.type -e wlan.fc.type_subtype -e radiotap.dbm_antsignal -e frame.len -e radiotap.datarate   > tshark.csv
复制代码

2. filebeat

简单filebeat.yml配置文件

filebeat.modules:
- module: system
  syslog:
    enabled: false
  auth:
    enabled: true
    var.paths: ["/home/tshark.csv"]
name: test
output.logstash:
  hosts: ["localhost:5044"]
复制代码

3. logstash

logstash.yml文件,主要分为:

  • 监听5044端口接收filebeat的数据
  • 对数据按照csv格式解析,字段分割
  • 对日期处理,转换格式
  • 添加时、分、秒,便于索引
  • 对部分字段转换为数字格式
  • 替换部分字段
  • 输出到elasticsearch
input {
  beats {
    port => 5044
  }
}

csv {
     source => "message"
     columns => [ "col.time","frame.type","frame.subtype","rssi","frame.size","data.rate" ]
   }

date {
     match => [ "col.time", "YYYY-MM-DD HH:mm:ss.SSSSSSSSS" ]
     target => "@timestamp"
   }

mutate {
  add_field => {"[hour]" => "%{+HH}"}
  add_field => {"[minute]" => "%{+mm}"}
  add_field => {"[second]" => "%{+ss}"}

  }

mutate {
     convert => [ "rssi", "integer" ]
     convert => [ "frame.size", "integer" ]
     convert => [ "data.rate", "integer" ]   
     convert => [ "second", "integer" ]  
     convert => [ "minute", "integer" ]  
     convert => [ "hour", "integer" ]  

   }

if[frame.type]=="0"{
   mutate {
     replace => [ "frame.type", "Management" ]
   }}
   if[frame.type]=="1"{
   mutate {
     replace => [ "frame.type", "Control" ]
   }}
   if[frame.type]=="2"{
   mutate {
     replace => [ "frame.type", "Data" ]
   }}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}" 
    document_type => "%{[@metadata][type]}" 
  }
}
复制代码

4. elasticsearch

可以预先导入索引定义mapping,这块可以查elasticsearch文档

5. kibana

利用图表、面板等进行数据可视化,实现监控功能,这块可根据业务需求进行发挥~

小结

使用tshak的报文解析、数据导出功能,可以根据需求灵活处理,借助开源的大数据工具,可以实现更贴合业务的工具,实现快速对网络分析、实时监控、故障排查、高级检索、回溯分析、统计报表等,甚至在部分场景下可以替代商业的网络回溯分析系统。

参考

未经许可,禁止转载

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/34318
 
489 次点击