# [CONSUL] Add limits to consul chart.
# [oom.git] / kubernetes / clamp / components / clamp-dash-logstash / resources / config / pipeline.conf
# Copyright (c) 2018 AT&T Intellectual Property.  All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
input {
    #
    # Poll three DMaaP topics (event, notification and request) over HTTP.
    # Every request spec is tagged "dmaap_source"; the filter section looks
    # that tag up under [@metadata][request][tags].
    #
    http_poller {
        urls => {
            event_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${event_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { Accept => "application/json" }
                topic => "${event_topic}"
                tags => [ "dmaap_source" ]
            }
            notification_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${notification_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { Accept => "application/json" }
                topic => "${notification_topic}"
                tags => [ "dmaap_source" ]
            }
            request_queue => {
                method => "get"
                url => "${dmaap_base_url}/events/${request_topic}/${dmaap_consumer_group}/${dmaap_consumer_id}?timeout=15000"
                headers => { Accept => "application/json" }
                topic => "${request_topic}"
                tags => [ "dmaap_source" ]
            }
        }
        socket_timeout => 30
        request_timeout => 30
        # All three URLs are polled once per minute.
        schedule => { "every" => "1m" }
        # The body is kept as plain text; the filter section parses it as JSON.
        codec => "plain"
        # CA certificate: taken from the cert initializer when AAF is enabled,
        # otherwise from a fixed path.
{{- if .Values.global.aafEnabled }}
        cacert => "{{ .Values.certInitializer.credsPath }}/{{ .Values.certInitializer.clamp_ca_certs_pem }}"
{{- else }}
        cacert => "/certs.d/aafca.pem"
{{- end }}
    }
}


filter {
    # Avoid noise: drop polls that returned an empty list.
    if [message] == "[]" {
        drop { }
    }

    # Tag the event as an error when the poll failed or the HTTP status
    # code is anything other than 200. The tag lands in the event's [tags].
    if [http_request_failure] or [@metadata][code] != 200 {
        mutate {
            add_tag => [ "error" ]
        }
    }

    if "dmaap_source" in [@metadata][request][tags] {
        #
        # DMaaP provides a JSON list whose items are strings; each string is
        # the event originally published to DMaaP, as escaped JSON.
        #
        # The input uses the plain codec because it cannot work with a list of
        # events, so we:
        #   1. parse the body into a list,
        #   2. split that list into one event per item,
        #   3. parse each item's JSON into top-level fields,
        #   4. drop the raw "message" field.
        #
        json {
            source => "[message]"
            target => "message"
        }

        split {
            field => "message"
        }
        json {
            source => "message"
        }
        mutate {
            remove_field => [ "message" ]
        }
    }

    #
    # Some timestamps are expressed in milliseconds, some in microseconds.
    # Values above 9999999999999 (more than 13 digits) are treated as
    # microseconds and divided by 1000, so that the date filter can always
    # parse the field as UNIX_MS.
    #
    if [closedLoopAlarmStart] {
        ruby {
            code => "
            ts = event.get('closedLoopAlarmStart').to_s.to_i(10)
            ts /= 1000 if ts > 9999999999999
            event.set('closedLoopAlarmStart', ts)
            "
        }
        date {
            match => [ "closedLoopAlarmStart", UNIX_MS ]
            target => "closedLoopAlarmStart"
        }
    }

    if [closedLoopAlarmEnd] {
        ruby {
            code => "
            ts = event.get('closedLoopAlarmEnd').to_s.to_i(10)
            ts /= 1000 if ts > 9999999999999
            event.set('closedLoopAlarmEnd', ts)
            "
        }
        date {
            match => [ "closedLoopAlarmEnd", UNIX_MS ]
            target => "closedLoopAlarmEnd"
        }
    }


    #
    # Notification times are expressed as "yyyy-MM-dd HH:mm:ss", which is
    # close to ISO8601 but lacks the "T" separator: "yyyy-MM-ddTHH:mm:ss".
    # Replace the space with "T" before parsing.
    #
    if [notificationTime] {
        mutate {
            gsub => [
                "notificationTime", " ", "T"
            ]
        }
        date {
            match => [ "notificationTime", ISO8601 ]
            target => "notificationTime"
        }
    }


    #
    # Copy some nested AAI fields to top-level fields with readable names.
    # The original AAI fields are left in place.
    #
    if [AAI][generic-vnf.vnf-name] {
        mutate {
            add_field => { "vnfName" => "%{[AAI][generic-vnf.vnf-name]}" }
        }
    }
    if [AAI][generic-vnf.vnf-type] {
        mutate {
            add_field => { "vnfType" => "%{[AAI][generic-vnf.vnf-type]}" }
        }
    }
    if [AAI][vserver.vserver-name] {
        mutate {
            add_field => { "vmName" => "%{[AAI][vserver.vserver-name]}" }
        }
    }
    if [AAI][complex.city] {
        mutate {
            add_field => { "locationCity" => "%{[AAI][complex.city]}" }
        }
    }
    if [AAI][complex.state] {
        mutate {
            add_field => { "locationState" => "%{[AAI][complex.state]}" }
        }
    }


    #
    # Add some flags to ease aggregation.
    #
    if [closedLoopEventStatus] =~ /(?i)ABATED/ {
        mutate {
            add_field => { "flagAbated" => "1" }
        }
    }
    if [notification] =~ /^.*?(?:\b|_)FINAL(?:\b|_).*?(?:\b|_)FAILURE(?:\b|_).*?$/ {
        mutate {
            add_field => { "flagFinalFailure" => "1" }
        }
    }


    # The "error" and "event-cl-aggs" tags are written to the event's own
    # [tags] field by add_tag, so they must be tested there. The request
    # metadata tags only ever contain "dmaap_source".
    if "error" not in [tags] {
        #
        # Create data for a secondary index: clone the event and tag the
        # clone so the output section can route it.
        #
        clone {
            clones => [ "event-cl-aggs" ]
            add_tag => [ "event-cl-aggs" ]
        }

        if "event-cl-aggs" in [tags] {
            #
            # Only a few fields are needed for aggregations; remove every
            # field from the clone except those matched by the whitelist
            # below.
            #
            prune {
                whitelist_names => ["^@.*$","^topic$","^type$","^tags$","^flagFinalFailure$","^flagAbated$","^locationState$","^locationCity$","^vmName$","^vnfName$","^vnfType$","^requestID$","^closedLoopAlarmStart$","^closedLoopControlName$","^closedLoopAlarmEnd$","^target$","^target_type$","^triggerSourceName$","^policyScope$","^policyName$","^policyVersion$"]
            }

        }
    }
}


