/*-
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
package org.onap.dcae.commonFunction;

import java.io.FileReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;

import org.json.JSONArray;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.att.nsa.clock.SaClock;
import com.att.nsa.logging.LoggingContext;
import com.att.nsa.logging.log4j.EcompFields;
import com.google.gson.JsonArray;
import com.google.gson.JsonParser;
45 public class EventProcessor implements Runnable {
46 private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
48 private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
49 private JSONObject event = null;
51 public EventProcessor() {
52 log.debug("EventProcessor: Default Constructor");
54 String list[] = CommonStartup.streamid.split("\\|");
55 for (int i = 0; i < list.length; i++) {
56 String domain = list[i].split("=")[0];
57 //String streamIdList[] = list[i].split("=")[1].split(",");
58 String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
60 log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
61 streamid_hash.put(domain, streamIdList);
71 event = CommonStartup.fProcessingInputQueue.take();
72 log.info("EventProcessor\tRemoving element: " + event);
74 //EventPublisher Ep=new EventPublisher();
75 while (event != null) {
76 // As long as the producer is running we remove elements from the queue.
78 //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
79 String uuid = event.get("VESuniqueId").toString();
80 LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
81 localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
83 log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
84 String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
85 log.debug("streamIdList:" + streamIdList);
87 if (streamIdList.length == 0) {
88 log.error("No StreamID defined for publish - Message dropped" + event.toString());
92 for (int i=0; i < streamIdList.length; i++)
94 log.info("Invoking publisher for streamId:" + streamIdList[i]);
96 EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
100 log.debug("Message published" + event.toString());
101 event = CommonStartup.fProcessingInputQueue.take();
102 // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
104 } catch (InterruptedException e) {
105 log.error("EventProcessor InterruptedException" + e.getMessage());
111 @SuppressWarnings({ "unchecked", "rawtypes" })
112 public void overrideEvent()
114 //Set collector timestamp in event payload before publish
115 final Date currentTime = new Date();
116 final SimpleDateFormat sdf = new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z");
117 sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
119 /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
120 JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
121 JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
122 commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
126 "commonEventHeader": {
127 "internalHeaderFields": {
128 "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
132 //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
133 JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
134 JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
135 commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
136 event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);
138 if (CommonStartup.eventTransformFlag == 1)
140 // read the mapping json file
141 final JsonParser parser = new JsonParser();
143 final JsonArray jo = (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) );
144 log.info("parse eventTransform.json");
145 // now convert to org.json
146 final String jsonText = jo.toString ();
147 final JSONArray topLevel = new JSONArray ( jsonText );
148 //log.info("topLevel == " + topLevel);
150 Class[] paramJSONObject = new Class[1];
151 paramJSONObject[0] = JSONObject.class;
152 //load VESProcessors class at runtime
153 Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
154 Constructor constr = cls.getConstructor(paramJSONObject);
155 Object obj = constr.newInstance(event);
157 for (int j=0; j<topLevel.length(); j++)
159 JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
160 Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
161 boolean filterMet = (boolean) method.invoke (obj, filterObj );
164 final JSONArray processors = (JSONArray)topLevel.getJSONObject(j).getJSONArray("processors");
166 //call the processor method
167 for (int i=0; i < processors.length(); i++)
169 final JSONObject processorList = processors.getJSONObject(i);
170 final String functionName = processorList.getString("functionName");
171 final JSONObject args = processorList.getJSONObject("args");
172 //final JSONObject filter = processorList.getJSONObject("filter");
174 log.info("functionName==" + functionName + " | args==" + args);
175 //reflect method call
176 method = cls.getDeclaredMethod(functionName, paramJSONObject);
177 method.invoke(obj, args);
182 } catch (Exception e) {
184 log.error("EventProcessor Exception" + e.getMessage() + e);
185 log.error("EventProcessor Exception" + e.getCause());
188 log.debug("Modified event:" + event);