整体架构
Vector日志的采集配置
# ConfigMap carrying the Vector collector configuration (vector.yaml).
# Pipeline: two kubernetes_logs sources (Java pods, ingress-nginx pods)
# -> filter / multi-line merge / remap transforms -> Kafka topic.
apiVersion: v1
kind: ConfigMap
metadata:
  name: vector-config
  namespace: kubedoor
data:
  vector.yaml: |
    # Data directory
    data_dir: "/var/lib/vector"
    # API
    api:
      enabled: true
      address: "0.0.0.0:8686"
    # Use the officially recommended kubernetes_logs source
    sources:
      java_k8s_logs:
        type: "kubernetes_logs"
        # Officially recommended: automatically merge partial log lines
        auto_partial_merge: true
        # Collect Java applications only, selected by pod label
        extra_label_selector: "app/component=java"
      ng_k8s_access:
        type: "kubernetes_logs"
        # Collect ingress applications only, selected by pod label
        extra_label_selector: "app.kubernetes.io/instance=ingress-nginx"
    transforms:
      # 1. Filter out health checks only
      java_filter_logs:
        type: filter
        inputs:
          - java_k8s_logs
        condition: |
          # Exclude only obvious health checks
          !contains(string!(.message), "/actuator/health") &&
          !contains(string!(.message), "/health")
      # 2. Enhanced multi-line handling for Java logs
      java_merge_multiline_logs:
        type: reduce
        inputs: [java_filter_logs]
        # Group per container. The kubernetes_logs source nests pod metadata
        # under `kubernetes` (the remap below reads .kubernetes.pod_name), so
        # the group keys must use the nested paths; bare `pod_name` /
        # `container_name` do not exist on the event and would put every pod
        # into one group. The namespace is included so that equally named
        # pods in different namespaces are not merged together.
        group_by:
          - kubernetes.pod_namespace
          - kubernetes.pod_name
          - kubernetes.container_name
        merge_strategies:
          message: concat_newline
        # A new event starts at a line beginning with a timestamp such as
        # "2024-01-01 12:00:00.123" or "2024-01-01T12:00:00,123".
        starts_when:
          type: vrl
          source: |
            match(string!(.message), r'^\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}[.,]\d{3}')
        expire_after_ms: 5000
        flush_period_ms: 1000
      # 3. Add the _msg field required by VictoriaLogs
      java_add_msg_field:
        type: remap
        inputs: [java_merge_multiline_logs]
        source: |
          # Keep the message content under the name _msg
          ._msg = .message
          # Extract the log level with a regex
          match, err = parse_regex(._msg, r'^(?P<ts>\S+\s+\S+)\s+(?P<level>[A-Z]+)\s+.*')
          if err == null {
          .level = match.level
          } else {
          .level = "unknown"
          }
          # Build a new object containing only the required fields
          .log_type = "java"
          . = {
          "_msg": ._msg,
          "level": .level,
          "log_type": .log_type,
          "kubernetes": {
          "pod_namespace": .kubernetes.pod_namespace,
          "pod_name": .kubernetes.pod_name,
          "container_name": .kubernetes.container_name,
          "pod_node_name": .kubernetes.pod_node_name
          }
          }
          # Promote the kubernetes metadata to top-level fields
          .namespace = .kubernetes.pod_namespace
          del(.kubernetes.pod_namespace)
          .pod = .kubernetes.pod_name
          del(.kubernetes.pod_name)
          .service = .kubernetes.container_name
          del(.kubernetes.container_name)
          .node = .kubernetes.pod_node_name
          del(.kubernetes.pod_node_name)
      # Parse ingress-nginx access logs; events whose program fails are
      # rerouted to the `nginx_format_access.dropped` output.
      nginx_format_access:
        drop_on_error: true
        reroute_dropped: true
        type: remap
        inputs:
          - ng_k8s_access
        source: |
          ._msg = .message
          # Parse the JSON into a temporary variable to avoid overwriting the whole event
          parsed_json = parse_json!(replace(.message, r'([^\x00-\x7F])', "\\\\$$1") ?? .message)
          if exists(parsed_json.message) {
          parsed_json = parse_json!(replace(parsed_json.message, "\\x", "\\\\x") ?? parsed_json.message)
          }
          # Merge the parsed fields into the current event, keeping _msg
          . = merge!(., parsed_json)
          .createdtime = to_unix_timestamp(now(), unit: "milliseconds")
          .timestamp = to_unix_timestamp(parse_timestamp!(.timestamp , format: "%+"), unit: "milliseconds")
          .url_list = split!(.url, "?", 2)
          .path = .url_list[0]
          .query = .url_list[1]
          .path_list = split!(.path, "/", 3)
          if length(.path_list) > 2 {.top_path = join!(["/", .path_list[1]])} else {.top_path = "/"}
          .upstreamtime = to_float(.upstreamtime) ?? 0
          .duration = round((to_float(.responsetime) ?? 0) - to_float(.upstreamtime),3)
          if .xff == "-" { .xff = .remote_ip }
          .client_ip = split!(.xff, ",", 2)[0]
          .ua = parse_user_agent!(.http_user_agent , mode: "enriched")
          .client_browser_family = .ua.browser.family
          .client_browser_major = .ua.browser.major
          .client_os_family = .ua.os.family
          .client_os_major = .ua.os.major
          .client_device_brand = .ua.device.brand
          .client_device_model = .ua.device.model
          # The GeoIP lookup must be given the table name as a string
          .geoip = get_enrichment_table_record("geoip_table", {"ip": .client_ip}) ?? {"city_name":"unknown","region_name":"unknown","country_name":"unknown"}
          .client_city = .geoip.city_name
          .client_region = .geoip.region_name
          .client_country = .geoip.country_name
          .client_latitude = .geoip.latitude
          .client_longitude = .geoip.longitude
          # Keep only the nginx fields
          .log_type = "nginx"
          . = {
          "_msg": ._msg,
          "log_type": .log_type,
          "timestamp": .timestamp,
          "createdtime": .createdtime,
          "server_ip": .server_ip,
          "remote_ip": .remote_ip,
          "xff": .xff,
          "remote_user": .remote_user,
          "domain": .domain,
          "url": .url,
          "path": .path,
          "query": .query,
          "top_path": .top_path,
          "referer": .referer,
          "upstreamtime": .upstreamtime,
          "responsetime": .responsetime,
          "duration": .duration,
          "request_method": .request_method,
          "status": .status,
          "response_length": .response_length,
          "request_length": .request_length,
          "protocol": .protocol,
          "upstreamhost": .upstreamhost,
          "http_user_agent": .http_user_agent,
          "client_ip": .client_ip,
          "client_browser_family": .client_browser_family,
          "client_browser_major": .client_browser_major,
          "client_os_family": .client_os_family,
          "client_os_major": .client_os_major,
          "client_device_brand": .client_device_brand,
          "client_device_model": .client_device_model,
          "client_city": .client_city,
          "client_region": .client_region,
          "client_country": .client_country,
          "client_latitude": .client_latitude,
          "client_longitude": .client_longitude
          }
    # Outputs
    sinks:
      kafka_logs:
        type: kafka
        inputs:
          - java_add_msg_field
          - nginx_format_access
        bootstrap_servers: "kafka1:9092,kafka2:9092,kafka3:9092"
        topic: "logs-dev-k8s"
        encoding:
          codec: json
        # Only the Java events carry a `pod` field; the nginx remap above
        # does not emit one.
        key_field: "pod"
        batch:
          max_bytes: 102400
          timeout_secs: 5
        buffer:
          type: memory
          max_events: 1000
          when_full: drop_newest
        healthcheck: true
      # Print events dropped by nginx_format_access for troubleshooting
      dropped_console:
        type: console
        inputs: ["nginx_format_access.dropped"]
        encoding:
          codec: json
    enrichment_tables:
      geoip_table:
        path: "/usr/share/GeoIP/GeoLite2-City.mmdb"
        type: geoip
        # Return location names in Chinese. Remove this line to fall back to
        # the default (English), which resolves slightly more records than
        # Chinese does.
        locale: "zh-CN"
Vector日志处理与入库配置
# Vector processing/ingest configuration: consumes the JSON log events from
# Kafka and pushes them to VictoriaLogs through its Loki-compatible endpoint.
data_dir: "/var/lib/vector"
api:
  enabled: true
  address: "0.0.0.0:8687"
sources:
  # Reads the topic written by the collector's kafka sink
  kafka_logs:
    type: kafka
    bootstrap_servers: "kafka1:9092,kafka2:9092,kafka3:9092"
    group_id: "vector-vl"
    topics:
      - "logs-dev-k8s"
    decoding:
      codec: json
sinks:
  victorialogs:
    type: loki
    inputs:
      - kafka_logs
    endpoint: "http://victorialogs.kubedoor:9428"
    path: /insert/loki/api/v1/push
    # Static stream labels attached to every pushed event
    labels:
      origin_prometheus: dev-k8s
      env: dev
    encoding:
      codec: json
    batch:
      max_bytes: 102400
      timeout_secs: 5
    buffer:
      type: memory
      max_events: 1000
      when_full: drop_newest
    healthcheck: false
    request:
      timeout_secs: 30