2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright (C) 2018 Nokia. All rights reserved.s
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.dcae.restapi;
24 import static java.util.Optional.ofNullable;
25 import static java.util.stream.StreamSupport.stream;
26 import static org.springframework.http.ResponseEntity.ok;
28 import com.att.nsa.clock.SaClock;
29 import com.att.nsa.logging.LoggingContext;
30 import com.att.nsa.logging.log4j.EcompFields;
31 import com.github.fge.jackson.JsonLoader;
32 import com.github.fge.jsonschema.core.report.ProcessingReport;
33 import com.github.fge.jsonschema.main.JsonSchema;
35 import java.util.UUID;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import javax.servlet.http.HttpServletRequest;
39 import org.json.JSONArray;
40 import org.json.JSONObject;
41 import org.onap.dcae.ApplicationSettings;
42 import org.onap.dcae.CollectorSchemas;
43 import org.onap.dcae.commonFunction.VESLogger;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.beans.factory.annotation.Qualifier;
48 import org.springframework.http.MediaType;
49 import org.springframework.http.ResponseEntity;
50 import org.springframework.web.bind.annotation.GetMapping;
51 import org.springframework.web.bind.annotation.PostMapping;
52 import org.springframework.web.bind.annotation.RequestBody;
53 import org.springframework.web.bind.annotation.RestController;
56 public class VesRestController {
58 private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
60 private static final String FALLBACK_VES_VERSION = "v5";
62 @Autowired private ApplicationSettings collectorProperties;
64 @Autowired private CollectorSchemas schemas;
67 @Qualifier("metriclog")
68 private Logger metriclog;
71 @Qualifier("incomingRequestsLogger")
72 private Logger incomingRequestsLogger;
75 @Qualifier("errorLog")
76 private Logger errorLog;
78 private LinkedBlockingQueue<JSONObject> inputQueue;
79 private String version;
83 @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
84 @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
85 this.incomingRequestsLogger = incomingRequestsLogger;
86 this.inputQueue = inputQueue;
91 return "Welcome to VESCollector";
94 // refactor in next iteration
98 "/eventListener/v1/eventBatch",
100 "/eventListener/v2/eventBatch",
102 "/eventListener/v3/eventBatch",
104 "/eventListener/v4/eventBatch",
106 "/eventListener/v5/eventBatch",
108 "/eventListener/v7/eventBatch"
110 consumes = "application/json")
111 ResponseEntity<String> receiveEvent(
112 @RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
113 String request = httpServletRequest.getRequestURI();
114 extractVersion(request);
116 JSONObject jsonObject;
118 jsonObject = new JSONObject(jsonPayload);
119 } catch (Exception e) {
120 return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
123 String uuid = setUpECOMPLoggingForRequest();
124 incomingRequestsLogger.info(
126 "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
127 jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
129 if (collectorProperties.jsonSchemaValidationEnabled()) {
130 if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
131 if (!conformsToSchema(jsonObject, version)) {
132 return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
134 } else if (!isBatchRequest(request)
135 && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
136 if (!conformsToSchema(jsonObject, version)) {
137 return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
140 return errorResponse(ApiException.INVALID_JSON_INPUT);
144 JSONArray commonlyFormatted =
145 convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
147 if (!putEventsOnProcessingQueue(commonlyFormatted)) {
148 errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
149 return errorResponse(ApiException.NO_SERVER_RESOURCES);
151 // HttpStatus.SC_NO_CONTENT
152 return org.springframework.http.ResponseEntity.accepted()
153 .contentType(MediaType.APPLICATION_JSON)
157 private void extractVersion(String httpServletRequest) {
158 version = httpServletRequest.split("/")[2];
161 private ResponseEntity<String> errorResponse(ApiException noServerResources) {
162 return ResponseEntity.status(noServerResources.httpStatusCode)
163 .body(noServerResources.toJSON().toString());
166 private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
167 for (int i = 0; i < arrayOfEvents.length(); i++) {
168 metriclog.info("EVENT_PUBLISH_START");
169 if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
173 LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
174 metriclog.info("EVENT_PUBLISH_END");
178 private boolean conformsToSchema(JSONObject payload, String version) {
181 ofNullable(schemas.getJSONSchemasMap(version).get(version))
182 .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
183 ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
184 if (!report.isSuccess()) {
185 LOG.warn("Schema validation failed for event: " + payload);
186 stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
189 return report.isSuccess();
190 } catch (Exception e) {
191 throw new RuntimeException("Unable to validate against schema", e);
195 private static JSONArray convertToJSONArrayCommonFormat(
196 JSONObject jsonObject, String request, String uuid, String version) {
197 JSONArray asArrayEvents = new JSONArray();
198 String vesUniqueIdKey = "VESuniqueId";
199 String vesVersionKey = "VESversion";
200 if (isBatchRequest(request)) {
201 JSONArray events = jsonObject.getJSONArray("eventList");
202 for (int i = 0; i < events.length(); i++) {
203 JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
204 event.put(vesUniqueIdKey, uuid + "-" + i);
205 event.put(vesVersionKey, version);
206 asArrayEvents.put(event);
209 jsonObject.put(vesUniqueIdKey, uuid);
210 jsonObject.put(vesVersionKey, version);
211 asArrayEvents = new JSONArray().put(jsonObject);
213 return asArrayEvents;
216 private static String setUpECOMPLoggingForRequest() {
217 final UUID uuid = UUID.randomUUID();
218 LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
219 localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
220 return uuid.toString();
223 private static boolean isBatchRequest(String request) {
224 return request.contains("eventBatch");