additional testcase 35/32035/1
authorVijay VK <vv770d@att.com>
Sun, 18 Feb 2018 06:17:21 +0000 (06:17 +0000)
committerVENKATESH KUMAR <vv770d@att.com>
Sun, 18 Feb 2018 06:18:13 +0000 (01:18 -0500)
Change-Id: Ib4fcdf3754c43f4d1996ea50b888d976e728705a
Signed-off-by: VENKATESH KUMAR <vv770d@att.com>
Issue-ID: DCAEGEN2-227

etc/DmaapConfig.json
etc/collector.properties
etc/log4j.xml
src/main/java/org/onap/dcae/restapi/endpoints/EventReceipt.java
src/test/java/org/onap/dcae/vestest/TestEventReceipt.java [new file with mode: 0644]

index fd38d09..f144b38 100644 (file)
@@ -1,20 +1,28 @@
-{
-       "channels": [
-               {
-                       "name": "ves_measurement",
-                       "cambria.topic": "unauthenticated.SEC_MEASUREMENT_OUTPUT",
-                       "class": "HpCambriaOutputStream",
-                       "stripHpId": "true",
-                       "type": "out",
-                       "cambria.hosts": "onap.dmaap.org"
-               },
-               {
-                       "name": "ves_fault",
-                       "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",
-                       "class": "HpCambriaOutputStream",
-                       "stripHpId": "true",
-                       "type": "out",
-                       "cambria.hosts": "onap.dmaap.org"
-               }
-       ]
-}
+{\r
+       "channels": [\r
+               {\r
+                       "name": "ves_measurement",\r
+                       "cambria.topic": "unauthenticated.SEC_MEASUREMENT_OUTPUT",\r
+                       "class": "HpCambriaOutputStream",\r
+                       "stripHpId": "true",\r
+                       "type": "out",\r
+                       "cambria.hosts": "onap.dmaap.org"\r
+               },\r
+               {\r
+                       "name": "ves_fault",\r
+                       "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",\r
+                       "class": "HpCambriaOutputStream",\r
+                       "stripHpId": "true",\r
+                       "type": "out",\r
+                       "cambria.hosts": "onap.dmaap.org"\r
+               },\r
+               {\r
+                       "name": "ves_heartbeat",\r
+                       "cambria.topic": "unauthenticated.SEC_FAULT_OUTPUT",\r
+                       "class": "HpCambriaOutputStream",\r
+                       "stripHpId": "true",\r
+                       "type": "out",\r
+                       "cambria.hosts": "onap.dmaap.org"\r
+               }\r
+       ]\r
+}\r
index 26c1293..251cb02 100755 (executable)
@@ -1,74 +1,74 @@
-###############################################################################
-##
-## Collector Server config
-##
-##     - Default values are shown as commented settings.
-##
-###############################################################################
-##
-## HTTP(S) service
-##
-##     Normally:
-##
-##             - 8080 is http service
-##             - https is disabled by default (-1)
-##
-##             - At this time, the server always binds to 0.0.0.0
-##
-## The default port when header.authflag is disabled (0)
-collector.service.port=8080
-
-## The secure port is required if header.authflag is set to 1 (true)
-## Authentication is only supported via secure port
-## When enabled - require valid keystore defined
-collector.service.secure.port=8443
-
-## The keystore must be setup per installation when secure port is configured
-collector.keystore.file.location=../etc/keystore
-collector.keystore.passwordfile=./etc/passwordfile
-collector.keystore.alias=tomcat
-
-
-###############################################################################
-## Processing
-##
-## If there's a problem that prevents the collector from processing alarms,
-## it's normally better to apply back pressure to the caller than to try to
-## buffer beyond a reasonable size limit. With a limit, the server won't crash
-## due to being out of memory, and the caller will get a 5xx reply saying the
-## server is in trouble.
-collector.inputQueue.maxPending=8096
-
-## Schema Validation checkflag
-## default no validation checkflag (-1)
-## If enabled (1) - schemafile location must be specified
-collector.schema.checkflag=1
-collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"}
-
-## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile  
-collector.dmaap.streamid=fault=ves_fault,ves_fault_secondary|syslog=ves_syslog,ves_syslog_secondary|heartbeat=ves_heartbeat,ves_heartbeat_secondary|measurementsForVfScaling=ves_measurement,ves_measurement_secondary|mobileFlow=ves_mobileflow,ves_mobileflow_secondary|other=ves_other,ves_other_secondary|stateChange=ves_statechange,ves_statechange_secondary|thresholdCrossingAlert=ves_thresholdCrossingAlert,ves_thresholdCrossingAlert_secondary|voiceQuality=ves_voicequality,ves_voicequality_secondary|sipSignaling=ves_sipsignaling,ves_sipsignaling_secondary
-collector.dmaapfile=./etc/DmaapConfig.json
-
-## Custom ExceptionConfiguration
-exceptionConfig=./etc/ExceptionConfig.json
-
-## authflag control authentication by the collector
-## If enabled (1) - then authlist has to be defined
-## When authflag is enabled, only secure port will be supported
-## To disable enter 0
-header.authflag=1
-## Combination of userid,base64 encoded pwd list to be supported
-## userid and pwd comma separated; pipe delimitation between each pair
-header.authlist=sample1,c2FtcGxlMQ==
-
-## Event transformation Flag - when set expects configurable transformation
-## defined under ./etc/eventTransform.json
-## Enabled by default; to disable set to 0
-event.transform.flag=1
-
-###############################################################################
-##
-## Tomcat control
-##
-#tomcat.maxthreads=(tomcat default, which is usually 200)
-
+###############################################################################\r
+##\r
+## Collector Server config\r
+##\r
+##     - Default values are shown as commented settings.\r
+##\r
+###############################################################################\r
+##\r
+## HTTP(S) service\r
+##\r
+##     Normally:\r
+##\r
+##             - 8080 is http service\r
+##             - https is disabled by default (-1)\r
+##\r
+##             - At this time, the server always binds to 0.0.0.0\r
+##\r
+## The default port when header.authflag is disabled (0)\r
+collector.service.port=8080\r
+\r
+## The secure port is required if header.authflag is set to 1 (true)\r
+## Authentication is only supported via secure port\r
+## When enabled - require valid keystore defined\r
+#ccollector.service.secure.port=8443\r
+\r
+## The keystore must be setup per installation when secure port is configured\r
+collector.keystore.file.location=../etc/keystore\r
+collector.keystore.passwordfile=./etc/passwordfile\r
+collector.keystore.alias=tomcat\r
+\r
+\r
+###############################################################################\r
+## Processing\r
+##\r
+## If there's a problem that prevents the collector from processing alarms,\r
+## it's normally better to apply back pressure to the caller than to try to\r
+## buffer beyond a reasonable size limit. With a limit, the server won't crash\r
+## due to being out of memory, and the caller will get a 5xx reply saying the\r
+## server is in trouble.\r
+collector.inputQueue.maxPending=8096\r
+\r
+## Schema Validation checkflag\r
+## default no validation checkflag (-1)\r
+## If enabled (1) - schemafile location must be specified\r
+collector.schema.checkflag=1\r
+collector.schema.file={\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"}\r
+\r
+## List all streamid per domain to be supported. The streamid should match to channel name on dmaapfile  \r
+collector.dmaap.streamid=fault=ves_fault,ves_fault_secondary|syslog=ves_syslog,ves_syslog_secondary|heartbeat=ves_heartbeat,ves_heartbeat_secondary|measurementsForVfScaling=ves_measurement,ves_measurement_secondary|mobileFlow=ves_mobileflow,ves_mobileflow_secondary|other=ves_other,ves_other_secondary|stateChange=ves_statechange,ves_statechange_secondary|thresholdCrossingAlert=ves_thresholdCrossingAlert,ves_thresholdCrossingAlert_secondary|voiceQuality=ves_voicequality,ves_voicequality_secondary|sipSignaling=ves_sipsignaling,ves_sipsignaling_secondary\r
+collector.dmaapfile=./etc/DmaapConfig.json\r
+\r
+## Custom ExceptionConfiguration\r
+exceptionConfig=./etc/ExceptionConfig.json\r
+\r
+## authflag control authentication by the collector\r
+## If enabled (1) - then authlist has to be defined\r
+## When authflag is enabled, only secure port will be supported\r
+## To disable enter 0\r
+header.authflag=0\r
+## Combination of userid,base64 encoded pwd list to be supported\r
+## userid and pwd comma separated; pipe delimitation between each pair\r
+header.authlist=sample1,c2FtcGxlMQ==\r
+\r
+## Event transformation Flag - when set expects configurable transformation\r
+## defined under ./etc/eventTransform.json\r
+## Enabled by default; to disable set to 0\r
+event.transform.flag=1\r
+\r
+###############################################################################\r
+##\r
+## Tomcat control\r
+##\r
+#tomcat.maxthreads=(tomcat default, which is usually 200)\r
+\r
index dc5fe10..baf0355 100644 (file)
@@ -1,3 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
 <!--
 ================================================================================
 Copyright (c) 2017-2018 AT&T Intellectual Property. All rights reserved.
@@ -17,8 +20,7 @@ limitations under the License.
 
 ECOMP is a trademark and service mark of AT&T Intellectual Property.
 -->
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
 
 <log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
 
index 8d2ecaa..f45f60c 100644 (file)
-/*-
- * ============LICENSE_START=======================================================
- * PROJECT
- * ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
- * ================================================================================
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ============LICENSE_END=========================================================
- */
-
-package org.onap.dcae.restapi.endpoints;
-
-import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;
-import com.att.nsa.clock.SaClock;
-import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;
-import com.att.nsa.drumlin.service.standards.HttpStatusCodes;
-import com.att.nsa.drumlin.service.standards.MimeTypes;
-import com.att.nsa.logging.LoggingContext;
-import com.att.nsa.logging.log4j.EcompFields;
-import com.att.nsa.security.db.simple.NsaSimpleApiKey;
-import com.google.gson.JsonParser;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
-import org.onap.dcae.commonFunction.CommonStartup;
-import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;
-import org.onap.dcae.commonFunction.CustomExceptionLoader;
-import org.onap.dcae.commonFunction.VESLogger;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class EventReceipt extends NsaBaseEndpoint {
-
-       private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);
-       private static final String MESSAGE = " Message:";
-       static String valresult;
-       static JSONObject customerror;
-
-       public static void receiveVESEvent(DrumlinRequestContext ctx) {
-               // the request body carries events. assume for now it's an array
-               // of json objects that fits in memory. (See cambria's parsing for
-               // handling large messages)
-
-               NsaSimpleApiKey retkey = null;
-
-               JSONArray jsonArray;
-               JSONArray jsonArrayMod = new JSONArray();
-               JSONObject event;
-               JSONObject jsonObject;
-               FileReader fr = null;
-               InputStream istr = null;
-               int arrayFlag = 0;
-               String vesVersion = null;
-
-               try {
-                       // System.out.print("Version string:" + version);
-
-                       // String br = new BufferedReader(new
-                       // InputStreamReader(ctx.request().getBodyStream())).readLine();
-                       // JsonElement msg = new JsonParser().parse(new BufferedReader(new
-                       // InputStreamReader(ctx.request().getBodyStream())).readLine());
-                       // jsonArray = new JSONArray ( new JSONTokener (
-                       // ctx.request().getBodyStream () ) );
-
-                       log.debug("Request recieved :" + ctx.request().getRemoteAddress());
-                       istr = ctx.request().getBodyStream();
-                       jsonObject = new JSONObject(new JSONTokener(istr));
-
-                       log.info("ctx getPathInContext: " + ctx.request().getPathInContext());
-                       Pattern p = Pattern.compile("(v\\d+)");
-                       Matcher m = p.matcher(ctx.request().getPathInContext());
-
-                       if (m.find()) {
-                               log.info("VES version:" + m.group());
-                               vesVersion = m.group();
-                               m = null;
-                               p = null;
-
-                       }
-                       
-                       final UUID uuid = UUID.randomUUID();
-                       LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
-                       localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
-                       
-                       if (ctx.request().getPathInContext().contains("eventBatch")) {
-                               CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid
-                                               + " VES Batch Input Messsage: " + jsonObject);
-                               log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "
-                                               + jsonObject);
-                               arrayFlag = 1;
-                       } else {
-                               CommonStartup.inlog.info(
-                                               ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
-                               log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);
-
-                       }
-
-                       try {
-                               if (CommonStartup.authflag == 1) {
-                                       retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);
-                               }
-                       } catch (NullPointerException x) {
-                               log.info("Invalid user request " + ctx.request().getContentType() + MESSAGE + jsonObject);
-                               CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);
-                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");
-                               return;
-                       }
-
-                       if (retkey != null || CommonStartup.authflag == 0) {
-                               if (CommonStartup.schemaValidatorflag > 0) {
-                                       if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))
-                                                       || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {
-                                               fr = new FileReader(schemaFileVersion(vesVersion));
-                                               String schema = new JsonParser().parse(fr).toString();
-
-                                               valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);
-                                               if (valresult.equals("true")) {
-                                                       log.info("Validation successful");
-                                               } else if (valresult.equals("false")) {
-                                                       log.info("Validation failed");
-                                                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
-                                                                       "Schema validation failed");
-                                                       return;
-                                               } else {
-                                                       log.error("Validation errored" + valresult);
-                                                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
-                                                                       "Couldn't parse JSON object");
-                                                       return;
-                                               }
-                                       } else {
-                                               log.info("Validation failed");
-                                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");
-                                               return;
-                                       }
-                                       if (arrayFlag == 1) {
-                                               jsonArray = jsonObject.getJSONArray("eventList");
-                                               log.info("Validation successful for all events in batch");
-                                               for (int i = 0; i < jsonArray.length(); i++) {
-                                                       event = new JSONObject().put("event", jsonArray.getJSONObject(i));
-                                                       event.put("VESuniqueId", uuid + "-" + i);
-                                                       event.put("VESversion", vesVersion);
-                                                       jsonArrayMod.put(event);
-                                               }
-                                               log.info("Modified jsonarray:" + jsonArrayMod.toString());
-                                       } else {
-                                               jsonObject.put("VESuniqueId", uuid);
-                                               jsonObject.put("VESversion", vesVersion);
-                                               jsonArrayMod = new JSONArray().put(jsonObject);
-                                       }
-                               }
-
-                               // reject anything that's not JSON
-                               if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {
-                                       log.info(String.format("Rejecting request with content type %s Message:%s",
-                                                       ctx.request().getContentType(), jsonObject));
-                                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,
-                                                       "Incorrect message content-type; only accepts application/json messages");
-                                       return;
-                               }
-
-                               CommonStartup.handleEvents(jsonArrayMod);
-                       } else {
-                               log.info(String.format("Unauthorized request %s%s%s", ctx.request().getContentType(), MESSAGE,
-                                               jsonObject));
-                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");
-                               return;
-                       }
-               } catch (JSONException | NullPointerException | IOException x) {
-                       log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",
-                                       HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));
-                       CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);
-                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");
-                       return;
-               } catch (QueueFullException e) {
-                       log.error("Collector internal queue full  :" + e.getMessage(), e);
-                       CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);
-                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");
-                       return;
-               } finally {
-                       if (fr != null) {
-                               safeClose(fr);
-                       }
-
-                       if (istr != null) {
-                               safeClose(istr);
-                       }
-               }
-               log.info("MessageAccepted and k200_ok to be sent");
-               ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);
-       }
-
-       public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {
-               String[] str;
-               String exceptionType = "GeneralException";
-
-               str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);
-               log.info("Post CustomExceptionLoader.LookupMap" + str);
-
-               if (str != null) {
-
-                       if (str[0].matches("SVC")) {
-                               exceptionType = "ServiceException";
-                       } else if (str[1].matches("POL")) {
-                               exceptionType = "PolicyException";
-                       }
-
-                       JSONObject jb = new JSONObject().put("requestError",
-                                       new JSONObject().put(exceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1])));
-
-                       log.debug("Constructed json error : " + jb);
-                       ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
-               } else {
-                       JSONObject jb = new JSONObject().put("requestError",
-                                       new JSONObject().put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));
-                       ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);
-               }
-
-       }
-
-       public static void safeClose(FileReader fr) {
-               if (fr != null) {
-                       try {
-                               fr.close();
-                       } catch (IOException e) {
-                               log.error("Error closing file reader stream : " + e);
-                       }
-               }
-
-       }
-
-       public static void safeClose(InputStream is) {
-               if (is != null) {
-                       try {
-                               is.close();
-                       } catch (IOException e) {
-                               log.error("Error closing Input stream : " + e);
-                       }
-               }
-
-       }
-
-       public static String schemaFileVersion(String version) {
-               String filename = null;
-
-               if (CommonStartup.schemaFileJson.has(version)) {
-                       filename = CommonStartup.schemaFileJson.getString(version);
-               } else {
-                       filename = CommonStartup.schemaFileJson.getString("v5");
-
-               }
-               log.info(String.format("VESversion: %s Schema File:%s", version, filename));
-               return filename;
-
-       }
-
-}
-
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * PROJECT\r
+ * ================================================================================\r
+ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+\r
+package org.onap.dcae.restapi.endpoints;\r
+\r
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;\r
+import com.att.nsa.clock.SaClock;\r
+import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;\r
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;\r
+import com.att.nsa.drumlin.service.standards.MimeTypes;\r
+import com.att.nsa.logging.LoggingContext;\r
+import com.att.nsa.logging.log4j.EcompFields;\r
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;\r
+import com.google.gson.JsonParser;\r
+import org.json.JSONArray;\r
+import org.json.JSONException;\r
+import org.json.JSONObject;\r
+import org.json.JSONTokener;\r
+import org.onap.dcae.commonFunction.CommonStartup;\r
+import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;\r
+import org.onap.dcae.commonFunction.CustomExceptionLoader;\r
+import org.onap.dcae.commonFunction.VESLogger;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+\r
+import java.io.FileReader;\r
+import java.io.IOException;\r
+import java.io.InputStream;\r
+import java.util.UUID;\r
+import java.util.regex.Matcher;\r
+import java.util.regex.Pattern;\r
+\r
+public class EventReceipt extends NsaBaseEndpoint {\r
+\r
+       private static final Logger log = LoggerFactory.getLogger(EventReceipt.class);\r
+       private static final String MESSAGE = " Message:";\r
+       static String valresult;\r
+       static JSONObject customerror;\r
+\r
+       public static void receiveVESEvent(DrumlinRequestContext ctx) {\r
+               // the request body carries events. assume for now it's an array\r
+               // of json objects that fits in memory. (See cambria's parsing for\r
+               // handling large messages)\r
+\r
+               NsaSimpleApiKey retkey = null;\r
+\r
+\r
+               JSONObject jsonObject;\r
+               FileReader fr = null;\r
+               InputStream istr = null;\r
+               int arrayFlag = 0;\r
+               String vesVersion = null;\r
+\r
+               try {\r
+\r
+\r
+                       log.debug("Request recieved :" + ctx.request().getRemoteAddress());\r
+                       istr = ctx.request().getBodyStream();\r
+                       jsonObject = new JSONObject(new JSONTokener(istr));\r
+\r
+                       log.info("ctx getPathInContext: " + ctx.request().getPathInContext());\r
+                       Pattern p = Pattern.compile("(v\\d+)");\r
+                       Matcher m = p.matcher(ctx.request().getPathInContext());\r
+\r
+                       if (m.find()) {\r
+                               log.info("VES version:" + m.group());\r
+                               vesVersion = m.group();\r
+                               m = null;\r
+                               p = null;\r
+\r
+                       }\r
+                       \r
+                       final UUID uuid = UUID.randomUUID();\r
+                       LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);\r
+                       localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());\r
+                       \r
+                       if (ctx.request().getPathInContext().contains("eventBatch")) {\r
+                               CommonStartup.inlog.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid\r
+                                               + " VES Batch Input Messsage: " + jsonObject);\r
+                               log.info(ctx.request().getRemoteAddress() + "VESUniqueID-Prefix:" + uuid + " VES Batch Input Messsage: "\r
+                                               + jsonObject);\r
+                               arrayFlag = 1;\r
+                       } else {\r
+                               CommonStartup.inlog.info(\r
+                                               ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);\r
+                               log.info(ctx.request().getRemoteAddress() + "VESUniqueID:" + uuid + " Input Messsage: " + jsonObject);\r
+\r
+                       }\r
+\r
+                       try {\r
+                               if (CommonStartup.authflag == 1) {\r
+                                       retkey = NsaBaseEndpoint.getAuthenticatedUser(ctx);\r
+                               }\r
+                       } catch (NullPointerException x) {\r
+                               log.info("Invalid user request " + ctx.request().getContentType() + MESSAGE + jsonObject);\r
+                               CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Unauthorized user" + x);\r
+                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Invalid user");\r
+                               return;\r
+                       }\r
+                       \r
+                       schemaCheck( retkey,  arrayFlag, jsonObject,  vesVersion,  ctx,  uuid);\r
+\r
+               } catch (JSONException | NullPointerException | IOException x) {\r
+                       log.error(String.format("Couldn't parse JSON Array - HttpStatusCodes.k400_badRequest%d%s%s",\r
+                                       HttpStatusCodes.k400_badRequest, MESSAGE, x.getMessage()));\r
+                       CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: Invalid user request " + x);\r
+                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Couldn't parse JSON object");\r
+                       return;\r
+               } catch (QueueFullException e) {\r
+                       log.error("Collector internal queue full  :" + e.getMessage(), e);\r
+                       CommonStartup.eplog.info("EVENT_RECEIPT_FAILURE: QueueFull" + e);\r
+                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k503_serviceUnavailable, "Queue full");\r
+                       return;\r
+               } finally {\r
+                       if (fr != null) {\r
+                               safeClose(fr);\r
+                       }\r
+\r
+                       if (istr != null) {\r
+                               safeClose(istr);\r
+                       }\r
+               }\r
+               log.info("MessageAccepted and k200_ok to be sent");\r
+               ctx.response().sendErrorAndBody(HttpStatusCodes.k200_ok, "Message Accepted", MimeTypes.kAppJson);\r
+       }\r
+       \r
+       public static void schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,JSONObject jsonObject, String vesVersion,  DrumlinRequestContext ctx, UUID uuid) throws JSONException, QueueFullException, IOException\r
+       {\r
+               JSONArray jsonArray;\r
+               JSONArray jsonArrayMod = new JSONArray();\r
+               JSONObject event;\r
+               FileReader fr;\r
+               if (retkey != null || CommonStartup.authflag == 0) {\r
+                       if (CommonStartup.schemaValidatorflag > 0) {\r
+                               if ((arrayFlag == 1) && (jsonObject.has("eventList") && (!jsonObject.has("event")))\r
+                                               || ((arrayFlag == 0) && (!jsonObject.has("eventList") && (jsonObject.has("event"))))) {\r
+                                       fr = new FileReader(schemaFileVersion(vesVersion));\r
+                                       String schema = new JsonParser().parse(fr).toString();\r
+\r
+                                       valresult = CommonStartup.schemavalidate(jsonObject.toString(), schema);\r
+                                       if (valresult.equals("true")) {\r
+                                               log.info("Validation successful");\r
+                                       } else if (valresult.equals("false")) {\r
+                                               log.info("Validation failed");\r
+                                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,\r
+                                                               "Schema validation failed");\r
+                                               return;\r
+                                       } else {\r
+                                               log.error("Validation errored" + valresult);\r
+                                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,\r
+                                                               "Couldn't parse JSON object");\r
+                                               return;\r
+                                       }\r
+                               } else {\r
+                                       log.info("Validation failed");\r
+                                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest, "Schema validation failed");\r
+                                       return;\r
+                               }\r
+                               if (arrayFlag == 1) {\r
+                                       jsonArray = jsonObject.getJSONArray("eventList");\r
+                                       log.info("Validation successful for all events in batch");\r
+                                       for (int i = 0; i < jsonArray.length(); i++) {\r
+                                               event = new JSONObject().put("event", jsonArray.getJSONObject(i));\r
+                                               event.put("VESuniqueId", uuid + "-" + i);\r
+                                               event.put("VESversion", vesVersion);\r
+                                               jsonArrayMod.put(event);\r
+                                       }\r
+                                       log.info("Modified jsonarray:" + jsonArrayMod.toString());\r
+                               } else {\r
+                                       jsonObject.put("VESuniqueId", uuid);\r
+                                       jsonObject.put("VESversion", vesVersion);\r
+                                       jsonArrayMod = new JSONArray().put(jsonObject);\r
+                               }\r
+                       }\r
+\r
+                       // reject anything that's not JSON\r
+                       if (!ctx.request().getContentType().equalsIgnoreCase("application/json")) {\r
+                               log.info(String.format("Rejecting request with content type %s Message:%s",\r
+                                               ctx.request().getContentType(), jsonObject));\r
+                               respondWithCustomMsginJson(ctx, HttpStatusCodes.k400_badRequest,\r
+                                               "Incorrect message content-type; only accepts application/json messages");\r
+                               return;\r
+                       }\r
+\r
+                       CommonStartup.handleEvents(jsonArrayMod);\r
+               } else {\r
+                       log.info(String.format("Unauthorized request %s%s%s", ctx.request().getContentType(), MESSAGE,\r
+                                       jsonObject));\r
+                       respondWithCustomMsginJson(ctx, HttpStatusCodes.k401_unauthorized, "Unauthorized user");\r
+                       return;\r
+               }\r
+       }\r
+\r
+       public static void respondWithCustomMsginJson(DrumlinRequestContext ctx, int sc, String msg) {\r
+               String[] str;\r
+               String exceptionType = "GeneralException";\r
+\r
+               str = CustomExceptionLoader.LookupMap(String.valueOf(sc), msg);\r
+               log.info("Post CustomExceptionLoader.LookupMap" + str);\r
+\r
+               if (str != null) {\r
+\r
+                       if (str[0].matches("SVC")) {\r
+                               exceptionType = "ServiceException";\r
+                       } else if (str[1].matches("POL")) {\r
+                               exceptionType = "PolicyException";\r
+                       }\r
+\r
+                       JSONObject jb = new JSONObject().put("requestError",\r
+                                       new JSONObject().put(exceptionType, new JSONObject().put("MessagID", str[0]).put("text", str[1])));\r
+\r
+                       log.debug("Constructed json error : " + jb);\r
+                       ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);\r
+               } else {\r
+                       JSONObject jb = new JSONObject().put("requestError",\r
+                                       new JSONObject().put(exceptionType, new JSONObject().put("Status", sc).put("Error", msg)));\r
+                       ctx.response().sendErrorAndBody(sc, jb.toString(), MimeTypes.kAppJson);\r
+               }\r
+\r
+       }\r
+\r
+       public static void safeClose(FileReader fr) {\r
+               if (fr != null) {\r
+                       try {\r
+                               fr.close();\r
+                       } catch (IOException e) {\r
+                               log.error("Error closing file reader stream : " + e);\r
+                       }\r
+               }\r
+\r
+       }\r
+\r
+       public static void safeClose(InputStream is) {\r
+               if (is != null) {\r
+                       try {\r
+                               is.close();\r
+                       } catch (IOException e) {\r
+                               log.error("Error closing Input stream : " + e);\r
+                       }\r
+               }\r
+\r
+       }\r
+\r
+       public static String schemaFileVersion(String version) {\r
+               String filename = null;\r
+\r
+               if (CommonStartup.schemaFileJson.has(version)) {\r
+                       filename = CommonStartup.schemaFileJson.getString(version);\r
+               } else {\r
+                       filename = CommonStartup.schemaFileJson.getString("v5");\r
+\r
+               }\r
+               log.info(String.format("VESversion: %s Schema File:%s", version, filename));\r
+               return filename;\r
+\r
+       }\r
+\r
+}\r
+\r
diff --git a/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java b/src/test/java/org/onap/dcae/vestest/TestEventReceipt.java
new file mode 100644 (file)
index 0000000..668c718
--- /dev/null
@@ -0,0 +1,123 @@
+/*-\r
+ * ============LICENSE_START=======================================================\r
+ * PROJECT\r
+ * ================================================================================\r
+ * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.\r
+ * ================================================================================\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ * \r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ * \r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ============LICENSE_END=========================================================\r
+ */\r
+package org.onap.dcae.vestest;\r
+\r
+import static org.junit.Assert.assertEquals;\r
+\r
+\r
+import java.io.IOException;\r
+import java.util.UUID;\r
+\r
+import org.json.JSONException;\r
+import org.json.JSONObject;\r
+import org.junit.After;\r
+import org.junit.Before;\r
+import org.junit.Test;\r
+import org.onap.dcae.commonFunction.CommonStartup;\r
+import org.onap.dcae.commonFunction.CommonStartup.QueueFullException;\r
+import org.onap.dcae.commonFunction.CustomExceptionLoader;\r
+import org.onap.dcae.restapi.endpoints.EventReceipt;\r
+\r
+import com.att.nsa.apiServer.endpoints.NsaBaseEndpoint;\r
+import com.att.nsa.drumlin.service.framework.context.DrumlinRequestContext;\r
+import com.att.nsa.drumlin.service.standards.HttpStatusCodes;\r
+import com.att.nsa.security.db.simple.NsaSimpleApiKey;\r
+\r
+import jline.internal.Log;\r
+\r
+public class TestEventReceipt extends NsaBaseEndpoint {\r
+\r
+       DrumlinRequestContext ctx;\r
+       JSONObject jsonObject;\r
+       Boolean flag = false;\r
+       String ev = "{\"event\": {\"commonEventHeader\": {      \"reportingEntityName\": \"VM name will be provided by ECOMP\", \"startEpochMicrosec\": 1477012779802988,\"lastEpochMicrosec\": 1477012789802988,\"eventId\": \"83\",\"sourceName\": \"Dummy VM name - No Metadata available\",\"sequence\": 83,\"priority\": \"Normal\",\"functionalRole\": \"vFirewall\",\"domain\": \"measurementsForVfScaling\",\"reportingEntityId\": \"VM UUID will be provided by ECOMP\",\"sourceId\": \"Dummy VM UUID - No Metadata available\",\"version\": 1.1},\"measurementsForVfScalingFields\": {\"measurementInterval\": 10,\"measurementsForVfScalingVersion\": 1.1,\"vNicUsageArray\": [{\"multicastPacketsIn\": 0,\"bytesIn\": 3896,\"unicastPacketsIn\": 0, \"multicastPacketsOut\": 0,\"broadcastPacketsOut\": 0,          \"packetsOut\": 28,\"bytesOut\": 12178,\"broadcastPacketsIn\": 0,\"packetsIn\": 58,\"unicastPacketsOut\": 0,\"vNicIdentifier\": \"eth0\"}]}}}";\r
+       \r
+\r
+       @Before\r
+       public void setUp() throws Exception {\r
+               \r
+\r
+       }\r
+\r
+       @After\r
+       public void tearDown() throws Exception {\r
+       }\r
+\r
+       @Test\r
+       public void testschemaFileVersion() {\r
+\r
+               String filename = null;\r
+               CommonStartup.schemaFileJson = new JSONObject(\r
+                               "{\"v1\":\"./etc/CommonEventFormat_27.2.json\",\"v2\":\"./etc/CommonEventFormat_27.2.json\",\"v3\":\"./etc/CommonEventFormat_27.2.json\",\"v4\":\"./etc/CommonEventFormat_27.2.json\",\"v5\":\"./etc/CommonEventFormat_28.4.1.json\"}");\r
+               filename = EventReceipt.schemaFileVersion("v5");\r
+\r
+               if (!filename.isEmpty()) {\r
+                       flag = true;\r
+               }\r
+               assertEquals(true, flag);\r
+       }\r
+\r
+       @Test\r
+       public void testrespondWithCustomMsginJson() {\r
+\r
+               CommonStartup.exceptionConfig = "./etc/ExceptionConfig.json";\r
+        CustomExceptionLoader.LoadMap();\r
+        \r
+        try {\r
+               EventReceipt.respondWithCustomMsginJson(null, HttpStatusCodes.k401_unauthorized, "Unauthorized user");\r
+        }\r
+        catch (Exception e)\r
+        {\r
+               //As context object is null, handling null pointer exception.\r
+               Log.debug("Response object creation failure");\r
+        }\r
+               assertEquals(true, true);\r
+       }\r
+\r
+       @Test\r
+       public void testschemaCheck() {\r
+\r
+               // schemaCheck(NsaSimpleApiKey retkey, int arrayFlag,JSONObject\r
+               // jsonObject, String vesVersion, FileReader fr, DrumlinRequestContext\r
+               // ctx, UUID uuid) throws JSONException, QueueFullException, IOException\r
+               NsaSimpleApiKey retkey = null;\r
+               int arrayFlag = 0;\r
+               \r
+               CommonStartup.authflag = 0;\r
+               CommonStartup.schemaValidatorflag = 1;\r
+\r
+               jsonObject = new org.json.JSONObject(ev);\r
+\r
+               String vesVersion = "v1";\r
+\r
+               DrumlinRequestContext ctx = null;\r
+\r
+               \r
+               UUID uuid = UUID.randomUUID();\r
+\r
+               try {\r
+                       EventReceipt.schemaCheck(retkey, arrayFlag, jsonObject, vesVersion, ctx, uuid);\r
+               } catch (NullPointerException |JSONException | QueueFullException | IOException e) {\r
+                       \r
+                       Log.debug("Response object creation failure");\r
+               }\r
+               assertEquals(true, true);\r
+       }\r
+}\r