2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2018 Nokia. All rights reserved.s
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.dcae.commonFunction;
23 import com.google.common.reflect.TypeToken;
24 import com.google.gson.Gson;
25 import io.vavr.collection.Map;
26 import io.vavr.control.Option;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.lang.reflect.Type;
30 import java.text.SimpleDateFormat;
31 import java.util.Arrays;
32 import java.util.Date;
33 import java.util.List;
34 import java.util.Optional;
35 import org.json.JSONObject;
36 import org.onap.dcae.ApplicationSettings;
37 import org.onap.dcae.commonFunction.event.publishing.EventPublisher;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
41 public class EventSender {
43 private Map<String, String[]> streamidHash;
44 private ApplicationSettings properties;
45 private EventPublisher eventPublisher;
47 static final Type EVENT_LIST_TYPE = new TypeToken<List<Event>>() {}.getType();
48 private static final Logger log = LoggerFactory.getLogger(EventSender.class);
49 private static final String EVENT_LITERAL = "event";
50 private static final String COMMON_EVENT_HEADER = "commonEventHeader";
51 private final SimpleDateFormat dateFormat = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
53 public EventSender( EventPublisher eventPublisher, ApplicationSettings properties) {
54 this.eventPublisher = eventPublisher;
55 this.streamidHash = properties.dMaaPStreamsMapping();
56 this.properties = properties;
60 public void send(JSONObject event) {
61 streamidHash.get(getDomain(event))
62 .onEmpty(() -> log.error("No StreamID defined for publish - Message dropped" + event))
63 .forEach(streamIds -> sendEventsToStreams(event, streamIds));
66 public static String getDomain(JSONObject event) {
67 return event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER).getString("domain");
70 private void sendEventsToStreams(JSONObject event, String[] streamIdList) {
71 for (String aStreamIdList : streamIdList) {
72 log.info("Invoking publisher for streamId:" + aStreamIdList);
73 eventPublisher.sendEvent(overrideEvent(event), aStreamIdList);
77 private JSONObject overrideEvent(JSONObject event) {
78 JSONObject jsonObject = addCurrentTimeToEvent(event);
79 if (properties.eventTransformingEnabled()) {
80 try (FileReader fr = new FileReader("./etc/eventTransform.json")) {
81 log.info("parse eventTransform.json");
82 List<Event> events = new Gson().fromJson(fr, EVENT_LIST_TYPE);
83 parseEventsJson(events, new ConfigProcessorAdapter(new ConfigProcessors(jsonObject)));
84 } catch (IOException e) {
85 log.error("Couldn't find file ./etc/eventTransform.json" + e.toString());
88 if (jsonObject.has("VESversion"))
89 jsonObject.remove("VESversion");
91 log.debug("Modified event:" + jsonObject);
95 private JSONObject addCurrentTimeToEvent(JSONObject event) {
96 final Date currentTime = new Date();
97 JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp", dateFormat.format(currentTime));
98 JSONObject commonEventHeaderkey = event.getJSONObject(EVENT_LITERAL).getJSONObject(COMMON_EVENT_HEADER);
99 commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
100 event.getJSONObject(EVENT_LITERAL).put(COMMON_EVENT_HEADER, commonEventHeaderkey);
104 private void parseEventsJson(List<Event> eventsTransform, ConfigProcessorAdapter configProcessorAdapter) {
105 for (Event eventTransform : eventsTransform) {
106 JSONObject filterObj = new JSONObject(eventTransform.filter.toString());
107 if (configProcessorAdapter.isFilterMet(filterObj)) {
108 callProcessorsMethod(configProcessorAdapter, eventTransform.processors);
113 private void callProcessorsMethod(ConfigProcessorAdapter configProcessorAdapter, List<Processor> processors) {
114 for (Processor processor : processors) {
115 final String functionName = processor.functionName;
116 final JSONObject args = new JSONObject(processor.args.toString());
117 log.info(String.format("functionName==%s | args==%s", functionName, args));
119 configProcessorAdapter.runConfigProcessorFunctionByName(functionName, args);
120 } catch (ReflectiveOperationException e) {
121 log.error("EventProcessor Exception" + e.getMessage() + e + e.getCause());