output {
    # Echo every event to stdout for debugging.
    stdout {
        codec => rubydebug
    }

    #
    # Route each event to one Elasticsearch index family based on its tags:
    #   "error"          goes to errors-<date>
    #   "event-cl-aggs"  goes to events-cl-<date>, keyed by requestID
    #   anything else    goes to events-<date>
    #
    # Index dates use "dd" (day of month). In Joda-Time patterns "DD" means
    # day of year, which would give index names such as events-2018.03.075.
    #
    # NOTE(review): ssl_certificate_verification is false in all three
    # outputs even though a CA certificate is configured. Confirm that this
    # is intentional.
    #
    if "error" in [tags] {
        elasticsearch {
            ilm_enabled => false
            codec => "json"
{{- if .Values.global.aafEnabled }}
            cacert => "{{ .Values.certInitializer.credsPath }}/{{ .Values.certInitializer.clamp_ca_certs_pem }}"
{{- else }}
            cacert => "/clamp-cert/ca-certs.pem"
{{- end }}
            ssl_certificate_verification => false
            hosts => ["${elasticsearch_base_url}"]
            user => ["${logstash_user}"]
            password => ["${logstash_pwd}"]
            index => "errors-%{+YYYY.MM.dd}" # creates daily indexes for errors
            doc_as_upsert => true
        }

    } else if "event-cl-aggs" in [tags] {
        elasticsearch {
            ilm_enabled => false
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
{{- if .Values.global.aafEnabled }}
            cacert => "{{ .Values.certInitializer.credsPath }}/{{ .Values.certInitializer.clamp_ca_certs_pem }}"
{{- else }}
            cacert => "/clamp-cert/ca-certs.pem"
{{- end }}
            ssl_certificate_verification => false
            user => ["${logstash_user}"]
            password => ["${logstash_pwd}"]
            # Update-or-insert keyed by requestID, so events that share a
            # requestID update the same document.
            document_id => "%{requestID}"
            index => "events-cl-%{+YYYY.MM.dd}" # creates daily indexes for control loop
            doc_as_upsert => true
            action => "update"
        }

    } else {
        elasticsearch {
            ilm_enabled => false
            codec => "json"
            hosts => ["${elasticsearch_base_url}"]
{{- if .Values.global.aafEnabled }}
            cacert => "{{ .Values.certInitializer.credsPath }}/{{ .Values.certInitializer.clamp_ca_certs_pem }}"
{{- else }}
            cacert => "/clamp-cert/ca-certs.pem"
{{- end }}
            ssl_certificate_verification => false
            user => ["${logstash_user}"]
            password => ["${logstash_pwd}"]
            index => "events-%{+YYYY.MM.dd}" # creates daily indexes
            doc_as_upsert => true
        }
    }
}