6 url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
8 Accept => "application/json"
10 add_field => { "topic" => "${event_topic}" }
13 notification_queue => {
15 url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
17 Accept => "application/json"
19 add_field => { "topic" => "${notification_topic}" }
20 type => "dmaap_notification"
24 url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
26 Accept => "application/json"
28 add_field => { "topic" => "${request_topic}" }
29 type => "dmaap_request"
42 "/log-input/dmaap_evt.log"
51 # parse json, split the list into multiple events, and parse each event
52 if [type] != "dmaap_log" {
53 # avoid noise if no entry in the list
54 if [message] == "[]" {
63 # code => "event.get('message').each{|m| m.set('type',event.get('type'))}"
77 mutate { remove_field => [ "message" ] }
80 # express timestamps in milliseconds instead of microseconds
81 if [closedLoopAlarmStart] {
84 if event.get('closedLoopAlarmStart').to_s.to_i(10) > 9999999999999
85 event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10) / 1000)
87 event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10))
92 match => [ "closedLoopAlarmStart", UNIX_MS ]
93 target => "closedLoopAlarmStart"
97 if [closedLoopAlarmEnd] {
100 if event.get('closedLoopAlarmEnd').to_s.to_i(10) > 9999999999999
101 event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10) / 1000)
103 event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10))
108 match => [ "closedLoopAlarmEnd", UNIX_MS ]
109 target => "closedLoopAlarmEnd"
113 #"yyyy-MM-dd HH:mm:ss"
114 if [notificationTime] {
117 "notificationTime", " ", "T"
121 match => [ "notificationTime", ISO8601 ]
122 target => "notificationTime"
131 if [http_request_failure] {
134 hosts => [elasticsearch]
135 index => "errors-%{+YYYY.MM.dd}" # daily error index; 'dd' is day of month ('DD' would be day of year)
136 doc_as_upsert => true
141 hosts => [elasticsearch]
142 index => "logstash-%{+YYYY.MM.dd}" # creates daily indexes; 'dd' is day of month ('DD' would be day of year)
143 doc_as_upsert => true