# Copyright © 2018 AT&T, Amdocs, Bell Canada Intellectual Property. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Logstash pipeline.
#
#   input  : polls three DMaaP topics (event, notification, request) over HTTP.
#   filter : splits each polled JSON list into one event per entry and
#            normalizes the timestamp fields.
#   output : prints every event to stdout and indexes it into Elasticsearch
#            (poll failures go to a separate "errors-*" index).
#
# All ${...} placeholders (dmaap_base_url, event_topic, notification_topic,
# request_topic, dmaap_consumer_group, dmaap_consumer_id,
# elasticsearch_base_url) must be supplied to Logstash from outside this file.

input {
  http_poller {
    # One entry per polled topic. Each entry tags its events with the topic
    # name and a distinct "type" so they can be told apart downstream.
    urls => {
      event_queue => {
        method => get
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${event_topic}" }
        type => "dmaap_event"
      }
      notification_queue => {
        method => get
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${notification_topic}" }
        type => "dmaap_notification"
      }
      request_queue => {
        method => get
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
        headers => {
          Accept => "application/json"
        }
        add_field => { "topic" => "${request_topic}" }
        type => "dmaap_request"
      }
    }
    socket_timeout => 30
    request_timeout => 30
    interval => 60
    # The response body is kept as raw text in [message]; it is parsed as
    # JSON explicitly in the filter section below.
    codec => "plain"
  }
}

filter {
  if [type] != "dmaap_log" {
    # avoid noise if no entry in the list
    if [message] == "[]" {
      drop { }
    }

    # parse json, split the list into multiple events, and parse each event
    json {
      source => "[message]"
      target => "message"
    }
    split {
      field => "message"
      # NOTE(review): the events produced by split are presumably clones that
      # already carry "type" and "topic"; re-adding them here may turn both
      # fields into two-element arrays. Left as-is to keep the indexed
      # documents unchanged - confirm before removing.
      add_field => {
        "type" => "%{type}"
        "topic" => "%{topic}"
      }
    }
    json {
      source => "message"
    }
    mutate {
      remove_field => [ "message" ]
    }
  }

  # express timestamps in milliseconds instead of microseconds
  #
  # Values above 9999999999999 (more than 13 digits) are treated as
  # microseconds and divided by 1000; anything else is kept as an integer.
  # Guarded so that events without the field (e.g. poll failures) do not get
  # a closedLoopAlarmStart of 0, which the date filter would turn into 1970.
  if [closedLoopAlarmStart] {
    ruby {
      code => "
        ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
        ts /= 1000 if ts > 9999999999999
        event.set('closedLoopAlarmStart', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmStart", "UNIX_MS" ]
      target => "closedLoopAlarmStart"
    }
  }

  if [closedLoopAlarmEnd] {
    ruby {
      code => "
        ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
        ts /= 1000 if ts > 9999999999999
        event.set('closedLoopAlarmEnd', ts)
      "
    }
    date {
      match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
      target => "closedLoopAlarmEnd"
    }
  }

  # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replacing the space
  # with "T" lets the ISO8601 matcher parse it.
  if [notificationTime] {
    mutate {
      gsub => [ "notificationTime", " ", "T" ]
    }
    date {
      match => [ "notificationTime", "ISO8601" ]
      target => "notificationTime"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }

  # Events carrying [http_request_failure] go to their own daily index so
  # they do not mix with the business events.
  if [http_request_failure] {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_base_url}"]
      # "dd" is day-of-month; the former "DD" is day-of-year in Joda patterns.
      index => "errors-%{+YYYY.MM.dd}"
      doc_as_upsert => true
    }
  } else {
    elasticsearch {
      codec => "json"
      hosts => ["${elasticsearch_base_url}"]
      # creates daily indexes ("dd" = day-of-month, not "DD" = day-of-year)
      index => "logstash-%{+YYYY.MM.dd}"
      doc_as_upsert => true
    }
  }
}