2 * ============LICENSE_START=======================================================
\r
4 * ================================================================================
\r
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
\r
6 * ================================================================================
\r
7 * Licensed under the Apache License, Version 2.0 (the "License");
\r
8 * you may not use this file except in compliance with the License.
\r
9 * You may obtain a copy of the License at
\r
11 * http://www.apache.org/licenses/LICENSE-2.0
\r
13 * Unless required by applicable law or agreed to in writing, software
\r
14 * distributed under the License is distributed on an "AS IS" BASIS,
\r
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
\r
16 * See the License for the specific language governing permissions and
\r
17 * limitations under the License.
\r
18 * ============LICENSE_END=========================================================
\r
21 package org.onap.dcae.commonFunction;
\r
23 import com.att.nsa.apiServer.ApiServer;
\r
24 import com.att.nsa.apiServer.ApiServerConnector;
\r
25 import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
\r
26 import com.att.nsa.cmdLine.NsaCommandLineUtil;
\r
27 import com.att.nsa.drumlin.service.framework.DrumlinServlet;
\r
28 import com.att.nsa.drumlin.till.nv.impl.nvPropertiesFile;
\r
29 import com.att.nsa.drumlin.till.nv.impl.nvReadableStack;
\r
30 import com.att.nsa.drumlin.till.nv.impl.nvReadableTable;
\r
31 import com.att.nsa.drumlin.till.nv.rrNvReadable;
\r
32 import com.att.nsa.drumlin.till.nv.rrNvReadable.invalidSettingValue;
\r
33 import com.att.nsa.drumlin.till.nv.rrNvReadable.loadException;
\r
34 import com.att.nsa.drumlin.till.nv.rrNvReadable.missingReqdSetting;
\r
35 import com.fasterxml.jackson.core.JsonParseException;
\r
36 import com.fasterxml.jackson.databind.JsonNode;
\r
37 import com.github.fge.jsonschema.exceptions.ProcessingException;
\r
38 import com.github.fge.jsonschema.main.JsonSchema;
\r
39 import com.github.fge.jsonschema.main.JsonSchemaFactory;
\r
40 import com.github.fge.jsonschema.report.ProcessingMessage;
\r
41 import com.github.fge.jsonschema.report.ProcessingReport;
\r
42 import com.github.fge.jsonschema.util.JsonLoader;
\r
43 import org.apache.catalina.LifecycleException;
\r
44 import org.json.JSONArray;
\r
45 import org.json.JSONException;
\r
46 import org.json.JSONObject;
\r
47 import org.onap.dcae.restapi.RestfulCollectorServlet;
\r
48 import org.slf4j.Logger;
\r
49 import org.slf4j.LoggerFactory;
\r
51 import java.io.IOException;
\r
52 import java.net.URL;
\r
53 import java.nio.charset.Charset;
\r
54 import java.nio.file.Files;
\r
55 import java.nio.file.Paths;
\r
56 import java.util.Iterator;
\r
57 import java.util.LinkedList;
\r
58 import java.util.List;
\r
59 import java.util.Map;
\r
60 import java.util.Queue;
\r
61 import java.util.concurrent.ExecutorService;
\r
62 import java.util.concurrent.Executors;
\r
63 import java.util.concurrent.LinkedBlockingQueue;
\r
64 import javax.servlet.ServletException;
\r
66 public class CommonStartup extends NsaBaseEndpoint implements Runnable {
\r
68 public static final String KCONFIG = "c";
\r
70 public static final String KSETTING_PORT = "collector.service.port";
\r
71 public static final int KDEFAULT_PORT = 8080;
\r
73 public static final String KSETTING_SECUREPORT = "collector.service.secure.port";
\r
74 public static final int KDEFAULT_SECUREPORT = -1;
\r
76 public static final String KSETTING_KEYSTOREPASSFILE = "collector.keystore.passwordfile";
\r
77 public static final String KDEFAULT_KEYSTOREPASSFILE = "../etc/passwordfile";
\r
78 public static final String KSETTING_KEYSTOREFILE = "collector.keystore.file.location";
\r
79 public static final String KDEFAULT_KEYSTOREFILE = "../etc/keystore";
\r
80 public static final String KSETTING_KEYALIAS = "collector.keystore.alias";
\r
81 public static final String KDEFAULT_KEYALIAS = "tomcat";
\r
83 public static final String KSETTING_DMAAPCONFIGS = "collector.dmaapfile";
\r
84 protected static final String[] KDEFAULT_DMAAPCONFIGS = new String[] { "/etc/DmaapConfig.json" };
\r
86 public static final String KSETTING_MAXQUEUEDEVENTS = "collector.inputQueue.maxPending";
\r
87 public static final int KDEFAULT_MAXQUEUEDEVENTS = 1024 * 4;
\r
89 public static final String KSETTING_SCHEMAVALIDATOR = "collector.schema.checkflag";
\r
90 public static final int KDEFAULT_SCHEMAVALIDATOR = -1;
\r
92 public static final String KSETTING_SCHEMAFILE = "collector.schema.file";
\r
93 public static final String KDEFAULT_SCHEMAFILE = "{\"v5\":\"./etc/CommonEventFormat_28.3.json\"}";
\r
94 public static final String KSETTING_EXCEPTIONCONFIG = "exceptionConfig";
\r
96 public static final String KSETTING_DMAAPSTREAMID = "collector.dmaap.streamid";
\r
98 public static final String KSETTING_AUTHFLAG = "header.authflag";
\r
99 public static final int KDEFAULT_AUTHFLAG = 0;
\r
101 public static final String KSETTING_AUTHLIST = "header.authlist";
\r
103 public static final String KSETTING_EVENTTRANSFORMFLAG = "event.transform.flag";
\r
104 public static final int KDEFAULT_EVENTTRANSFORMFLAG = 1;
\r
106 public static final Logger inlog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.input");
\r
107 public static final Logger oplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.output");
\r
108 public static final Logger eplog = LoggerFactory.getLogger("org.onap.dcae.commonFunction.error");
\r
109 public static final Logger metriclog = LoggerFactory.getLogger("com.att.ecomp.metrics");
\r
111 public static int schemaValidatorflag = -1;
\r
112 public static int authflag = 1;
\r
113 public static int eventTransformFlag = 1;
\r
114 public static String schemaFile;
\r
115 public static JSONObject schemaFileJson;
\r
116 public static String exceptionConfig;
\r
117 public static String cambriaConfigFile;
\r
118 private boolean listnerstatus;
\r
119 public static String streamid;
\r
121 public static LinkedBlockingQueue<JSONObject> fProcessingInputQueue;
\r
122 private static ApiServer fTomcatServer = null;
\r
123 private static final Logger log = LoggerFactory.getLogger(CommonStartup.class);
\r
125 private CommonStartup(rrNvReadable settings) throws loadException, IOException, rrNvReadable.missingReqdSetting,
\r
126 rrNvReadable.invalidSettingValue, ServletException, InterruptedException {
\r
127 final List<ApiServerConnector> connectors = new LinkedList<ApiServerConnector>();
\r
129 if (settings.getInt(KSETTING_PORT, KDEFAULT_PORT) > 0) {
\r
131 connectors.add(new ApiServerConnector.Builder(settings.getInt(KSETTING_PORT, KDEFAULT_PORT)).secure(false)
\r
135 // optional https service
\r
136 final int securePort = settings.getInt(KSETTING_SECUREPORT, KDEFAULT_SECUREPORT);
\r
137 final String keystoreFile = settings.getString(KSETTING_KEYSTOREFILE, KDEFAULT_KEYSTOREFILE);
\r
138 final String keystorePasswordFile = settings.getString(KSETTING_KEYSTOREPASSFILE, KDEFAULT_KEYSTOREPASSFILE);
\r
139 final String keyAlias = settings.getString(KSETTING_KEYALIAS, KDEFAULT_KEYALIAS);
\r
141 if (securePort > 0) {
\r
142 final String KSETTING_KEYSTOREPASS = readFile(keystorePasswordFile, Charset.defaultCharset());
\r
143 connectors.add(new ApiServerConnector.Builder(securePort).secure(true)
\r
144 .keystorePassword(KSETTING_KEYSTOREPASS).keystoreFile(keystoreFile).keyAlias(keyAlias).build());
\r
148 // Reading other config properties
\r
150 schemaValidatorflag = settings.getInt(KSETTING_SCHEMAVALIDATOR, KDEFAULT_SCHEMAVALIDATOR);
\r
151 if (schemaValidatorflag > 0) {
\r
152 schemaFile = settings.getString(KSETTING_SCHEMAFILE, KDEFAULT_SCHEMAFILE);
\r
153 // System.out.println("SchemaFile:" + schemaFile);
\r
154 schemaFileJson = new JSONObject(schemaFile);
\r
157 exceptionConfig = settings.getString(KSETTING_EXCEPTIONCONFIG, null);
\r
158 authflag = settings.getInt(CommonStartup.KSETTING_AUTHFLAG, CommonStartup.KDEFAULT_AUTHFLAG);
\r
159 String[] currentconffile = settings.getStrings(CommonStartup.KSETTING_DMAAPCONFIGS,
\r
160 CommonStartup.KDEFAULT_DMAAPCONFIGS);
\r
161 cambriaConfigFile = currentconffile[0];
\r
162 streamid = settings.getString(KSETTING_DMAAPSTREAMID, null);
\r
163 eventTransformFlag = settings.getInt(KSETTING_EVENTTRANSFORMFLAG, KDEFAULT_EVENTTRANSFORMFLAG);
\r
165 fTomcatServer = new ApiServer.Builder(connectors, new RestfulCollectorServlet(settings)).encodeSlashes(true)
\r
166 .name("collector").build();
\r
168 // Load override exception map
\r
169 CustomExceptionLoader.LoadMap();
\r
170 setListnerstatus(true);
\r
173 public static void main(String[] args) {
\r
174 ExecutorService executor = null;
\r
176 // process command line arguments
\r
177 final Map<String, String> argMap = NsaCommandLineUtil.processCmdLine(args, true);
\r
178 final String config = NsaCommandLineUtil.getSetting(argMap, KCONFIG, "collector.properties");
\r
179 final URL settingStream = DrumlinServlet.findStream(config, CommonStartup.class);
\r
181 final nvReadableStack settings = new nvReadableStack();
\r
182 settings.push(new nvPropertiesFile(settingStream));
\r
183 settings.push(new nvReadableTable(argMap));
\r
185 fProcessingInputQueue = new LinkedBlockingQueue<JSONObject>(CommonStartup.KDEFAULT_MAXQUEUEDEVENTS);
\r
187 VESLogger.setUpEcompLogging();
\r
189 CommonStartup cs = new CommonStartup(settings);
\r
191 Thread csmain = new Thread(cs);
\r
194 EventProcessor ep = new EventProcessor();
\r
195 executor = Executors.newFixedThreadPool(20);
\r
196 //executor.execute(ep);
\r
197 for (int i = 0; i < 20; ++i) {
\r
198 executor.execute(ep);
\r
201 } catch (loadException | missingReqdSetting | IOException | invalidSettingValue | ServletException
\r
202 | InterruptedException e) {
\r
203 CommonStartup.eplog.error("FATAL_STARTUP_ERROR" + e.getMessage());
\r
204 throw new RuntimeException(e);
\r
205 } catch (Exception e) {
\r
206 System.err.println("Uncaught exception - " + e.getMessage());
\r
207 CommonStartup.eplog.error("FATAL_ERROR" + e.getMessage());
\r
208 e.printStackTrace(System.err);
\r
210 // This will make the executor accept no new threads
\r
211 // and finish all existing threads in the queue
\r
212 /*if (executor != null) {
\r
213 executor.shutdown();
\r
219 public void run() {
\r
221 fTomcatServer.start();
\r
222 } catch (LifecycleException | IOException e) {
\r
224 log.error("lifecycle or IO: ", e);
\r
226 fTomcatServer.await();
\r
229 public boolean isListnerstatus() {
\r
230 return listnerstatus;
\r
233 public void setListnerstatus(boolean listnerstatus) {
\r
234 this.listnerstatus = listnerstatus;
\r
237 public static Queue<JSONObject> getProcessingInputQueue() {
\r
238 return fProcessingInputQueue;
\r
241 public static class QueueFullException extends Exception {
\r
243 private static final long serialVersionUID = 1L;
\r
246 public static void handleEvents(JSONArray a) throws QueueFullException, JSONException, IOException {
\r
247 final Queue<JSONObject> queue = getProcessingInputQueue();
\r
250 CommonStartup.metriclog.info("EVENT_PUBLISH_START");
\r
251 for (int i = 0; i < a.length(); i++) {
\r
252 if (!queue.offer(a.getJSONObject(i))) {
\r
253 throw new QueueFullException();
\r
257 log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
\r
258 CommonStartup.metriclog.info("EVENT_PUBLISH_END");
\r
259 // ecomplogger.debug(secloggerMessageEnum.SEC_COLLECT_AND_PULIBISH_SUCCESS);
\r
261 } catch (JSONException e) {
\r
267 static String readFile(String path, Charset encoding) throws IOException {
\r
268 byte[] encoded = Files.readAllBytes(Paths.get(path));
\r
269 String pwd = new String(encoded);
\r
270 return pwd.substring(0, pwd.length() - 1);
\r
273 public static String schemavalidate(String jsonData, String jsonSchema) {
\r
274 ProcessingReport report;
\r
275 String result = "false";
\r
278 // System.out.println("Applying schema: @<@<"+jsonSchema+">@>@ to
\r
279 // data: #<#<"+jsonData+">#>#");
\r
280 log.trace("Schema validation for event:" + jsonData);
\r
281 JsonNode schemaNode = JsonLoader.fromString(jsonSchema);
\r
282 JsonNode data = JsonLoader.fromString(jsonData);
\r
283 JsonSchemaFactory factory = JsonSchemaFactory.byDefault();
\r
284 JsonSchema schema = factory.getJsonSchema(schemaNode);
\r
285 report = schema.validate(data);
\r
286 } catch (JsonParseException e) {
\r
287 log.error("schemavalidate:JsonParseException for event:" + jsonData);
\r
288 return e.getMessage();
\r
289 } catch (ProcessingException e) {
\r
290 log.error("schemavalidate:Processing exception for event:" + jsonData);
\r
291 return e.getMessage();
\r
292 } catch (IOException e) {
\r
294 "schemavalidate:IO exception; something went wrong trying to read json data for event:" + jsonData);
\r
295 return e.getMessage();
\r
297 if (report != null) {
\r
298 Iterator<ProcessingMessage> iter = report.iterator();
\r
299 while (iter.hasNext()) {
\r
300 ProcessingMessage pm = iter.next();
\r
301 log.trace("Processing Message: " + pm.getMessage());
\r
303 result = String.valueOf(report.isSuccess());
\r
306 log.debug("Validation Result:" + result + " Validation report:" + report);
\r
307 } catch (NullPointerException e) {
\r
308 log.error("schemavalidate:NullpointerException on report");
\r