Fixing sonar and javadoc issues.
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / commonFunction / EventProcessor.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * PROJECT
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  * 
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  * 
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ============LICENSE_END=========================================================
19  */
20
21 package org.onap.dcae.commonFunction;
22
23 import java.io.FileReader;
24 import java.lang.reflect.Constructor;
25 import java.lang.reflect.Method;
26 import java.text.SimpleDateFormat;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import com.att.nsa.clock.SaClock;
31 import com.att.nsa.logging.LoggingContext;
32 import com.att.nsa.logging.log4j.EcompFields;
33 import com.google.gson.JsonArray;
34 import com.google.gson.JsonParser;
35
36 import java.util.Arrays;
37 import java.util.Date;
38 import java.util.HashMap;
39 import java.util.TimeZone;
40 import java.util.UUID;
41
42 import org.json.JSONArray;
43 import org.json.JSONObject;
44
45 public class EventProcessor implements Runnable {
46         private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
47
48         private static HashMap<String, String[]> streamid_hash = new HashMap<String, String[]>();
49         private JSONObject event = null;
50
51         public EventProcessor() {
52                 log.debug("EventProcessor: Default Constructor");
53                 
54                 String list[] = CommonStartup.streamid.split("\\|");
55                 for (int i = 0; i < list.length; i++) {
56                         String domain = list[i].split("=")[0];
57                         //String streamIdList[] = list[i].split("=")[1].split(",");
58                         String streamIdList[] = list[i].substring(list[i].indexOf("=") +1).split(",");
59                         
60                         log.debug("Domain: " + domain + " streamIdList:" + Arrays.toString(streamIdList));
61                         streamid_hash.put(domain, streamIdList);
62                 }
63                 
64         }
65
66         @Override
67         public void run() {
68
69                 try {
70                         
71                         event = CommonStartup.fProcessingInputQueue.take();
72                         log.info("EventProcessor\tRemoving element: " + event);
73                         
74                         //EventPublisher Ep=new EventPublisher();
75                         while (event != null) {
76                                 // As long as the producer is running we remove elements from the queue.
77
78                                 //UUID uuid = UUID.fromString(event.get("VESuniqueId").toString());
79                                 String uuid = event.get("VESuniqueId").toString();
80                                 LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid.toString());
81                                 localLC .put ( EcompFields.kBeginTimestampMs, SaClock.now () );
82                                 
83                                 log.debug("event.VESuniqueId" + event.get("VESuniqueId") + "event.commonEventHeader.domain:" + event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
84                                 String streamIdList[]=streamid_hash.get(event.getJSONObject("event").getJSONObject("commonEventHeader").getString("domain"));
85                                 log.debug("streamIdList:" + streamIdList);
86                                 
87                                 if (streamIdList.length == 0)           {
88                                         log.error("No StreamID defined for publish - Message dropped" + event.toString());
89                                 } 
90                                 
91                                 else {
92                                         for (int i=0; i < streamIdList.length; i++)
93                                         {
94                                                 log.info("Invoking publisher for streamId:" + streamIdList[i]);
95                                                 this.overrideEvent();
96                                                 EventPublisher.getInstance(streamIdList[i]).sendEvent(event);
97                                                 
98                                         }
99                                 }
100                                 log.debug("Message published" + event.toString());
101                                 event = CommonStartup.fProcessingInputQueue.take();
102                                 // log.info("EventProcessor\tRemoving element: " + this.queue.remove());
103                         }
104                 } catch (InterruptedException e) {
105                         log.error("EventProcessor InterruptedException" + e.getMessage());
106                 }
107
108         }
109
110         
111         @SuppressWarnings({ "unchecked", "rawtypes" })
112         public void overrideEvent()
113         {
114                 //Set collector timestamp in event payload before publish
115                 final Date currentTime = new Date();
116                 final SimpleDateFormat sdf =   new SimpleDateFormat("EEE, MM dd yyyy hh:mm:ss z"); 
117                 sdf.setTimeZone(TimeZone.getTimeZone("GMT"));
118                 
119                 /*JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
120                 JSONObject additionalParameter = new JSONObject().put("additionalParameters",additionalParametersarray );
121                 JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
122                 commonEventHeaderkey.put("internalHeaderFields", additionalParameter);*/
123                 
124
125 /*                "event": {
126             "commonEventHeader": {
127                             "internalHeaderFields": {
128                                             "collectorTimeStamp": "Fri, 04 21 2017 04:11:52 GMT"
129                             },
130 */
131                 
132                 //JSONArray additionalParametersarray = new JSONArray().put(new JSONObject().put("collectorTimeStamp", sdf.format(currentTime)));
133                 JSONObject collectorTimeStamp = new JSONObject().put("collectorTimeStamp",sdf.format(currentTime) );
134                 JSONObject commonEventHeaderkey = event.getJSONObject("event").getJSONObject("commonEventHeader");
135                 commonEventHeaderkey.put("internalHeaderFields", collectorTimeStamp);
136                 event.getJSONObject("event").put("commonEventHeader",commonEventHeaderkey);     
137                 
138                 if (CommonStartup.eventTransformFlag == 1)
139                 {
140                                 // read the mapping json file
141                                 final JsonParser parser = new JsonParser();
142                                 try {
143                                         final JsonArray jo =  (JsonArray) parser.parse ( new FileReader ( "./etc/eventTransform.json" ) );
144                                         log.info("parse eventTransform.json");
145                                         // now convert to org.json
146                                         final String jsonText = jo.toString ();
147                                         final JSONArray topLevel = new JSONArray ( jsonText );
148                                         //log.info("topLevel == " + topLevel);
149                                         
150                                         Class[] paramJSONObject = new Class[1];
151                                         paramJSONObject[0] = JSONObject.class;
152                                         //load VESProcessors class at runtime
153                                         Class cls = Class.forName("org.onap.dcae.commonFunction.ConfigProcessors");
154                                         Constructor constr = cls.getConstructor(paramJSONObject);
155                                         Object obj = constr.newInstance(event);
156                                                 
157                                         for (int j=0; j<topLevel.length(); j++)
158                                         {
159                                                 JSONObject filterObj = topLevel.getJSONObject(j).getJSONObject("filter");
160                                                 Method method = cls.getDeclaredMethod("isFilterMet", paramJSONObject);
161                                                 boolean filterMet = (boolean) method.invoke (obj, filterObj );
162                                                 if (filterMet)
163                                                 {
164                                                         final JSONArray processors = (JSONArray)topLevel.getJSONObject(j).getJSONArray("processors");
165                                                 
166                                                         //call the processor method
167                                                         for (int i=0; i < processors.length(); i++)
168                                                         {
169                                                                 final JSONObject processorList = processors.getJSONObject(i);
170                                                                 final String functionName = processorList.getString("functionName");
171                                                                 final JSONObject args = processorList.getJSONObject("args");
172                                                                 //final JSONObject filter = processorList.getJSONObject("filter");
173                                                         
174                                                                 log.info("functionName==" + functionName + " | args==" + args);
175                                                                 //reflect method call
176                                                                 method = cls.getDeclaredMethod(functionName, paramJSONObject);
177                                                                 method.invoke(obj, args);
178                                                         }
179                                                 }
180                                         }
181                                         
182                                 } catch (Exception e) {
183                                         
184                                         log.error("EventProcessor Exception" + e.getMessage() + e);
185                                         log.error("EventProcessor Exception" + e.getCause());
186                                 } 
187                 }       
188                 log.debug("Modified event:" + event);
189                 
190         }
191 }