Administrator
发布于 2025-11-16 / 7 阅读
0
0

Vector基于K8S的大规模日志采集最佳实践

整体架构

Vector日志的采集配置

# ConfigMap carrying the Vector agent configuration as a single embedded file.
apiVersion: v1
kind: ConfigMap
metadata:
  namespace: kubedoor
  name: vector-config
data:
  vector.yaml: |
    # Data directory used by Vector.
    data_dir: '/var/lib/vector'

    # Vector API, bound to every interface on port 8686.
    api:
      address: '0.0.0.0:8686'
      enabled: true
    
    # Log sources: both streams are read with the kubernetes_logs source.
    sources:
      # Java application pods, selected by label.
      java_k8s_logs:
        type: kubernetes_logs
        extra_label_selector: 'app/component=java'
        # Re-join partial events that the container runtime split apart.
        auto_partial_merge: true
      # ingress-nginx pods, selected by label.
      ng_k8s_access:
        type: kubernetes_logs
        extra_label_selector: 'app.kubernetes.io/instance=ingress-nginx'
    
    transforms:
      # Step 1: drop health-check noise from the Java logs.
      java_filter_logs:
        type: "filter"
        inputs: [java_k8s_logs]
        condition:
          type: "vrl"
          source: |
            # Keep an event only when its message mentions neither health endpoint.
            !(contains(string!(.message), "/actuator/health") || contains(string!(.message), "/health"))
      # Step 2: fold multi-line Java output (for example stack traces) into one event.
      java_merge_multiline_logs:
        type: reduce
        inputs: [java_filter_logs]
        # Pod metadata lives under the `kubernetes` object on these events, so the
        # grouping keys must be full paths. Bare `pod_name` / `container_name` do
        # not exist on the event, which would place every container in one group
        # and interleave unrelated log lines.
        group_by:
          - kubernetes.pod_namespace
          - kubernetes.pod_name
          - kubernetes.container_name
        # Lines belonging to the same record are joined with newlines.
        merge_strategies:
          message: concat_newline
        # A new record starts at a line beginning with a timestamp such as
        # "2025-11-16 12:00:00.123" or "2025-11-16T12:00:00,123".
        starts_when:
          type: vrl
          source: |
            match(string!(.message), r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}[.,]\d{3}')
        # Treat a record as complete after 5 s without further lines; expired
        # records are checked for and flushed every second.
        expire_after_ms: 5000
        flush_period_ms: 1000
      
      # Step 3: reshape Java events into the flat record that is shipped downstream.
      java_add_msg_field:
        type: remap
        inputs: [java_merge_multiline_logs]
        source: |
          # The log text is shipped under `_msg` (the field VictoriaLogs expects).
          msg = .message

          # Extract the level from lines shaped like "<date> <time> LEVEL ...";
          # anything that does not match is labelled "unknown".
          level = "unknown"
          parsed, err = parse_regex(msg, r'^(?P<ts>\S+\s+\S+)\s+(?P<level>[A-Z]+)\s+.*')
          if err == null {
            level = parsed.level
          }

          # Replace the event with only the fields we need. Building the flat
          # record directly avoids leaving an empty `kubernetes` object behind,
          # which the previous copy-then-delete sequence did.
          . = {
            "_msg": msg,
            "level": level,
            "log_type": "java",
            "namespace": .kubernetes.pod_namespace,
            "pod": .kubernetes.pod_name,
            "service": .kubernetes.container_name,
            "node": .kubernetes.pod_node_name
          }

      # Parses ingress-nginx JSON access-log lines, derives request/client fields
      # (path, duration, client IP, user agent, GeoIP) and emits a fixed set of
      # nginx fields. Events that hit a runtime error are dropped and rerouted to
      # the `nginx_format_access.dropped` output.
      nginx_format_access:
        drop_on_error: true
        reroute_dropped: true
        type: remap
        inputs:
          - ng_k8s_access
        source: |
          ._msg = .message
          # Parse the JSON into a temporary variable so the whole event is not overwritten.
          # NOTE(review): `$$1` presumably relies on Vector's config-file `$$` -> `$`
          # escaping so that the replacement sees the capture group `$1` - confirm.
          parsed_json = parse_json!(replace(.message, r'([^\x00-\x7F])', "\\\\$$1") ?? .message)
          # When the parsed object itself carries a `message` string, parse that as
          # JSON instead, after doubling the backslash of every `\x` sequence.
          if exists(parsed_json.message) {
            parsed_json = parse_json!(replace(parsed_json.message, "\\x", "\\\\x") ?? parsed_json.message)
          }
          # Merge the parsed fields into the current event, keeping _msg.
          . = merge!(., parsed_json)
          # Processing time and request time, both as Unix milliseconds.
          .createdtime = to_unix_timestamp(now(), unit: "milliseconds")
          .timestamp = to_unix_timestamp(parse_timestamp!(.timestamp , format: "%+"), unit: "milliseconds")
          # Split the URL into path and query string at the first "?".
          .url_list = split!(.url, "?", 2)
          .path = .url_list[0]
          .query = .url_list[1]
          # top_path is "/" + the first path segment when the path has at least
          # two "/" separators, otherwise "/".
          .path_list = split!(.path, "/", 3)
          if length(.path_list) > 2 {.top_path = join!(["/", .path_list[1]])} else {.top_path = "/"}
          # Unparseable timings fall back to 0; duration = responsetime - upstreamtime.
          .upstreamtime = to_float(.upstreamtime) ?? 0
          .duration = round((to_float(.responsetime) ?? 0) - to_float(.upstreamtime),3)
          # Use remote_ip when no X-Forwarded-For was logged ("-"); the client IP is
          # the first entry of the comma-separated list.
          if .xff == "-" { .xff = .remote_ip }
          .client_ip = split!(.xff, ",", 2)[0]
          # Break the user agent into browser / OS / device fields.
          .ua = parse_user_agent!(.http_user_agent , mode: "enriched")
          .client_browser_family = .ua.browser.family
          .client_browser_major = .ua.browser.major
          .client_os_family = .ua.os.family
          .client_os_major = .ua.os.major
          .client_device_brand = .ua.device.brand
          .client_device_model = .ua.device.model

          # The GeoIP lookup must be given the table name as a string.
          # When the lookup fails, the place names default to "unknown"; the
          # fallback object has no latitude/longitude keys.
          .geoip = get_enrichment_table_record("geoip_table", {"ip": .client_ip}) ?? {"city_name":"unknown","region_name":"unknown","country_name":"unknown"}
          .client_city = .geoip.city_name
          .client_region = .geoip.region_name
          .client_country = .geoip.country_name
          .client_latitude = .geoip.latitude
          .client_longitude = .geoip.longitude
          # Keep only the nginx fields.
          .log_type  = "nginx"
          . = {
            "_msg": ._msg,
            "log_type": .log_type,
            "timestamp": .timestamp,
            "createdtime": .createdtime,
            "server_ip": .server_ip,
            "remote_ip": .remote_ip,
            "xff": .xff,
            "remote_user": .remote_user,
            "domain": .domain,
            "url": .url,
            "path": .path,
            "query": .query,
            "top_path": .top_path,
            "referer": .referer,
            "upstreamtime": .upstreamtime,
            "responsetime": .responsetime,
            "duration": .duration,
            "request_method": .request_method,
            "status": .status,
            "response_length": .response_length,
            "request_length": .request_length,
            "protocol": .protocol,
            "upstreamhost": .upstreamhost,
            "http_user_agent": .http_user_agent,
            "client_ip": .client_ip,
            "client_browser_family": .client_browser_family,
            "client_browser_major": .client_browser_major,
            "client_os_family": .client_os_family,
            "client_os_major": .client_os_major,
            "client_device_brand": .client_device_brand,
            "client_device_model": .client_device_model,
            "client_city": .client_city,
            "client_region": .client_region,
            "client_country": .client_country,
            "client_latitude": .client_latitude,
            "client_longitude": .client_longitude
          }
    
    # Outputs
    sinks:
      # Ships both processed streams to Kafka as JSON, keyed by the `pod` field.
      kafka_logs:
        type: "kafka"
        inputs: [java_add_msg_field, nginx_format_access]
        bootstrap_servers: 'kafka1:9092,kafka2:9092,kafka3:9092'
        topic: 'logs-dev-k8s'
        key_field: 'pod'
        healthcheck: true
        encoding:
          codec: "json"
        batch:
          timeout_secs: 5
          max_bytes: 102400
        # In-memory buffer of 1000 events; when it is full the newest events
        # are dropped.
        buffer:
          type: "memory"
          when_full: "drop_newest"
          max_events: 1000
      # Prints the events rejected by nginx_format_access to the console as JSON.
      dropped_console:
        type: "console"
        inputs:
          - nginx_format_access.dropped
        encoding:
          codec: "json"
    # Lookup tables for VRL enrichment functions.
    enrichment_tables:
      geoip_table:
        type: "geoip"
        path: '/usr/share/GeoIP/GeoLite2-City.mmdb'
        # Return place names in Chinese. Without this line the names are in
        # English, which resolves slightly more records than Chinese does.
        locale: 'zh-CN'

