VES 7.0.1 updates
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / restapi / VesRestController.java
1 /*
2  * ============LICENSE_START=======================================================
3  * PROJECT
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.s
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcae.restapi;
23
24 import static java.util.Optional.ofNullable;
25 import static java.util.stream.StreamSupport.stream;
26 import static org.springframework.http.ResponseEntity.ok;
27
28 import com.att.nsa.clock.SaClock;
29 import com.att.nsa.logging.LoggingContext;
30 import com.att.nsa.logging.log4j.EcompFields;
31 import com.github.fge.jackson.JsonLoader;
32 import com.github.fge.jsonschema.core.report.ProcessingReport;
33 import com.github.fge.jsonschema.main.JsonSchema;
34
35 import java.util.UUID;
36 import java.util.concurrent.LinkedBlockingQueue;
37 import javax.servlet.http.HttpServletRequest;
38
39 import org.json.JSONArray;
40 import org.json.JSONObject;
41 import org.onap.dcae.ApplicationSettings;
42 import org.onap.dcae.CollectorSchemas;
43 import org.onap.dcae.commonFunction.VESLogger;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.beans.factory.annotation.Qualifier;
48 import org.springframework.http.MediaType;
49 import org.springframework.http.ResponseEntity;
50 import org.springframework.web.bind.annotation.GetMapping;
51 import org.springframework.web.bind.annotation.PostMapping;
52 import org.springframework.web.bind.annotation.RequestBody;
53 import org.springframework.web.bind.annotation.RestController;
54
55 @RestController
56 public class VesRestController {
57
58   private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
59
60   private static final String FALLBACK_VES_VERSION = "v5";
61
62   @Autowired private ApplicationSettings collectorProperties;
63
64   @Autowired private CollectorSchemas schemas;
65
66   @Autowired
67   @Qualifier("metriclog")
68   private Logger metriclog;
69
70   @Autowired
71   @Qualifier("incomingRequestsLogger")
72   private Logger incomingRequestsLogger;
73
74   @Autowired
75   @Qualifier("errorLog")
76   private Logger errorLog;
77
78   private LinkedBlockingQueue<JSONObject> inputQueue;
79   private String version;
80
81   @Autowired
82   VesRestController(
83       @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
84       @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
85     this.incomingRequestsLogger = incomingRequestsLogger;
86     this.inputQueue = inputQueue;
87   }
88
89   @GetMapping("/")
90   String mainPage() {
91     return "Welcome to VESCollector";
92   }
93
94   // refactor in next iteration
95   @PostMapping(
96       value = {
97         "/eventListener/v1",
98         "/eventListener/v1/eventBatch",
99         "/eventListener/v2",
100         "/eventListener/v2/eventBatch",
101         "/eventListener/v3",
102         "/eventListener/v3/eventBatch",
103         "/eventListener/v4",
104         "/eventListener/v4/eventBatch",
105         "/eventListener/v5",
106         "/eventListener/v5/eventBatch",
107         "/eventListener/v7",
108         "/eventListener/v7/eventBatch"
109       },
110       consumes = "application/json")
111   ResponseEntity<String> receiveEvent(
112       @RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
113     String request = httpServletRequest.getRequestURI();
114     extractVersion(request);
115
116     JSONObject jsonObject;
117     try {
118       jsonObject = new JSONObject(jsonPayload);
119     } catch (Exception e) {
120       return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
121     }
122
123     String uuid = setUpECOMPLoggingForRequest();
124     incomingRequestsLogger.info(
125         String.format(
126             "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
127             jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
128
129     if (collectorProperties.jsonSchemaValidationEnabled()) {
130       if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
131         if (!conformsToSchema(jsonObject, version)) {
132           return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
133         }
134       } else if (!isBatchRequest(request)
135           && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
136         if (!conformsToSchema(jsonObject, version)) {
137           return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
138         }
139       } else {
140         return errorResponse(ApiException.INVALID_JSON_INPUT);
141       }
142     }
143
144     JSONArray commonlyFormatted =
145         convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
146
147     if (!putEventsOnProcessingQueue(commonlyFormatted)) {
148       errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
149       return errorResponse(ApiException.NO_SERVER_RESOURCES);
150     }
151     // HttpStatus.SC_NO_CONTENT
152     return org.springframework.http.ResponseEntity.accepted()
153         .contentType(MediaType.APPLICATION_JSON)
154         .body("Accepted");
155   }
156
157   private void extractVersion(String httpServletRequest) {
158     version = httpServletRequest.split("/")[2];
159   }
160
161   private ResponseEntity<String> errorResponse(ApiException noServerResources) {
162     return ResponseEntity.status(noServerResources.httpStatusCode)
163         .body(noServerResources.toJSON().toString());
164   }
165
166   private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
167     for (int i = 0; i < arrayOfEvents.length(); i++) {
168       metriclog.info("EVENT_PUBLISH_START");
169       if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
170         return false;
171       }
172     }
173     LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
174     metriclog.info("EVENT_PUBLISH_END");
175     return true;
176   }
177
178   private boolean conformsToSchema(JSONObject payload, String version) {
179     try {
180       JsonSchema schema =
181           ofNullable(schemas.getJSONSchemasMap(version).get(version))
182               .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
183       ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
184       if (!report.isSuccess()) {
185         LOG.warn("Schema validation failed for event: " + payload);
186         stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
187         return false;
188       }
189       return report.isSuccess();
190     } catch (Exception e) {
191       throw new RuntimeException("Unable to validate against schema", e);
192     }
193   }
194
195   private static JSONArray convertToJSONArrayCommonFormat(
196       JSONObject jsonObject, String request, String uuid, String version) {
197     JSONArray asArrayEvents = new JSONArray();
198     String vesUniqueIdKey = "VESuniqueId";
199     String vesVersionKey = "VESversion";
200     if (isBatchRequest(request)) {
201       JSONArray events = jsonObject.getJSONArray("eventList");
202       for (int i = 0; i < events.length(); i++) {
203         JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
204         event.put(vesUniqueIdKey, uuid + "-" + i);
205         event.put(vesVersionKey, version);
206         asArrayEvents.put(event);
207       }
208     } else {
209       jsonObject.put(vesUniqueIdKey, uuid);
210       jsonObject.put(vesVersionKey, version);
211       asArrayEvents = new JSONArray().put(jsonObject);
212     }
213     return asArrayEvents;
214   }
215
216   private static String setUpECOMPLoggingForRequest() {
217     final UUID uuid = UUID.randomUUID();
218     LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
219     localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
220     return uuid.toString();
221   }
222
223   private static boolean isBatchRequest(String request) {
224     return request.contains("eventBatch");
225   }
226 }