6b5502f7bd416562f8965deaebd39002ad13b2ed
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / commonFunction / CommonStartup.java
1 /*-\r
2  * ============LICENSE_START=======================================================\r
3  * PROJECT\r
4  * ================================================================================\r
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.\r
6  * ================================================================================\r
7  * Licensed under the Apache License, Version 2.0 (the "License");\r
8  * you may not use this file except in compliance with the License.\r
9  * You may obtain a copy of the License at\r
10  * \r
11  *      http://www.apache.org/licenses/LICENSE-2.0\r
12  * \r
13  * Unless required by applicable law or agreed to in writing, software\r
14  * distributed under the License is distributed on an "AS IS" BASIS,\r
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
16  * See the License for the specific language governing permissions and\r
17  * limitations under the License.\r
18  * ============LICENSE_END=========================================================\r
19  */\r
20 \r
21 package org.onap.dcae.commonFunction;\r
22 \r
23 import com.att.nsa.apiServer.ApiServer;\r
24 import com.att.nsa.apiServer.ApiServerConnector;\r
25 import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;\r
26 import com.att.nsa.cmdLine.NsaCommandLineUtil;\r
27 import com.att.nsa.drumlin.service.framework.DrumlinServlet;\r
28 import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;\r
29 import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;\r
30 import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;\r
31 import com.att.nsa.drumlin.till.nv.rrNvReadable;\r
32 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;\r
33 import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;\r
34 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;\r
35 import com.fasterxml.jackson.core.JsonParseException;\r
36 import com.fasterxml.jackson.databind.JsonNode;\r
37 import com.github.fge.jsonschema.exceptions.ProcessingException;\r
38 import com.github.fge.jsonschema.main.JsonSchema;\r
39 import com.github.fge.jsonschema.main.JsonSchemaFactory;\r
40 import com.github.fge.jsonschema.report.ProcessingMessage;\r
41 import com.github.fge.jsonschema.report.ProcessingReport;\r
42 import com.github.fge.jsonschema.util.JsonLoader;\r
43 import org.apache.catalina.LifecycleException;\r
44 import org.json.JSONArray;\r
45 import org.json.JSONException;\r
46 import org.json.JSONObject;\r
47 import org.onap.dcae.restapi.RestfulCollectorServlet;\r
48 import org.slf4j.Logger;\r
49 import org.slf4j.LoggerFactory;\r
50 \r
51 import java.io.IOException;\r
52 import java.net.URL;\r
53 import java.nio.charset.Charset;\r
54 import java.nio.file.Files;\r
55 import java.nio.file.Paths;\r
56 import java.util.Iterator;\r
57 import java.util.LinkedList;\r
58 import java.util.List;\r
59 import java.util.Map;\r
60 import java.util.Queue;\r
61 import java.util.concurrent.ExecutorService;\r
62 import java.util.concurrent.Executors;\r
63 import java.util.concurrent.LinkedBlockingQueue;\r
64 import javax.servlet.ServletException;\r
65 \r
66 public class CommonStartup extends NsaBaseEndpoint implements Runnable {\r
67 \r
68         public static final String KCONFIG = "c";\r
69 \r
70         public static final String KSETTING_PORT = "collector.service.port";\r
71         public static final int KDEFAULT_PORT = 8080;\r
72 \r
73         public static final String KSETTING_SECUREPORT = "collector.service.secure.port";\r
74         public static final int KDEFAULT_SECUREPORT = -1;\r
75 \r
76         public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";\r
77         public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";\r
78         public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";\r
79         public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";\r
80         public static final String KSETTING_KEYALIAS = "collector.keystore.alias";\r
81         public static final String KDEFAULT_KEYALIAS = "tomcat";\r
82 \r
83         public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";\r
84         protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };\r
85 \r
86         public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";\r
87         public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;\r
88 \r
89         public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";\r
90         public static final int KDEFAULT_SCHEMAVALIDATOR = -1;\r
91 \r
92         public static final String KSETTING_SCHEMAFILE = "collector.schema.file";\r
93         public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";\r
94         public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";\r
95 \r
96         public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";\r
97 \r
98         public static final String KSETTING_AUTHFLAG = "header.authflag";\r
99         public static final int KDEFAULT_AUTHFLAG = 0;\r
100 \r
101         public static final String KSETTING_AUTHLIST = "header.authlist";\r
102 \r
103         public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";\r
104         public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;\r
105 \r
106         public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");\r
107         public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");\r
108         public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");\r
109         public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");\r
110 \r
111         public static int schemaValidatorflag = -1;\r
112         public static int authflag = 1;\r
113         public static int eventTransformFlag = 1;\r
114         public static String schemaFile;\r
115         public static JSONObject schemaFileJson;\r
116         public static String exceptionConfig;\r
117         public static String cambriaConfigFile;\r
118         private boolean listnerstatus;\r
119         public static String streamid;\r
120 \r
121         public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;\r
122         private static ApiServer fTomcatServer = null;\r
123         private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);\r
124 \r
125         private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,\r
126                         rrNvReadable.invalidSettingValue, ServletException, InterruptedException {\r
127                 final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();\r
128 \r
129                 if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {\r
130                         // http service\r
131                         connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)\r
132                                         .build());\r
133                 }\r
134 \r
135                 // optional https service\r
136                 final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);\r
137                 final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);\r
138                 final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);\r
139                 final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);\r
140 \r
141                 if (securePort > 0) {\r
142                         final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());\r
143                         connectors.add(new ApiServerConnector.Builder(securePort).secure(true)\r
144                                         .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());\r
145 \r
146                 }\r
147 \r
148                 // Reading other config properties\r
149 \r
150                 schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);\r
151                 if (schemaValidatorflag > 0) {\r
152                         schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);\r
153                         // System.out.println("SchemaFile:" + schemaFile);\r
154                         schemaFileJson = new JSONObject(schemaFile);\r
155 \r
156                 }\r
157                 exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);\r
158                 authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);\r
159                 String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,\r
160                                 CommonStartup.KDEFAULT_DMAAPCONFIGS);\r
161                 cambriaConfigFile = currentconffile[0];\r
162                 streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);\r
163                 eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);\r
164 \r
165                 fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)\r
166                                 .name("collector").build();\r
167 \r
168                 // Load override exception map\r
169                 CustomExceptionLoader.LoadMap();\r
170                 setListnerstatus(true);\r
171         }\r
172 \r
173         public static void main(String[] args) {\r
174                 ExecutorService executor = null;\r
175                 try {\r
176                         // process command line arguments\r
177                         final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);\r
178                         final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");\r
179                         final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);\r
180 \r
181                         final nvReadableStack settings = new nvReadableStack();\r
182                         settings.push(new nvPropertiesFile(settingStream));\r
183                         settings.push(new nvReadableTable(argMap));\r
184 \r
185                         fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);\r
186 \r
187                         VESLogger.setUpEcompLogging();\r
188 \r
189                         CommonStartup cs = new CommonStartup(settings);\r
190 \r
191                         Thread csmain = new Thread(cs);\r
192                         csmain.start();\r
193 \r
194                         EventProcessor ep = new EventProcessor();\r
195                         executor = Executors.newFixedThreadPool(20);\r
196                         //executor.execute(ep);\r
197                         for (int i = 0; i < 20; ++i) {\r
198                                 executor.execute(ep);\r
199                                 }\r
200 \r
201                 } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException\r
202                                 | InterruptedException e) {\r
203                         CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());\r
204                         throw new RuntimeException(e);\r
205                 } catch (Exception e) {\r
206                         System.err.println("Uncaught exception - " + e.getMessage());\r
207                         CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());\r
208                         e.printStackTrace(System.err);\r
209                 } finally {\r
210                         // This will make the executor accept no new threads\r
211                         // and finish all existing threads in the queue\r
212                         /*if (executor != null) {\r
213                                 executor.shutdown();\r
214                         }*/\r
215 \r
216                 }\r
217         }\r
218 \r
219         public void run() {\r
220                 try {\r
221                         fTomcatServer.start();\r
222                 } catch (LifecycleException | IOException e) {\r
223 \r
224                         log.error("lifecycle or IO: ", e);\r
225                 }\r
226                 fTomcatServer.await();\r
227         }\r
228 \r
229         public boolean isListnerstatus() {\r
230                 return listnerstatus;\r
231         }\r
232 \r
233         public void setListnerstatus(boolean listnerstatus) {\r
234                 this.listnerstatus = listnerstatus;\r
235         }\r
236 \r
237         public static Queue<JSONObject> getProcessingInputQueue() {\r
238                 return fProcessingInputQueue;\r
239         }\r
240 \r
241         public static class QueueFullException extends Exception {\r
242 \r
243                 private static final long serialVersionUID = 1L;\r
244         }\r
245 \r
246         public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {\r
247                 final Queue<JSONObject> queue = getProcessingInputQueue();\r
248                 try {\r
249 \r
250                         CommonStartup.metriclog.info("EVENT_PUBLISH_START");\r
251                         for (int i = 0; i < a.length(); i++) {\r
252                                 if (!queue.offer(a.getJSONObject(i))) {\r
253                                         throw new QueueFullException();\r
254                                 }\r
255 \r
256                         }\r
257                         log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");\r
258                         CommonStartup.metriclog.info("EVENT_PUBLISH_END");\r
259                         // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);\r
260 \r
261                 } catch (JSONException e) {\r
262                         throw e;\r
263 \r
264                 }\r
265         }\r
266 \r
267         static String readFile(String path, Charset encoding) throws IOException {\r
268                 byte[] encoded = Files.readAllBytes(Paths.get(path));\r
269                 String pwd = new String(encoded);\r
270                 return pwd.substring(0, pwd.length() - 1);\r
271         }\r
272 \r
273         public static String schemavalidate(String jsonData, String jsonSchema) {\r
274                 ProcessingReport report;\r
275                 String result = "false";\r
276 \r
277                 try {\r
278                         // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to\r
279                         // data: #<#<"+jsonData+">#>#");\r
280                         log.trace("Schema validation for event:" + jsonData);\r
281                         JsonNode schemaNode = JsonLoader.fromString(jsonSchema);\r
282                         JsonNode data = JsonLoader.fromString(jsonData);\r
283                         JsonSchemaFactory factory = JsonSchemaFactory.byDefault();\r
284                         JsonSchema schema = factory.getJsonSchema(schemaNode);\r
285                         report = schema.validate(data);\r
286                 } catch (JsonParseException e) {\r
287                         log.error("schemavalidate:JsonParseException for event:" + jsonData);\r
288                         return e.getMessage();\r
289                 } catch (ProcessingException e) {\r
290                         log.error("schemavalidate:Processing exception for event:" + jsonData);\r
291                         return e.getMessage();\r
292                 } catch (IOException e) {\r
293                         log.error(\r
294                                         "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);\r
295                         return e.getMessage();\r
296                 }\r
297                 if (report != null) {\r
298                         Iterator<ProcessingMessage> iter = report.iterator();\r
299                         while (iter.hasNext()) {\r
300                                 ProcessingMessage pm = iter.next();\r
301                                 log.trace("Processing Message: " + pm.getMessage());\r
302                         }\r
303                         result = String.valueOf(report.isSuccess());\r
304                 }\r
305                 try {\r
306                         log.debug("Validation Result:" + result + " Validation report:" + report);\r
307                 } catch (NullPointerException e) {\r
308                         log.error("schemavalidate:NullpointerException on report");\r
309                 }\r
310                 return result;\r
311         }\r
312 \r
313 \r
314 }\r