Fix bug with processing event list
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / commonFunction / EventSender.java
1 /*
2  * ============LICENSE_START=======================================================
3  * PROJECT
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.s
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.dcae.commonFunction;
22
23 import com.google.common.reflect.TypeToken;
24 import com.google.gson.Gson;
25 import io.vavr.collection.Map;
26 import io.vavr.control.Option;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.lang.reflect.Type;
30 import java.text.SimpleDateFormat;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import org.json.JSONObject;
36 import org.onap.dcae.ApplicationSettings;
37 import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 public class EventSender {
42
43   private Map<String, String[]> streamidHash;
44   private ApplicationSettings properties;
45   private EventPublisher eventPublisher;
46
47   static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
48   private static final Logger log = LoggerFactory.getLogger(EventSender.class);
49   private static final String EVENT_LITERAL = "event";
50   private static final String COMMON_EVENT_HEADER = "commonEventHeader";
51   private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
52
53   public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) {
54     this.eventPublisher = eventPublisher;
55     this.streamidHash = properties.dMaaPStreamsMapping();
56     this.properties = properties;
57
58   }
59
60   public void send(JSONObject event) {
61     streamidHash.get(getDomain(event))
62         .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event))
63         .forEach(streamIds -> sendEventsToStreams(event, streamIds));
64   }
65
66   public static String getDomain(JSONObject event) {
67     return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
68   }
69
70   private void sendEventsToStreams(JSONObject event, String[] streamIdList) {
71     for (String aStreamIdList : streamIdList) {
72       log.info("Invoking publisher for streamId:" + aStreamIdList);
73       eventPublisher.sendEvent(overrideEvent(event), aStreamIdList);
74     }
75   }
76
77   private JSONObject overrideEvent(JSONObject event) {
78     JSONObject jsonObject = addCurrentTimeToEvent(event);
79     if (properties.eventTransformingEnabled()) {
80       try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
81         log.info("parse eventTransform.json");
82         List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
83         parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
84       } catch (IOException e) {
85         log.error("Couldn't find file ./etc/eventTransform.json" + e.toString());
86       }
87     }
88     if (jsonObject.has("VESversion"))
89       jsonObject.remove("VESversion");
90
91     log.debug("Modified event:" + jsonObject);
92     return jsonObject;
93   }
94
95   private JSONObject addCurrentTimeToEvent(JSONObject event) {
96     final Date currentTime = new Date();
97     JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
98     JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
99     commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
100     event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
101     return event;
102   }
103
104   private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
105     for (Event eventTransform : eventsTransform) {
106       JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
107       if (configProcessorAdapter.isFilterMet(filterObj)) {
108         callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
109       }
110     }
111   }
112
113   private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
114     for (Processor processor : processors) {
115       final String functionName = processor.functionName;
116       final JSONObject args = new JSONObject(processor.args.toString());
117       log.info(String.format("functionName==%s | args==%s", functionName, args));
118       try {
119         configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
120       } catch (ReflectiveOperationException e) {
121         log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());
122       }
123     }
124   }
125 }