X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=kubernetes%2Fclamp%2Fcharts%2Fclamp-dash-logstash%2Fresources%2Fconfig%2Fpipeline.conf;h=4b05910c0292f7d2cfbc43bdac6e256fe5915ac7;hb=d98cc501b887299af62d46a0d9071835d9af5ed0;hp=aa087e344549f102d10c0b5931bb6fc83ce0d428;hpb=3fcb4a062e4d1bd9c7e1a4e0553199c8cd2e77ad;p=oom.git diff --git a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf index aa087e3445..4b05910c02 100644 --- a/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf +++ b/kubernetes/clamp/charts/clamp-dash-logstash/resources/config/pipeline.conf @@ -1,5 +1,18 @@ +# Copyright (c) 2018 AT&T Intellectual Property. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. input { - http_poller { + http_poller { urls => { event_queue => { method => get @@ -7,7 +20,8 @@ input { headers => { Accept => "application/json" } - add_field => { "topic" => "${event_topic}" } + topic => "${event_topic}" + tags => [ "dmaap_source" ] } notification_queue => { method => get @@ -15,7 +29,8 @@ input { headers => { Accept => "application/json" } - add_field => { "topic" => "${notification_topic}" } + topic => "${notification_topic}" + tags => [ "dmaap_source" ] } request_queue => { method => get @@ -23,46 +38,84 @@ input { headers => { Accept => "application/json" } - add_field => { "topic" => "${request_topic}" } + topic => "${request_topic}" + tags => [ "dmaap_source" ] } } socket_timeout => 30 request_timeout => 30 - interval => 60 + schedule => { "every" => "1m" } codec => "plain" - } + cacert => "/certs.d/aafca.pem" + } } + filter { # avoid noise if no entry in the list if [message] == "[]" { - drop { } + drop { } } - # parse json, split the list into multiple events, and parse each event - json { - source => "[message]" - target => "message" - } - split { - field => "message" - } - json { - source => "message" + if [http_request_failure] or [@metadata][code] != 200 { + mutate { + add_tag => [ "error" ] + } } - mutate { remove_field => [ "message" ] } - # express timestamps in milliseconds instead of microseconds - ruby { - code => "event.set('closedLoopAlarmStart', Integer(event.get('closedLoopAlarmStart')))" + + if "dmaap_source" in [@metadata][request][tags] { + # + # Dmaap provides a json list, whose items are Strings containing the event + # provided to Dmaap, which itself is an escaped json. + # + # We first need to parse the json as we have to use the plaintext as it cannot + # work with list of events, then split that list into multiple string events, + # that we then transform into json. + # + json { + source => "[message]" + target => "message" + } + + split { + field => "message" + } + json { + source => "message" + } + mutate { + remove_field => [ "message" ] + } } - date { - match => [ "closedLoopAlarmStart", UNIX_MS ] - target => "closedLoopAlarmStart" + + # + # Some timestamps are expressed as milliseconds, some are in microseconds + # + if [closedLoopAlarmStart] { + ruby { + code => " + if event.get('closedLoopAlarmStart').to_s.to_i(10) > 9999999999999 + event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10) / 1000) + else + event.set('closedLoopAlarmStart', event.get('closedLoopAlarmStart').to_s.to_i(10)) + end + " + } + date { + match => [ "closedLoopAlarmStart", UNIX_MS ] + target => "closedLoopAlarmStart" + } } if [closedLoopAlarmEnd] { ruby { - code => "event.set('closedLoopAlarmEnd', Integer(event.get('closedLoopAlarmEnd')))" + code => " + if event.get('closedLoopAlarmEnd').to_s.to_i(10) > 9999999999999 + event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10) / 1000) + else + event.set('closedLoopAlarmEnd', event.get('closedLoopAlarmEnd').to_s.to_i(10)) + end + " } date { match => [ "closedLoopAlarmEnd", UNIX_MS ] @@ -70,39 +123,134 @@ filter { } } - #"yyyy-MM-dd HH:mm:ss" + + + # + # Notification time are expressed under the form "yyyy-MM-dd HH:mm:ss", which + # is close to ISO8601, but lacks of T as spacer: "yyyy-MM-ddTHH:mm:ss" + # if [notificationTime] { - mutate { - gsub => [ - "notificationTime", " ", "T" - ] - } - date { + mutate { + gsub => [ + "notificationTime", " ", "T" + ] + } + date { match => [ "notificationTime", ISO8601 ] target => "notificationTime" - } + } + } + + + # + # Renaming some fields for readability + # + if [AAI][generic-vnf.vnf-name] { + mutate { + add_field => { "vnfName" => "%{[AAI][generic-vnf.vnf-name]}" } + } + } + if [AAI][generic-vnf.vnf-type] { + mutate { + add_field => { "vnfType" => "%{[AAI][generic-vnf.vnf-type]}" } + } + } + if [AAI][vserver.vserver-name] { + mutate { + add_field => { "vmName" => "%{[AAI][vserver.vserver-name]}" } + } + } + if [AAI][complex.city] { + mutate { + add_field => { "locationCity" => "%{[AAI][complex.city]}" } + } + } + if [AAI][complex.state] { + mutate { + add_field => { "locationState" => "%{[AAI][complex.state]}" } + } + } + + + # + # Adding some flags to ease aggregation + # + if [closedLoopEventStatus] =~ /(?i)ABATED/ { + mutate { + add_field => { "flagAbated" => "1" } + } + } + if [notification] =~ /^.*?(?:\b|_)FINAL(?:\b|_).*?(?:\b|_)FAILURE(?:\b|_).*?$/ { + mutate { + add_field => { "flagFinalFailure" => "1" } + } + } + + + if "error" not in [@metadata][request][tags]{ + # + # Creating data for a secondary index + # + clone { + clones => [ "event-cl-aggs" ] + add_tag => [ "event-cl-aggs" ] + } + + if "event-cl-aggs" in [@metadata][request][tags]{ + # + # we only need a few fields for aggregations; remove all fields from clone except : + # vmName,vnfName,vnfType,requestID,closedLoopAlarmStart, closedLoopControlName,closedLoopAlarmEnd,abated,nbrDmaapevents,finalFailure + # + prune { + whitelist_names => ["^@.*$","^topic$","^type$","^tags$","^flagFinalFailure$","^flagAbated$","^locationState$","^locationCity$","^vmName$","^vnfName$","^vnfType$","^requestID$","^closedLoopAlarmStart$","^closedLoopControlName$","^closedLoopAlarmEnd$","^target$","^target_type$","^triggerSourceName$","^policyScope$","^policyName$","^policyVersion$"] + } + + } } } + + output { stdout { codec => rubydebug } - if [http_request_failure] { + if "error" in [tags] { elasticsearch { codec => "json" + cacert => "/clamp-cert/ca-certs.pem" + ssl_certificate_verification => false hosts => ["${elasticsearch_base_url}"] + user => ["${logstash_user}"] + password => ["${logstash_pwd}"] index => "errors-%{+YYYY.MM.DD}" doc_as_upsert => true } - } else { + + } else if "event-cl-aggs" in [tags] { elasticsearch { codec => "json" hosts => ["${elasticsearch_base_url}"] - index => "logstash-%{+YYYY.MM.DD}" # creates daily indexes + cacert => "/clamp-cert/ca-certs.pem" + ssl_certificate_verification => false + user => ["${logstash_user}"] + password => ["${logstash_pwd}"] + document_id => "%{requestID}" + index => "events-cl-%{+YYYY.MM.DD}" # creates daily indexes for control loop doc_as_upsert => true + action => "update" + } + } else { + elasticsearch { + codec => "json" + hosts => ["${elasticsearch_base_url}"] + cacert => "/clamp-cert/ca-certs.pem" + ssl_certificate_verification => false + user => ["${logstash_user}"] + password => ["${logstash_pwd}"] + index => "events-%{+YYYY.MM.DD}" # creates daily indexes + doc_as_upsert => true } } - }