Building an ELK stack on Kafka (Confluent) (notes)

June 28, 2018

This is a test setup of an ELK environment that uses Kafka as the message queue. The data collection and transformation pipeline looks like this:

F5 HSL --> logstash (stream processing) --> kafka --> elasticsearch

The ELK version used in the test is 6.3; the Confluent version is 4.1.1.

The intended behavior: logs sent via HSL are stream-processed by logstash and emitted as JSON, and that JSON content is stored in Kafka exactly as received; Kafka does no further format processing.

Test environment:

192.168.214.138: logstash and the Confluent platform

192.168.214.137: the ELK suite (logstash disabled; only Elasticsearch and Kibana running)

Notes from installing and debugging Confluent:

    1. As with the ELK setup, install the Java environment first.
    2. First, leaving Kafka out of the picture, get F5 HSL --> Logstash --> ES working normally and verify a simple Kibana view. When switching to Kafka later, the output section here just needs to be changed to the kafka plugin configuration.
      The logstash configuration at this point:
      input {
        udp {
          port => 8514
          type => 'f5-dns'
        }
      }
       
      filter {
      if [type] == 'f5-dns' {
          grok {
            match => { "message" => "%{HOSTNAME:F5hostname} %{IP:clientip} %{POSINT:clientport} %{IP:svrip} %{NUMBER:qid} %{HOSTNAME:qname} %{GREEDYDATA:qtype} %{GREEDYDATA:status} %{GREEDYDATA:origin}" }
          }
          geoip {
               source => "clientip"
               target => "geoip"
          }
          }
      }
       
       
       
      output {
         #stdout{ codec => rubydebug }
         #elasticsearch {
         # hosts => ["192.168.214.137:9200"]
         # index => "f5-dns-%{+YYYY.MM.dd}"
         #template_name => "f5-dns"
        #}
        kafka {
          codec => json
          bootstrap_servers => "localhost:9092"
          topic_id => "f5-dns-kafka"
        }
      }
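      With the kafka output in place, a quick way to check that events actually reach the topic is to consume it from the console. This is just a sketch, assuming the Confluent tarball installed under /root/confluent-4.1.1/ in the later steps:
      /root/confluent-4.1.1/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic f5-dns-kafka --from-beginning
      Each line printed should be one of the JSON documents produced by logstash.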
    3. Send some test traffic, confirm that ES receives the data normally, and check the cluster status shown in cerebro.
      cd /usr/share/cerebro/cerebro-0.8.1/;./bin/cerebro -Dhttp.port=9110 -Dhttp.address=0.0.0.0
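      Besides cerebro, the indices can also be checked directly against the ES REST API (a sketch; the index name pattern comes from the elasticsearch output shown above):
      curl 'http://192.168.214.137:9200/_cat/indices/f5-dns-*?v'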
    4. Install Confluent. Since this is a test environment, just download the tarball from the Confluent website and extract it; it lives under /root/confluent-4.1.1/.
    5. Again because this is a test environment, use the confluent CLI to start all related services; Kafka failed to start:
      [root@kafka-logstash bin]# ./confluent start
      Using CONFLUENT_CURRENT: /tmp/confluent.dA0KYIWj
      Starting zookeeper
      zookeeper is [UP]
      Starting kafka
      /Kafka failed to start
      kafka is [DOWN]
      Cannot start Schema Registry, Kafka Server is not running. Check your deployment

      Investigation showed the VM had been given too little memory, so Java could not allocate enough memory for Kafka:
      [root@kafka-logstash bin]# ./kafka-server-start ../etc/kafka/server.properties
      OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Cannot allocate memory' (errno=12)

      Increase the VM's memory, and reduce the heap size set in logstash's JVM configuration.
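      For reference, a minimal sketch of the two heap adjustments involved; the file locations and sizes below are assumptions for this small test VM, not values taken from the actual setup:
      # logstash: config/jvm.options (or /etc/logstash/jvm.options for package installs)
      -Xms256m
      -Xmx256m
      # kafka: cap the broker heap before starting it manually
      export KAFKA_HEAP_OPTS="-Xms512M -Xmx512M"
      ./kafka-server-start ../etc/kafka/server.properties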
    6. Kafka server configuration file:
      [root@kafka-logstash kafka]# pwd
      /root/confluent-4.1.1/etc/kafka
      [root@kafka-logstash kafka]# egrep -v "^#|^$" server.properties
      broker.id=0
      listeners=PLAINTEXT://localhost:9092
      num.network.threads=3
      num.io.threads=8
      socket.send.buffer.bytes=102400
      socket.receive.buffer.bytes=102400
      socket.request.max.bytes=104857600
      log.dirs=/tmp/kafka-logs
      num.partitions=1
      num.recovery.threads.per.data.dir=1
      offsets.topic.replication.factor=1
      transaction.state.log.replication.factor=1
      transaction.state.log.min.isr=1
      log.retention.hours=168
      log.segment.bytes=1073741824
      log.retention.check.interval.ms=300000
      zookeeper.connect=localhost:2181
      zookeeper.connection.timeout.ms=6000
      confluent.support.metrics.enable=true
      confluent.support.customer.id=anonymous
      group.initial.rebalance.delay.ms=0
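      With these settings the broker can be sanity-checked by listing or pre-creating the topic with the stock kafka-topics tool from the same tarball (a sketch; the logstash kafka output will also auto-create the topic by default):
      cd /root/confluent-4.1.1
      ./bin/kafka-topics --zookeeper localhost:2181 --list
      ./bin/kafka-topics --zookeeper localhost:2181 --create --topic f5-dns-kafka --partitions 1 --replication-factor 1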
    7. Connect configuration file. Here the original Avro converters are replaced with JSON ones, and schema recognition for both key and value is disabled, because the input is plain JSON content with no associated schema; we just want Kafka Connect to pass the JSON produced by logstash through to ES as-is.
      [root@kafka-logstash kafka]# pwd
      /root/confluent-4.1.1/etc/kafka
      [root@kafka-logstash kafka]# egrep -v "^#|^$" connect-standalone.properties
      bootstrap.servers=localhost:9092
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false
      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false
      offset.storage.file.filename=/tmp/connect.offsets
      offset.flush.interval.ms=10000
      plugin.path=share/java

      Without the changes above, connect keeps failing when sinking the logs to ES, reporting deserialization failures, "unknown magic byte" errors, and so on. Checking with the confluent status command then shows connect flipping from up to down:
      [root@kafka-logstash confluent-4.1.1]# ./bin/confluent status
      ksql-server is [DOWN]
      connect is [DOWN]
      kafka-rest is [UP]
      schema-registry is [UP]
      kafka is [UP]
      zookeeper is [UP]
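      After fixing the converter settings, connect has to be restarted and the sink reloaded. A sketch of two ways to do that with the files above (whether the connector ends up named elasticsearch-sink or f5-dns depends on the name= property actually used):
      cd /root/confluent-4.1.1
      # via the confluent CLI
      ./bin/confluent unload elasticsearch-sink
      ./bin/confluent load elasticsearch-sink -d etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
      # or run a standalone worker directly with the modified worker config
      ./bin/connect-standalone etc/kafka/connect-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties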

       
    8. schema-registry related configuration:
      [root@kafka-logstash schema-registry]# pwd
      /root/confluent-4.1.1/etc/schema-registry
      [root@kafka-logstash schema-registry]# egrep -v "^#|^$"
      connect-avro-distributed.properties  connect-avro-standalone.properties   log4j.properties                     schema-registry.properties
      [root@kafka-logstash schema-registry]# egrep -v "^#|^$" connect-avro-standalone.properties
      bootstrap.servers=localhost:9092
      key.converter.schema.registry.url=http://localhost:8081
      value.converter.schema.registry.url=http://localhost:8081
      key.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      key.converter.schemas.enable=false
      value.converter.schemas.enable=false
      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter.schemas.enable=false
      internal.value.converter.schemas.enable=false
      offset.storage.file.filename=/tmp/connect.offsets
      plugin.path=share/java
      [root@kafka-logstash schema-registry]# egrep -v "^#|^$" schema-registry.properties
      listeners=http://0.0.0.0:8081
      kafkastore.connection.url=localhost:2181
      kafkastore.topic=_schemas
      debug=false
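      The schema registry is not actually needed on this JSON path, but a quick check that the service is answering can be made against its REST port (a sketch; 8081 is the listener configured above):
      curl http://localhost:8081/subjects
      # returns [] or the list of registered subjects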
    9. Configuration file for the ES connector:
      [root@kafka-logstash kafka-connect-elasticsearch]# pwd
      /root/confluent-4.1.1/etc/kafka-connect-elasticsearch
      [root@kafka-logstash kafka-connect-elasticsearch]# egrep -v "^#|^$" quickstart-elasticsearch.properties
      name=f5-dns
      connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
      tasks.max=1
      topics=f5-dns-kafka
      key.ignore=true
      value.ignore=true
      schema.ignore=true
      connection.url=http://192.168.214.137:9200
      type.name=doc
      transforms=MyRouter
      transforms.MyRouter.type=org.apache.kafka.connect.transforms.TimestampRouter
      transforms.MyRouter.topic.format=${topic}-${timestamp}
      transforms.MyRouter.timestamp.format=yyyyMMdd

      In the configuration above, topics lists the topic to be shipped to ES. The TimestampRouter transform dynamically maps the topic to a per-day ES index (for example, f5-dns-kafka becomes the index f5-dns-kafka-20180628), so ES gets one index per day. Note that schema.ignore=true must be set; otherwise Kafka Connect cannot send the received data to ES, and the connect.stdout log of the connect service shows:
      [root@kafka-logstash connect]# pwd
      /tmp/confluent.dA0KYIWj/connect
       
      Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
      at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
      at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:221)
      at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
      at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)
      at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:162)
      at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:524)
    10. With the configuration fixed, send data to logstash again: the logs now reach ES normally, and the format is identical to the setup without Kafka.
      Without Kafka:
      {
      "_index": "f5-dns-2018.06.26",
      "_type": "doc",
      "_id": "KrddO2QBXB-i0ay0g5G9",
      "_version": 1,
      "_score": 1,
      "_source": {
      "message": "localhost.lan 202.202.102.100 53777 172.16.199.136 42487 www.test.com A NOERROR GTM_REWRITE ",
      "F5hostname": "localhost.lan",
      "qid": "42487",
      "clientip": "202.202.102.100",
      "geoip": {
      "region_name": "Chongqing",
      "location": {
      "lon": 106.5528,
      "lat": 29.5628
      },
      "country_code2": "CN",
      "timezone": "Asia/Shanghai",
      "country_name": "China",
      "region_code": "50",
      "continent_code": "AS",
      "city_name": "Chongqing",
      "country_code3": "CN",
      "ip": "202.202.102.100",
      "latitude": 29.5628,
      "longitude": 106.5528
      },
      "status": "NOERROR",
      "qname": "www.test.com",
      "clientport": "53777",
      "@version": "1",
      "@timestamp": "2018-06-26T09:12:21.585Z",
      "host": "192.168.214.1",
      "type": "f5-dns",
      "qtype": "A",
      "origin": "GTM_REWRITE ",
      "svrip": "172.16.199.136"
      }
      }

      With Kafka:
      {
      "_index": "f5-dns-kafka-20180628",
      "_type": "doc",
      "_id": "f5-dns-kafka-20180628+0+23",
      "_version": 1,
      "_score": 1,
      "_source": {
      "F5hostname": "localhost.lan",
      "geoip": {
      "city_name": "Chongqing",
      "timezone": "Asia/Shanghai",
      "ip": "202.202.100.100",
      "latitude": 29.5628,
      "country_name": "China",
      "country_code2": "CN",
      "continent_code": "AS",
      "country_code3": "CN",
      "region_name": "Chongqing",
      "location": {
      "lon": 106.5528,
      "lat": 29.5628
      },
      "region_code": "50",
      "longitude": 106.5528
      },
      "qtype": "A",
      "origin": "DNSX ",
      "type": "f5-dns",
      "message": "localhost.lan 202.202.100.100 53777 172.16.199.136 42487 www.myf5.net A NOERROR DNSX ",
      "qid": "42487",
      "clientport": "53777",
      "@timestamp": "2018-06-28T09:05:20.594Z",
      "clientip": "202.202.100.100",
      "qname": "www.myf5.net",
      "host": "192.168.214.1",
      "@version": "1",
      "svrip": "172.16.199.136",
      "status": "NOERROR"
      }
      }
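      To double-check the daily indices produced by the TimestampRouter, the standard ES cat/search APIs can be used (a sketch; the index name matches the output above):
      curl 'http://192.168.214.137:9200/_cat/indices/f5-dns-kafka-*?v'
      curl 'http://192.168.214.137:9200/f5-dns-kafka-20180628/_search?size=1&pretty'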

       
    11. Relevant REST API output:
      http://192.168.214.138:8083/connectors/elasticsearch-sink/tasks
       
      [
        {
          "id": {
            "connector": "elasticsearch-sink",
            "task": 0
          },
          "config": {
            "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
            "type.name": "doc",
            "value.ignore": "true",
            "tasks.max": "1",
            "topics": "f5-dns-kafka",
            "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
            "transforms": "MyRouter",
            "key.ignore": "true",
            "schema.ignore": "true",
            "transforms.MyRouter.timestamp.format": "yyyyMMdd",
            "task.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkTask",
            "name": "elasticsearch-sink",
            "connection.url": "http://192.168.214.137:9200",
            "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
          }
        }
      ]
       
      http://192.168.214.138:8083/connectors/elasticsearch-sink/
      {
        "name": "elasticsearch-sink",
        "config": {
          "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
          "type.name": "doc",
          "value.ignore": "true",
          "tasks.max": "1",
          "topics": "f5-dns-kafka",
          "transforms.MyRouter.topic.format": "${topic}-${timestamp}",
          "transforms": "MyRouter",
          "key.ignore": "true",
          "schema.ignore": "true",
          "transforms.MyRouter.timestamp.format": "yyyyMMdd",
          "name": "elasticsearch-sink",
          "connection.url": "http://192.168.214.137:9200",
          "transforms.MyRouter.type": "org.apache.kafka.connect.transforms.TimestampRouter"
        },
        "tasks": [
          {
            "connector": "elasticsearch-sink",
            "task": 0
          }
        ],
        "type": "sink"
      }
       
      http://192.168.214.138:8083/connectors/elasticsearch-sink/status
      {
        "name": "elasticsearch-sink",
        "connector": {
          "state": "RUNNING",
          "worker_id": "172.16.150.179:8083"
        },
        "tasks": [
          {
            "state": "RUNNING",
            "id": 0,
            "worker_id": "172.16.150.179:8083"
          }
        ],
        "type": "sink"
      }
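      The same Connect REST API on port 8083 also accepts management calls, which are handy while debugging the sink (a sketch using the standard Kafka Connect endpoints):
      # list all connectors
      curl http://192.168.214.138:8083/connectors
      # restart the sink connector after changing its configuration
      curl -X POST http://192.168.214.138:8083/connectors/elasticsearch-sink/restart
      # pause / resume the sink
      curl -X PUT http://192.168.214.138:8083/connectors/elasticsearch-sink/pause
      curl -X PUT http://192.168.214.138:8083/connectors/elasticsearch-sink/resume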

      http://192.168.214.138:8082/brokers
      {
        "brokers": [
          0
        ]
      }
       
      http://192.168.214.138:8082/topics
      [
        "__confluent.support.metrics",
        "_confluent-ksql-default__command_topic",
        "_schemas",
        "connect-configs",
        "connect-offsets",
        "connect-statuses",
        "f5-dns-2018.06",
        "f5-dns-2018.06.27",
        "f5-dns-kafka",
        "test-elasticsearch-sink"
      ]
       
       
       
      http://192.168.214.138:8082/topics/f5-dns-kafka
      {
        "name": "f5-dns-kafka",
        "configs": {
          "file.delete.delay.ms": "60000",
          "segment.ms": "604800000",
          "min.compaction.lag.ms": "0",
          "retention.bytes": "-1",
          "segment.index.bytes": "10485760",
          "cleanup.policy": "delete",
          "follower.replication.throttled.replicas": "",
          "message.timestamp.difference.max.ms": "9223372036854775807",
          "segment.jitter.ms": "0",
          "preallocate": "false",
          "segment.bytes": "1073741824",
          "message.timestamp.type": "CreateTime",
          "message.format.version": "1.1-IV0",
          "max.message.bytes": "1000012",
          "unclean.leader.election.enable": "false",
          "retention.ms": "604800000",
          "flush.ms": "9223372036854775807",
          "delete.retention.ms": "86400000",
          "leader.replication.throttled.replicas": "",
          "min.insync.replicas": "1",
          "flush.messages": "9223372036854775807",
          "compression.type": "producer",
          "min.cleanable.dirty.ratio": "0.5",
          "index.interval.bytes": "4096"
        },
        "partitions": [
          {
            "partition": 0,
            "leader": 0,
            "replicas": [
              {
                "broker": 0,
                "leader": true,
                "in_sync": true
              }
            ]
          }
        ]
      }

       

In this test the Kafka configuration was left essentially at the defaults; no memory tuning was done, nor any sizing of the disk space Kafka will consume, and so on.

References used in the test:

https://docs.confluent.io/current/installation/installing_cp.html

https://docs.confluent.io/current/connect/connect-elasticsearch/docs/elasticsearch_connector.html

https://docs.confluent.io/current/connect/connect-elasticsearch/docs/configuration_options.html

Storage mechanism reference: https://blog.csdn.net/opensure/article/details/46048589

Kafka configuration parameter reference: https://blog.csdn.net/lizhitao/article/details/25667831

More on Kafka internals: https://blog.csdn.net/ychenfeng/article/details/74980531

confluent CLI:

confluent: A command line interface to manage Confluent services
 
Usage: confluent <command> [<subcommand>] [<parameters>]
 
These are the available commands:
 
    acl         Specify acl for a service.
    config      Configure a connector.
    current     Get the path of the data and logs of the services managed by the current confluent run.
    destroy     Delete the data and logs of the current confluent run.
    list        List available services.
    load        Load a connector.
    log         Read or tail the log of a service.
    start       Start all services or a specific service along with its dependencies
    status      Get the status of all services or the status of a specific service along with its dependencies.
    stop        Stop all services or a specific service along with the services depending on it.
    top         Track resource usage of a service.
    unload      Unload a connector.
 
'confluent help' lists available commands. See 'confluent help <command>' to read about a
specific command.
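Of the commands above, start and status were used earlier; current and log are also handy for troubleshooting (example invocations based on the help text, run from /root/confluent-4.1.1):

./bin/confluent current        # path of the data and logs of the current run (/tmp/confluent.*)
./bin/confluent log connect    # read the connect worker log
./bin/confluent status connect # status of connect and its dependencies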

Confluent platform service port table:

Component                            Default Port
Apache Kafka brokers (plain text)    9092
Confluent Control Center             9021
Kafka Connect REST API               8083
KSQL Server REST API                 8088
REST Proxy                           8082
Schema Registry REST API             8081
ZooKeeper                            2181