Vector日志处理与入库配置

# Data directory used by Vector.
data_dir: '/var/lib/vector'

# Vector API, bound to every interface on port 8687.
api:
  address: '0.0.0.0:8687'
  enabled: true

# Consume the JSON-encoded log events from Kafka.
sources:
  kafka_logs:
    type: "kafka"
    bootstrap_servers: 'kafka1:9092,kafka2:9092,kafka3:9092'
    topics: ['logs-dev-k8s']
    group_id: 'vector-vl'
    decoding:
      codec: "json"

# Push the events into VictoriaLogs through its Loki-compatible push endpoint.
sinks:
  victorialogs:
    type: loki
    inputs:
      - kafka_logs
    endpoint: "http://victorialogs.kubedoor:9428"
    path: /insert/loki/api/v1/push
    # Static labels attached to every pushed entry.
    labels:
      origin_prometheus: dev-k8s
      env: dev
    encoding:
      codec: json
    batch:
      max_bytes: 102400
      timeout_secs: 5
    # Apply back-pressure when the buffer is full instead of discarding events.
    # With `drop_newest`, events already pulled from Kafka were thrown away
    # whenever VictoriaLogs was slow; with `block` the Kafka source stops
    # pulling and the backlog stays in Kafka.
    buffer:
      type: memory
      max_events: 1000
      when_full: block
    healthcheck: false
    request:
      timeout_secs: 30


评论