# Logstash pipeline.
#
# Inputs : three HTTP-polled topic queues plus one JSON log file.
# Filter : polled responses are parsed as a JSON list, split into one event
#          per list entry, and each entry is parsed again; alarm timestamps
#          and notificationTime are converted to date values.
# Output : every event is printed to stdout and indexed in Elasticsearch
#          (poll failures go to a separate "errors-*" index).
#
# The ${...} placeholders are left for Logstash's config variable substitution.

input {
  http_poller {
    urls => {
      event_queue => {
        method => get
        # NOTE(review): "timeout=15000" is passed through to the server as-is;
        # presumably a long-poll wait in milliseconds - confirm against the API.
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${event_topic}" }
        type => "dmaap_event"
      }
      notification_queue => {
        method => get
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${notification_topic}" }
        type => "dmaap_notification"
      }
      request_queue => {
        method => get
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${request_topic}" }
        type => "dmaap_request"
      }
    }
    socket_timeout => 30
    request_timeout => 30
    # NOTE(review): newer http_poller releases replace "interval" with
    # "schedule"; verify against the installed plugin version before upgrading.
    interval => 60
    # Keep the raw response body in [message]; it is parsed in the filter stage.
    codec => "plain"
  }
}

input {
  file {
    path => [
      "/log-input/dmaap_evt.log"
    ]
    type => "dmaap_log"
    codec => "json"
  }
}

filter {
  # parse json, split the list into multiple events, and parse each event
  if [type] != "dmaap_log" {
    # avoid noise if no entry in the list
    if [message] == "[]" {
      drop { }
    }

    # Replace the raw body in [message] with the parsed JSON list.
    json {
      source => "[message]"
      target => "message"
    }

    # ruby {
    #   code => "event.get('message').each{|m| m.set('type',event.get('type')}"
    # }

    # Emit one event per list entry.
    # NOTE(review): "type" and "topic" already exist on the event being split,
    # so this add_field may turn them into two-element arrays rather than
    # copying them - confirm the indexed documents look as intended.
    split {
      field => "message"
      add_field => {
        "type" => "%{type}"
        "topic" => "%{topic}"
      }
    }

    # Each list entry is itself a JSON string: expand it into top-level fields.
    json {
      source => "message"
    }

    mutate {
      remove_field => [ "message" ]
    }
  }

  # express timestamps in milliseconds instead of microseconds
  # (values with more than 13 digits are divided by 1000)
  if [closedLoopAlarmStart] {
    ruby {
      code => "
        ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
        ts /= 1000 if ts > 9999999999999
        event.set('closedLoopAlarmStart', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmStart", UNIX_MS ]
      target => "closedLoopAlarmStart"
    }
  }

  if [closedLoopAlarmEnd] {
    ruby {
      code => "
        ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
        ts /= 1000 if ts > 9999999999999
        event.set('closedLoopAlarmEnd', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmEnd", UNIX_MS ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; swap the space for "T"
  # so that the ISO8601 matcher accepts it.
  if [notificationTime] {
    mutate {
      gsub => [
        "notificationTime", " ", "T"
      ]
    }
    date {
      match => [ "notificationTime", ISO8601 ]
      target => "notificationTime"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  # Index names use "dd" (day of month). The previous "DD" is day-of-year in
  # the Joda-Time pattern syntax and produced names such as "2018.03.075".
  # NOTE(review): doc_as_upsert is documented for the "update" action; with
  # the default action it looks like a no-op - confirm whether it is needed.
  if [http_request_failure] {
    # Poll failures are kept apart from regular events.
    elasticsearch {
      codec => "json"
      hosts => [elasticsearch]
      index => "errors-%{+YYYY.MM.dd}"
      doc_as_upsert => true
    }
  } else {
    elasticsearch {
      codec => "json"
      hosts => [elasticsearch]
      index => "logstash-%{+YYYY.MM.dd}" # creates daily indexes
      doc_as_upsert => true
    }
  }
}