Samba (Linux) + Syslog + Logstash + Elasticsearch 转发smb日志
Published in:2025-03-10 |
Words: 1.8k | Reading time: 9min | reading:

方案简介

Elasticsearch 集群部署: 为了高可用性和可扩展性,可以将 Elasticsearch 部署为集群。

Logstash 管道优化: 根据日志量和复杂度,调整 Logstash 的 worker 数量、批处理大小等参数。

索引生命周期管理 (ILM): 使用 ILM 策略自动管理 Elasticsearch 索引(例如,定期滚动、删除旧索引)。

安全性: 启用 Elasticsearch 和 Kibana 的安全认证(用户名/密码、TLS/SSL)。
监控: 监控 ELK Stack 的性能和健康状态. 可以使用 Elastic Stack Monitoring 功能, 或者 Prometheus + Grafana.

安装使用

  • 安装 Docker 和 Docker Compose:

    • Ubuntu:

      1
      2
      3
      4
      5
      6
      sudo apt update
      sudo apt install apt-transport-https ca-certificates curl software-properties-common
      curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg
      echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null
      sudo apt update
      sudo apt install docker-ce docker-ce-cli containerd.io docker-compose-plugin
    • CentOS/RHEL:

      1
      2
      3
      sudo yum install -y yum-utils
      sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
      sudo yum install docker-ce docker-ce-cli containerd.io docker-compose-plugin
    • 验证安装:

      1
      2
      docker --version
      docker compose version
  • ** 增加 Docker 用户到 docker 组:** 避免每次都使用 sudo.

    1
    2
    sudo usermod -aG docker $USER
    newgrp docker # 或者注销并重新登录
  • 创建工作目录:

    1
    2
    mkdir elk-smb
    cd elk-smb

2. 创建 Docker Compose 文件 (docker-compose.yml):

elk-smb 目录下创建 docker-compose.yml 文件,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
version: '3.8'

services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.10.4 # 使用指定版本
container_name: elasticsearch
environment:
- discovery.type=single-node # 单节点模式
- ES_JAVA_OPTS=-Xms1g -Xmx1g # 设置 JVM 内存 (根据需要调整)
- xpack.security.enabled=false #禁用xpack
volumes:
- esdata:/usr/share/elasticsearch/data # 数据持久化
ports:
- "9200:9200"
- "9300:9300"
networks:
- elk

logstash:
image: docker.elastic.co/logstash/logstash:8.10.4
container_name: logstash
volumes:
- ./logstash/pipeline/:/usr/share/logstash/pipeline/ # 挂载 Logstash 管道配置
ports:
- "5044:5044" # Beats input
- "514:514/udp" # Syslog input (UDP)
- "514:514/tcp" # Syslog input (TCP)
environment:
LS_JAVA_OPTS: "-Xmx256m -Xms256m"
networks:
- elk
depends_on:
- elasticsearch

kibana:
image: docker.elastic.co/kibana/kibana:8.10.4
container_name: kibana
ports:
- "5601:5601"
environment:
ELASTICSEARCH_URL: "http://elasticsearch:9200" # 通过服务名访问 Elasticsearch
#ELASTICSEARCH_HOSTS: "http://elasticsearch:9200" #8.x 之后使用 ELASTICSEARCH_URL
# 如果启用了 Elasticsearch 安全特性,需要配置用户名和密码
#ELASTICSEARCH_USERNAME: "elastic"
#ELASTICSEARCH_PASSWORD: "changeme"
networks:
- elk
depends_on:
- elasticsearch

volumes:
esdata: # 定义 Elasticsearch 数据卷

networks:
elk: # 定义网络

3. 创建 Logstash 管道配置:

elk-smb 目录下创建 logstash/pipeline 目录,并在其中创建 Logstash 管道配置文件 smb-pipeline.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
# logstash/pipeline/smb-pipeline.conf

input {
# 从 Beats 接收数据 (例如 Winlogbeat)
beats {
port => 5044
type => "windows-eventlog"
}

# 从 Syslog 接收数据 (例如 Samba)
syslog {
port => 514
type => "samba-syslog"
}
}

filter {
# Windows 事件日志处理 (示例)
if [type] == "windows-eventlog" {
if [event][code] == 5140 {
xml {
source => "[event][provider_data][data]"
target => "event_data"
xpath => [
"/Event/EventData/Data[@Name='SubjectUserSid']/text()", "SubjectUserSid",
"/Event/EventData/Data[@Name='SubjectUserName']/text()", "SubjectUserName",
"/Event/EventData/Data[@Name='SubjectDomainName']/text()", "SubjectDomainName",
"/Event/EventData/Data[@Name='SubjectLogonId']/text()", "SubjectLogonId",
"/Event/EventData/Data[@Name='ShareName']/text()", "ShareName",
"/Event/EventData/Data[@Name='ShareLocalPath']/text()", "ShareLocalPath",
"/Event/EventData/Data[@Name='ClientAddress']/text()", "[client][ip]", # 客户端 IP
"/Event/EventData/Data[@Name='ClientName']/text()","ClientName"
]
remove_field => ["[event][provider_data][data]"]
}

# 提取客户端 IP
mutate {
rename => { "[event_data][ClientAddress]" => "[client][ip]" }
}
}
# ... 其他 Windows 事件日志处理 ...
}

# Samba 日志处理 (示例)
if [type] == "samba-syslog" {
grok {
match => { "message" => [
# 匹配常见的 Samba 日志格式 (根据实际情况调整)
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} smbd\[%{POSINT:pid}\]: \[%{YEAR:year}/%{MONTHNUM:month}/%{MONTHDAY:day} %{TIME:time}\] %{WORD:smb_client} \(%{IPORHOST:client_ip}\) %{WORD:smb_action} %{GREEDYDATA:smb_path}",
# 另一种常见的格式
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} smbd_audit: %{USER:username}\|%{IP:client_ip}\|%{HOSTNAME:client_hostname}\|%{GREEDYDATA:share_name}\|%{WORD:operation}\|%{GREEDYDATA:file_path}",
# 更多匹配规则...
"%{GREEDYDATA:message}" # 最后的匹配规则捕获所有内容
]
}
}

