# Logstash Configuration for LogServer # Handles Beats input with API key authentication input { # Beats input for Filebeat clients beats { port => 5044 ssl => false # Set to true for production with proper certificates # API key authentication handled via filter below } # Optional: Syslog input for direct syslog shipping tcp { port => 514 type => "syslog" } udp { port => 514 type => "syslog" } } filter { # API Key validation - check if client provided a valid key # The API key should be in the [fields][api_key] field from Filebeat if [fields][api_key] { # Load and validate API key ruby { init => " require 'yaml' @api_keys = {} # Load API keys from file begin if File.exist?('/usr/share/logstash/config/api-keys.yml') config = YAML.load_file('/usr/share/logstash/config/api-keys.yml') if config && config['api_keys'] config['api_keys'].each do |hostname, key| @api_keys[key.to_s.strip] = hostname.to_s.strip end end end rescue => e @logger.error('Failed to load API keys', :error => e.message) end " code => " api_key = event.get('[fields][api_key]') if api_key && @api_keys.has_key?(api_key) # Valid API key - add hostname to event event.set('[@metadata][client_hostname]', @api_keys[api_key]) event.set('[@metadata][authenticated]', true) else # Invalid API key event.set('[@metadata][authenticated]', false) event.tag('_authfailure') end " } # Drop unauthorized events if "_authfailure" in [tags] { drop { } } } else { # No API key provided - mark as unauthenticated # You can choose to drop these or allow them based on your security requirements mutate { add_tag => [ "no_api_key" ] } # Uncomment to require API keys for all connections: # drop { } } # Parse Docker logs if [docker] { # Docker metadata is already parsed by Filebeat mutate { add_field => { "container_name" => "%{[docker][container][name]}" "container_id" => "%{[docker][container][id]}" "container_image" => "%{[docker][container][image]}" } } } # Parse syslog if [type] == "syslog" { grok { match => { "message" => "%{SYSLOGLINE}" } } date { match => [ "timestamp", "MMM d HH:mm:ss", "MMM dd HH:mm:ss" ] } } # Parse JSON logs if they exist if [message] =~ /^\{.*\}$/ { json { source => "message" target => "json_message" } } # Add timestamp if not present if ![timestamp] { mutate { add_field => { "timestamp" => "%{@timestamp}" } } } # Clean up metadata mutate { remove_field => [ "@version", "beat", "offset", "prospector" ] } } output { # Send to Elasticsearch with authentication elasticsearch { hosts => ["elasticsearch:9200"] user => "elastic" password => "${ELASTIC_PASSWORD:changeme}" # Use different indices based on input type index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" # Manage index templates manage_template => true template_overwrite => true } # Optional: Debug output (comment out in production) # stdout { # codec => rubydebug # } }