input { http_poller { urls => { event_queue => { method => get url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" headers => { Accept => "application/json" } add_field => { "topic" => "${event_topic}" } } notification_queue => { method => get url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" headers => { Accept => "application/json" } add_field => { "topic" => "${notification_topic}" } } request_queue => { method => get url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000" headers => { Accept => "application/json" } add_field => { "topic" => "${request_topic}" } } } socket_timeout => 30 request_timeout => 30 interval => 60 codec => "plain" } } filter { # avoid noise if no entry in the list if [message] == "[]" { drop { } } # parse json, split the list into multiple events, and parse each event json { source => "[message]" target => "message" } split { field => "message" } json { source => "message" } mutate { remove_field => [ "message" ] } # express timestamps in milliseconds instead of microseconds ruby { code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))" } date { match => [ "closedLoopAlarmStart", UNIX_MS ] target => "closedLoopAlarmStart" } if [closedLoopAlarmEnd] { ruby { code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))" } date { match => [ "closedLoopAlarmEnd", UNIX_MS ] target => "closedLoopAlarmEnd" } } #"yyyy-MM-dd HH:mm:ss" if [notificationTime] { mutate { gsub => [ "notificationTime", " ", "T" ] } date { match => [ "notificationTime", ISO8601 ] target => "notificationTime" } } } output { stdout { codec => rubydebug } if [http_request_failure] { elasticsearch { codec => "json" hosts => ["${elasticsearch_base_url}"] index => "errors-%{+YYYY.MM.DD}" doc_as_upsert => true } } else { elasticsearch { codec => "json" hosts => ["${elasticsearch_base_url}"] index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes doc_as_upsert => true } } }