zabbix与ELK结合监控securiyt日志

1、安装logstash和logstash-output-zabbix

#配置yum仓库
cat << EOF | sudo tee /etc/yum.repos.d/elasticsearch.repo
[elasticsearch-7.x]
name=Elasticsearch repository for 7.x packages
baseurl=https://artifacts.elastic.co/packages/7.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
 autorefresh=1
type=rpm-md
EOF
#安装logstash
sudo yum install logstash
systemctl start logstash.service 
#安装插件logstash-output-zabbix
cd /usr/share/logstash/bin/
./logstash-plugin logstash-output-zabbix

2、logstash配置,这里配置两个监测状态,一个是登录失败告警,一个是登录成功告信息,失败告警会有触发器联动,zabbix_host必须与zabbix平台中的主机命名保持一致,否则无法接收到消息

input {
        file {
        path => ["/var/log/secure"]
        type => "system"
        start_position => "beginning"
        }
}
#input部分是从/var/log/secure文件中读取数据,start_position 表示从secure文件开头读取内容。
filter {
    grok {
             match => { "message" => "%{SYSLOGTIMESTAMP:message_timestamp} %{SYSLOGHOST:hostname} %{DATA:message_program}(?:\[%{POSINT:message_pid}\])?: %{GREEDYDATA:message_content}" }        
             #这里通过grok对message字段的数据进行字段划分,这里将message字段划分了5个子字段。其中,message_content字段会在output中用到。
        }

      mutate {
             add_field => [ "[zabbix_key01]", "oslogs.failed" ]
             add_field => [ "[zabbix_key02]", "oslogs.login" ]
	     #新增的字段,字段名(key)是zabbix_key,值为oslogs。
             add_field => [ "[zabbix_host]", "%{host}" ]   
             #新增的字段,字段名(key)是zabbix_host,值可以在这里直接定义,也可以引用字段变量来获取。这里的%{host}获取的就是日志数据的主机名,这个主机名与zabbix web中“主机名称”需要保持一致。
         }
        mutate {        
            #这里是删除不需要的字段
            remove_field => "@version"
            remove_field => "message"
        }
        date {      
                #这里是对日志输出中的日期字段进行转换,其中message_timestamp字段是默认输出的时间日期字段,将这个字段的值传给 @timestamp字段。
                match => [ "message_timestamp","MMM  d HH:mm:ss", "MMM dd HH:mm:ss", "ISO8601"]
        }
}
output {
        if [message_content]  =~ /(Connection|Accepted)/  {      
        #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix。
              zabbix {
                        zabbix_host => "[zabbix_host]"      
                        #这个zabbix_host将获取上面filter部分定义的字段变量%{host}的值
                        zabbix_key => "[zabbix_key02]"        #这个zabbix_key将获取上面filter部分中给出的值
                        zabbix_server_host => "172.30.64.64"  #这是指定zabbix server的IP地址
                        zabbix_server_port => "10051"           #这是指定zabbix server的监听端口
                        zabbix_value => "message_content"              #这个很重要,指定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容。
                        }
                    }
        if [message_content]  =~ /(ERR|error|ERROR|Failed)/  {      
        #定义在message_content字段中,需要过滤的关键字信息,也就是在message_content字段中出现给出的这些关键字,那么就将这些信息发送给zabbix。
              zabbix {
                        zabbix_host => "[zabbix_host]"      
                        #这个zabbix_host将获取上面filter部分定义的字段变量%{host}的值
                        zabbix_key => "[zabbix_key01]"        #这个zabbix_key将获取上面filter部分中给出的值
                        zabbix_server_host => "172.30.64.64"  #这是指定zabbix server的IP地址
                        zabbix_server_port => "10051"           #这是指定zabbix server的监听端口
                        zabbix_value => "message_content"              #这个很重要,指定要传给zabbix监控项item(oslogs)的值, zabbix_value默认的值是"message"字段,因为上面我们已经删除了"message"字段,因此,这里需要重新指定,根据上面filter部分对"message"字段的内容划分,这里指定为"message_content"字段,其实,"message_content"字段输出的就是服务器上具体的日志内容。
                        }
                    }
              stdout { codec => rubydebug }   
              #这里是开启调试模式,当第一次配置的时候,建议开启,这样过滤后的日志信息直接输出的屏幕,方便进行调试,调试成功后,即可关闭。
}

3、配置zabbix,创建模板

创建模板群组

创建模板并关联模板群组

创建登录失败监控项,并关联oslogs.failed

创建登录成功监控项,关联oslogs.login

创建登录失败触发器

主机关联模板

观察日志信息