Merge "improve logstash parsing"
[oom.git] / kubernetes / clamp / charts / clamp-dash-logstash / resources / config / pipeline.conf
1 # Copyright © 2018  AT&T, Amdocs, Bell Canada Intellectual Property.  All rights reserved.
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #       http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
input {
  # Poll three DMaaP topics over HTTP. Every URL is assembled from environment
  # placeholders:
  #   ${dmaap_base_url}/events/<topic>/${dmaap_consumer_group}/${dmaap_consumer_id}
  http_poller {
    # Plugin-level settings shared by all three requests.
    codec => "plain"          # keep the response body as raw text in [message]
    interval => 60            # polling period
    request_timeout => 30
    socket_timeout => 30

    # One entry per topic. Each entry stamps the topic name and a distinct
    # "type" on the events it produces.
    # NOTE(review): the "timeout=15000" query parameter is presumably a
    # server-side long-poll wait in milliseconds - confirm against the DMaaP API.
    urls => {
      event_queue => {
        type => "dmaap_event"
        add_field => { "topic" => "${event_topic}" }
        method => "get"
        headers => { "Accept" => "application/json" }
        url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
      }
      notification_queue => {
        type => "dmaap_notification"
        add_field => { "topic" => "${notification_topic}" }
        method => "get"
        headers => { "Accept" => "application/json" }
        url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
      }
      request_queue => {
        type => "dmaap_request"
        add_field => { "topic" => "${request_topic}" }
        method => "get"
        headers => { "Accept" => "application/json" }
        url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
      }
    }
  }
}
51
filter {
    # Events of any type other than "dmaap_log" carry the raw HTTP response
    # body in [message]; unpack it into one event per list entry.
    if [type] != "dmaap_log" {
        # avoid noise if no entry in the list
        if [message] == "[]" {
            drop { }
        }

        # parse json, split the list into multiple events, and parse each event
        json {
            source => "[message]"
            target => "message"
        }
        split {
            field => "message"
            # NOTE(review): the split events are clones of the original, so
            # "type" and "topic" may already be present; add_field on an
            # existing field appends and can turn it into an array - confirm
            # this is intended before changing it.
            add_field => {
                "type" => "%{type}"
                "topic" => "%{topic}"
            }
        }
        json {
            source => "message"
        }
        mutate { remove_field => [ "message" ] }
    }

    # Express timestamps in milliseconds instead of microseconds, then turn
    # them into real timestamps. Only done when the field exists: without the
    # guard a missing value is coerced to 0 and the event would be stamped
    # with a bogus 1970-01-01 start time (same guard as closedLoopAlarmEnd).
    if [closedLoopAlarmStart] {
        ruby {
            code => "
                     ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
                     ts /= 1000 if ts > 9_999_999_999_999
                     event.set('closedLoopAlarmStart', ts)
                    "
        }
        date {
            match => [ "closedLoopAlarmStart", "UNIX_MS" ]
            target => "closedLoopAlarmStart"
        }
    }

    # Same normalisation for the optional end-of-alarm timestamp: values above
    # 13 digits are taken to be microseconds and scaled down to milliseconds.
    if [closedLoopAlarmEnd] {
        ruby {
            code => "
                     ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
                     ts /= 1000 if ts > 9_999_999_999_999
                     event.set('closedLoopAlarmEnd', ts)
                    "
        }
        date {
            match => [ "closedLoopAlarmEnd", "UNIX_MS" ]
            target => "closedLoopAlarmEnd"
        }
    }

    # notificationTime arrives as "yyyy-MM-dd HH:mm:ss"; replacing the space
    # with "T" makes it parseable as ISO8601.
    if [notificationTime] {
        mutate {
            gsub => [
                "notificationTime", " ", "T"
            ]
        }
        date {
            match => [ "notificationTime", "ISO8601" ]
            target => "notificationTime"
        }
    }
}
output {
    # Echo every event to the console for debugging.
    stdout {
        codec => rubydebug
    }

    # Events carrying [http_request_failure] go to a separate daily "errors-"
    # index; everything else goes to the regular daily "logstash-" index.
    #
    # The date pattern uses "dd" (day of month). The previous "DD" is
    # day-of-year in the Joda-Time syntax used here, which produced index
    # names such as logstash-2018.03.074 instead of logstash-2018.03.15.
    #
    # NOTE(review): doc_as_upsert is documented as applying to update mode;
    # no "action" is set here, so confirm whether it has any effect.
    if [http_request_failure] {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            index => "errors-%{+YYYY.MM.dd}"
            doc_as_upsert => true
        }
    } else {
        elasticsearch {
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
            index => "logstash-%{+YYYY.MM.dd}" # creates daily indexes
            doc_as_upsert => true
        }
    }
}