Merge "Replace nsaCore library with Spring"
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / restapi / VesRestController.java
1 /*
2  * ============LICENSE_START=======================================================
3  * PROJECT
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.s
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcae.restapi;
23
24 import static java.util.Optional.ofNullable;
25 import static java.util.stream.StreamSupport.stream;
26 import static org.springframework.http.ResponseEntity.accepted;
27 import static org.springframework.http.ResponseEntity.ok;
28
29 import com.att.nsa.clock.SaClock;
30 import com.att.nsa.logging.LoggingContext;
31 import com.att.nsa.logging.log4j.EcompFields;
32 import com.github.fge.jackson.JsonLoader;
33 import com.github.fge.jsonschema.core.report.ProcessingReport;
34 import com.github.fge.jsonschema.main.JsonSchema;
35
36 import java.util.UUID;
37 import java.util.concurrent.LinkedBlockingQueue;
38 import javax.servlet.http.HttpServletRequest;
39
40 import org.json.JSONArray;
41 import org.json.JSONObject;
42 import org.onap.dcae.ApplicationSettings;
43 import org.onap.dcae.CollectorSchemas;
44 import org.onap.dcae.commonFunction.VESLogger;
45 import org.slf4j.Logger;
46 import org.slf4j.LoggerFactory;
47 import org.springframework.beans.factory.annotation.Autowired;
48 import org.springframework.beans.factory.annotation.Qualifier;
49 import org.springframework.http.MediaType;
50 import org.springframework.http.ResponseEntity;
51 import org.springframework.web.bind.annotation.GetMapping;
52 import org.springframework.web.bind.annotation.PostMapping;
53 import org.springframework.web.bind.annotation.RequestBody;
54 import org.springframework.web.bind.annotation.RestController;
55
56 @RestController
57 public class VesRestController {
58
59     private static final Logger LOG = LoggerFactory.getLogger(VesRestController.class);
60
61     private static final String FALLBACK_VES_VERSION = "v5";
62
63     @Autowired
64     private ApplicationSettings collectorProperties;
65
66     @Autowired
67     private CollectorSchemas schemas;
68
69     @Autowired
70     @Qualifier("metriclog")
71     private Logger metriclog;
72
73     @Autowired
74     @Qualifier("incomingRequestsLogger")
75     private Logger incomingRequestsLogger;
76
77     @Autowired
78     @Qualifier("errorLog")
79     private Logger errorLog;
80
81     private LinkedBlockingQueue<JSONObject> inputQueue;
82     private String version;
83
84     @Autowired
85     VesRestController(@Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
86                       @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
87         this.incomingRequestsLogger = incomingRequestsLogger;
88         this.inputQueue = inputQueue;
89     }
90
91     @GetMapping("/")
92     String mainPage() {
93         return "Welcome to VESCollector";
94     }
95
96     //refactor in next iteration
97     @PostMapping(value = {"/eventListener/v1",
98             "/eventListener/v1/eventBatch",
99             "/eventListener/v2",
100             "/eventListener/v2/eventBatch",
101             "/eventListener/v3",
102             "/eventListener/v3/eventBatch",
103             "/eventListener/v4",
104             "/eventListener/v4/eventBatch",
105             "/eventListener/v5",
106             "/eventListener/v5/eventBatch"}, consumes = "application/json")
107     ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
108         String request = httpServletRequest.getRequestURI();
109         extractVersion(request);
110
111         JSONObject jsonObject;
112         try {
113             jsonObject = new JSONObject(jsonPayload);
114         } catch (Exception e) {
115             return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
116         }
117
118         String uuid = setUpECOMPLoggingForRequest();
119         incomingRequestsLogger.info(String.format(
120                 "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
121                 jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
122
123         if (collectorProperties.jsonSchemaValidationEnabled()) {
124             if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
125                 if (!conformsToSchema(jsonObject, version)) {
126                     return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
127                 }
128             } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
129                 if (!conformsToSchema(jsonObject, version)) {
130                     return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
131                 }
132             } else {
133                 return errorResponse(ApiException.INVALID_JSON_INPUT);
134             }
135         }
136
137         JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
138
139         if (!putEventsOnProcessingQueue(commonlyFormatted)) {
140             errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
141             return errorResponse(ApiException.NO_SERVER_RESOURCES);
142         }
143         return ok().contentType(MediaType.APPLICATION_JSON).body("Message Accepted");
144     }
145
146     private void extractVersion(String httpServletRequest) {
147         version = httpServletRequest.split("/")[2];
148     }
149
150     private ResponseEntity<String> errorResponse(ApiException noServerResources) {
151         return ResponseEntity.status(noServerResources.httpStatusCode)
152                 .body(noServerResources.toJSON().toString());
153     }
154
155     private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
156         for (int i = 0; i < arrayOfEvents.length(); i++) {
157             metriclog.info("EVENT_PUBLISH_START");
158             if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
159                 return false;
160             }
161         }
162         LOG.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
163         metriclog.info("EVENT_PUBLISH_END");
164         return true;
165     }
166
167     private boolean conformsToSchema(JSONObject payload, String version) {
168         try {
169             JsonSchema schema = ofNullable(schemas.getJSONSchemasMap(version).get(version))
170                     .orElse(schemas.getJSONSchemasMap(version).get(FALLBACK_VES_VERSION));
171             ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
172             if (!report.isSuccess()) {
173                 LOG.warn("Schema validation failed for event: " + payload);
174                 stream(report.spliterator(), false).forEach(e -> LOG.warn(e.getMessage()));
175                 return false;
176             }
177             return report.isSuccess();
178         } catch (Exception e) {
179             throw new RuntimeException("Unable to validate against schema", e);
180         }
181     }
182
183     private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
184                                                             String uuid, String version) {
185         JSONArray asArrayEvents = new JSONArray();
186         String vesUniqueIdKey = "VESuniqueId";
187         String vesVersionKey = "VESversion";
188         if (isBatchRequest(request)) {
189             JSONArray events = jsonObject.getJSONArray("eventList");
190             for (int i = 0; i < events.length(); i++) {
191                 JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
192                 event.put(vesUniqueIdKey, uuid + "-" + i);
193                 event.put(vesVersionKey, version);
194                 asArrayEvents.put(event);
195             }
196         } else {
197             jsonObject.put(vesUniqueIdKey, uuid);
198             jsonObject.put(vesVersionKey, version);
199             asArrayEvents = new JSONArray().put(jsonObject);
200         }
201         return asArrayEvents;
202     }
203
204     private static String setUpECOMPLoggingForRequest() {
205         final UUID uuid = UUID.randomUUID();
206         LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
207         localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
208         return uuid.toString();
209     }
210
211     private static boolean isBatchRequest(String request) {
212         return request.contains("eventBatch");
213     }
214 }