// ELK Stack

Building an Enterprise SIEM with ELK Stack: Elasticsearch, Logstash, and Kibana

Why ELK Stack for SIEM?

The ELK Stack (Elasticsearch, Logstash, Kibana) combined with Beats data shippers forms a powerful, scalable SIEM platform. At PostEx, we built our primary SIEM on ELK Stack + Wazuh before migrating to Splunk Enterprise, running it in production for over two years covering 500+ endpoints.

The ELK-based SIEM provided rich search capabilities, beautiful dashboards, and deep integration with Wazuh for HIDS and FIM. If you have the engineering resources to deploy and maintain it, the ELK Stack delivers enterprise-grade SIEM capabilities at zero licensing cost.

ELK SIEM Architecture

Data Sources
    |
    v
Beats Agents (Winlogbeat, Auditbeat, Packetbeat, Filebeat)
    |
    v
Logstash (Parsing, Enrichment, Routing)
    |
    v
Elasticsearch Cluster (Storage + Indexing)
    |
    v
Kibana + Elastic Security (Dashboards + Detection)

Elasticsearch Cluster Setup

# elasticsearch.yml for production node
cluster.name: siem-cluster
node.name: es-node-01
node.roles: [master, data, ingest]
network.host: 192.168.1.10
http.port: 9200
transport.port: 9300

# Cluster settings
discovery.seed_hosts: ["192.168.1.10", "192.168.1.11", "192.168.1.12"]
cluster.initial_master_nodes: ["es-node-01", "es-node-02", "es-node-03"]

# Security
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true

# Performance
bootstrap.memory_lock: true  # Prevent swapping - critical for performance

Logstash Pipeline Configuration

# /etc/logstash/conf.d/siem-pipeline.conf
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
  }
  tcp {
    port => 5000
    codec => json
    tags => ["json_input"]
  }
  udp {
    port => 514
    codec => plain
    tags => ["syslog"]
  }
}

filter {
  if [agent][type] == "winlogbeat" {
    mutate { add_field => { "data_type" => "windows_event" }}
    # Normalize EventCode field
    mutate { rename => { "event.code" => "EventCode" }}
  }
  # GeoIP enrichment for external IPs
  if [source][ip] {
    geoip {
      source => "[source][ip]"
      target => "[source][geo]"
    }
  }
  # Threat intel enrichment
  translate {
    field => "[source][ip]"
    destination => "threat_intel"
    dictionary_path => "/etc/logstash/threat-intel/known-bad-ips.yml"
  }
}

output {
  if "syslog" in [tags] {
    elasticsearch { index => "syslog-%{+YYYY.MM.dd}" }
  } else {
    elasticsearch {
      hosts => ["https://192.168.1.10:9200", "https://192.168.1.11:9200"]
      index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
      ssl => true
      cacert => "/etc/logstash/certs/ca.crt"
    }
  }
}

Winlogbeat for Windows Event Collection

# winlogbeat.yml
winlogbeat.event_logs:
  - name: Security
    event_id: 4624, 4625, 4648, 4672, 4688, 4697, 4698, 4702, 4720, 4724
    level: critical, error, warning, information
  - name: System
    level: critical, error
  - name: Microsoft-Windows-Sysmon/Operational
    # Sysmon provides far richer process telemetry than native Windows Events

output.logstash:
  hosts: ["logstash.internal:5044"]
  ssl.certificate_authorities: ["/etc/beats/certs/ca.crt"]

Elastic Security Detection Rules

Elastic Security ships with 600+ pre-built detection rules. We supplemented these with custom rules for our environment:

// Custom Elastic Security rule: Scheduled Task via schtasks.exe
{
  "name": "Persistence via Scheduled Task",
  "type": "eql",
  "language": "eql",
  "query": "process where event.type == 'start' and process.name == 'schtasks.exe' and process.args : ('/create', '-create') and not process.parent.name : ('services.exe', 'svchost.exe')",
  "severity": "medium",
  "risk_score": 47,
  "threat": [{
    "framework": "MITRE ATT&CK",
    "technique": [{ "id": "T1053", "name": "Scheduled Task/Job", "subtechnique": [{"id": "T1053.005", "name": "Scheduled Task"}]}],
    "tactic": { "id": "TA0003", "name": "Persistence"}
  }]
}