mutate {
add_field => { "[client][ip]" => "%{client_ip}" }
remove_field => [ "client_ip" ]
}

date {
match => [ "syslog_timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601" ]
target => "@timestamp"
remove_field => ["syslog_timestamp"]
}
}
}

output {
elasticsearch {
hosts => ["http://elasticsearch:9200"] # 使用服务名访问 Elasticsearch
index => "smb-audit-%{+YYYY.MM.dd}"
#user => "elastic" # 如果启用了安全认证
#password => "changeme"
}
stdout { codec => rubydebug } # 用于调试
}

说明:

  • 这个配置文件同时处理来自 Beats (Windows Event Log) 和 Syslog (Samba) 的输入.
  • Windows 部分, 使用 xml 插件解析事件日志的 XML 数据.
  • Samba 部分, 使用 grok 插件解析日志. 你需要根据实际的 Samba 日志格式调整 grok 表达式.
  • mutate 用于重命名字段, 添加字段等.
  • date 用于解析时间戳.
  • 代理服务器查询
1
2
3
4
5
6
7
systemctl show --property=Environment docker
sudo systemctl daemon-reload
sudo systemctl restart docker
# docker config 文件目录
/etc/systemd/system/docker.service.d/
/etc/default/docker
/etc/sysconfig/docker

4. 启动 ELK Stack:

elk-smb 目录下,运行以下命令:

1
2
# docker compose up -d
sudo docker-compose up -d

这将启动 Elasticsearch、Logstash 和 Kibana 容器。-d 选项表示在后台运行。

5. 验证安装:

  • Elasticsearch:

    1
    curl http://localhost:9200
  • 返回结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
"name" : "800c9626c439",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "2MXalfBNSGGwktsjV8R-eQ",
"version" : {
"number" : "8.10.4",
"build_flavor" : "default",
"build_type" : "docker",
"build_hash" : "b4a62ac808e886ff032700c391f45f1408b2538c",
"build_date" : "2023-10-11T22:04:35.506990650Z",
"build_snapshot" : false,
"lucene_version" : "9.7.0",
"minimum_wire_compatibility_version" : "7.17.0",
"minimum_index_compatibility_version" : "7.0.0"
},
"tagline" : "You Know, for Search"
}
应该会看到 JSON 响应,表明 Elasticsearch 正在运行。
  • Kibana:

    在浏览器中访问 http://localhost:5601,应该能看到 Kibana 的界面。

  • Logstash:
    Logstash 启动后会监听 5044 (Beats) 和 514 (Syslog) 端口. 可以通过查看 Logstash 的日志来确认:

    1
    docker logs logstash

6. 配置数据源 (Samba 和 Windows):

  • Samba (Linux):

    1. 配置 Samba (/etc/samba/smb.conf) 以将日志发送到 Syslog(如前所述):
      1
      2
      3
      4
      [global]
      log level = 3
      syslog only = yes
      syslog = 3
    2. 配置 rsyslog 或 syslog-ng 将 Samba 的日志转发到 Logstash 容器的 IP 地址和端口 514。 (如果 Logstash 和 Samba 在同一台机器, 可以用 127.0.0.1, 否则用 Logstash 所在机器的 IP). rsyslog 配置示例 (/etc/rsyslog.conf/etc/rsyslog.d/50-default.conf):
      1
      local7.* @<Logstash 容器 IP>:514  # UDP
    3. 重启 Samba 和 rsyslog:
      1
      2
      sudo systemctl restart smbd
      sudo systemctl restart rsyslog
  • Windows Server:

    1. 配置 Windows 审核策略(如前所述)。
    2. 安装和配置 Winlogbeat(如前所述),将 output.logstash.hosts 设置为 Logstash 容器的 IP 地址和端口 5044。 Winlogbeat 配置示例 (winlogbeat.yml):
    1
    2
    output.logstash:
    hosts: ["<Logstash 容器 IP>:5044"]
    • 获取 Logstash 容器的 IP:
      1
      docker inspect -f '{{range.NetworkSettings.Networks}}{{.IPAddress}}{{end}}' logstash
    1. 启动 Winlogbeat 服务。

7. 在 Kibana 中查看日志:

  1. 在 Kibana 中创建 Index Pattern (索引模式):

    • 首次访问 Kibana 时,它会提示您创建 Index Pattern。
    • 在 Management -> Stack Management -> Index Patterns 中创建。
    • Index Pattern 名称:smb-audit-* (与 Logstash 配置中的 index 匹配)。
    • 时间筛选字段:@timestamp
  2. 在 Discover 中查看日志:

    • 选择 smb-audit-* 索引模式。
    • 设置时间范围。
    • 您应该能看到来自 Samba 和 Windows 的 SMB 访问日志。

8. 停止和删除容器:

1
docker compose down

about me 个人微信

img

wechat offical 微信公众号

img

Prev:
rsync_nas_backup
Next:
python_spider_introduce