# Logstash pipeline configuration
# Source: [clamp.git] / extra / docker / elk / logstash-conf / logstash.conf
# Input 1: poll three DMaaP topics over HTTP (events, notifications, requests).
# Every ${...} placeholder is substituted by Logstash from the environment,
# so dmaap_base_url, the *_topic names, dmaap_consumer_group and
# dmaap_consumer_id must all be defined before the pipeline starts.
input {
    http_poller {
        urls => {
            # Each entry issues a GET on /events/<topic>/<group>/<consumer id>.
            # "timeout=15000" is passed through to the server untouched;
            # presumably a long-poll timeout in milliseconds - TODO confirm.
            #
            # NOTE(review): add_field and type are declared per URL here.
            # Confirm the installed http_poller version really applies them to
            # the emitted events; the filter section relies on [type]/[topic].
            event_queue => {
                method => get
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${event_topic}" }
                type => "dmaap_event"
            }
            notification_queue => {
                method => get
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${notification_topic}" }
                type => "dmaap_notification"
            }
            request_queue => {
                method => get
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => {
                    Accept => "application/json"
                }
                add_field => { "topic" => "${request_topic}" }
                type => "dmaap_request"
            }
        }
        # Client-side limits (seconds) and polling period (seconds).
        socket_timeout => 30
        request_timeout => 30
        interval => 60
        # Keep the response body as raw text in [message]; the filter section
        # is responsible for decoding the JSON list it contains.
        codec => "plain"
    }
}

# Input 2: read DMaaP events from a local log file.
# Each line is decoded as JSON by the codec, and the event is tagged with
# type "dmaap_log" so the filter section skips its JSON/split stage for it.
input {
    file {
        path => [
            "/log-input/dmaap_evt.log"
        ]
        type => "dmaap_log"
        codec => "json"
    }
}

# Filter stage:
#   1. For events that did not come from the log file, decode the JSON list
#      held in [message], emit one event per list entry, and decode each entry.
#   2. Normalise closedLoopAlarmStart / closedLoopAlarmEnd to epoch
#      milliseconds and convert them to timestamps.
#   3. Convert notificationTime ("yyyy-MM-dd HH:mm:ss") to a timestamp.
filter {

    # Parse JSON, split the list into multiple events, and parse each event.
    if [type] != "dmaap_log" {
        # Avoid noise when the polled list is empty.
        if [message] == "[]" {
            drop { }
        }

        # Replace the raw text in [message] with the decoded JSON list.
        json {
            source => "[message]"
            target => "message"
        }

        # Disabled experiment, kept for reference (not executed):
        # ruby {
        #     code => "event.get('message').each{|m| m.set('type',event.get('type')}"
        # }

        # One event per list entry; each new event gets [message] set to its entry.
        # NOTE(review): add_field on a field that already exists appends to it
        # rather than replacing it - confirm [type]/[topic] do not end up as
        # arrays on the split events.
        split {
            field => "message"
            add_field => {
                "type" => "%{type}"
                "topic" => "%{topic}"
            }
        }

        # Each entry is itself a JSON document: expand it into top-level fields.
        json {
            source => "message"
        }

        # The raw entry is no longer needed once it has been expanded.
        mutate { remove_field => [ "message" ] }
    }

    # Express timestamps in milliseconds instead of microseconds: any value
    # above 9999999999999 (more than 13 digits) is divided by 1000 (integer
    # division); smaller values are only coerced to Integer.
    if [closedLoopAlarmStart] {
        ruby {
            code => "
                     ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
                     ts /= 1000 if ts > 9_999_999_999_999
                     event.set('closedLoopAlarmStart', ts)
                    "
        }
        date {
            match => [ "closedLoopAlarmStart", "UNIX_MS" ]
            target => "closedLoopAlarmStart"
        }
    }

    if [closedLoopAlarmEnd] {
        ruby {
            code => "
                     ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
                     ts /= 1000 if ts > 9_999_999_999_999
                     event.set('closedLoopAlarmEnd', ts)
                    "
        }
        date {
            match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
            target => "closedLoopAlarmEnd"
        }
    }

    # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; swapping the space for
    # "T" turns it into a form the ISO8601 date pattern accepts.
    if [notificationTime] {
        mutate {
            gsub => [
                "notificationTime", " ", "T"
            ]
        }
        date {
            match => [ "notificationTime", "ISO8601" ]
            target => "notificationTime"
        }
    }
}
# Output stage: echo every event to stdout for debugging, then index it in
# Elasticsearch. Events carrying an [http_request_failure] field go to a
# separate daily "errors-*" index; everything else goes to "logstash-*".
output {
    stdout {
        codec => rubydebug
    }

    # Index names are built from the event's @timestamp. The day token must be
    # lowercase "dd" (day of month): uppercase "DD" is day-of-year in Joda-Time
    # and would produce names such as "logstash-2018.02.32" for 1 February.
    # NOTE(review): doc_as_upsert is kept as in the original; confirm it has any
    # effect without an explicit update action.
    if [http_request_failure] {
        elasticsearch {
            codec => "json"
            hosts => ["elasticsearch"]
            index => "errors-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    } else {
        elasticsearch {
            codec => "json"
            hosts => ["elasticsearch"]
            index => "logstash-%{+YYYY.MM.dd}" # creates daily indexes
            doc_as_upsert => true
        }
    }
}