68aecce5639674f2fb9db5d62eed82c31e6f0497
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / restapi / VesRestController.java
1 /*
2  * ============LICENSE_START=======================================================
3  * PROJECT
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.s
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.dcae.restapi;
23
24 import static java.util.stream.StreamSupport.stream;
25 import static org.springframework.http.ResponseEntity.accepted;
26
27 import com.att.nsa.clock.SaClock;
28 import com.att.nsa.logging.LoggingContext;
29 import com.att.nsa.logging.log4j.EcompFields;
30 import com.github.fge.jackson.JsonLoader;
31 import com.github.fge.jsonschema.core.report.ProcessingReport;
32 import com.github.fge.jsonschema.main.JsonSchema;
33
34 import java.util.UUID;
35 import java.util.concurrent.LinkedBlockingQueue;
36 import javax.servlet.http.HttpServletRequest;
37
38 import org.json.JSONArray;
39 import org.json.JSONObject;
40 import org.onap.dcae.ApplicationSettings;
41 import org.onap.dcae.common.VESLogger;
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.springframework.beans.factory.annotation.Autowired;
45 import org.springframework.beans.factory.annotation.Qualifier;
46 import org.springframework.http.MediaType;
47 import org.springframework.http.ResponseEntity;
48 import org.springframework.web.bind.annotation.GetMapping;
49 import org.springframework.web.bind.annotation.PostMapping;
50 import org.springframework.web.bind.annotation.RequestBody;
51 import org.springframework.web.bind.annotation.RestController;
52
53 @RestController
54 public class VesRestController {
55     private static final Logger log = LoggerFactory.getLogger(VesRestController.class);
56
57     private final ApplicationSettings applicationSettings;
58     private final LinkedBlockingQueue<JSONObject> inputQueue;
59
60     private final Logger metricsLog;
61     private final Logger errorLog;
62     private final Logger incomingRequestsLogger;
63
64     @Autowired
65     VesRestController(ApplicationSettings applicationSettings,
66                       @Qualifier("metricsLog") Logger metricsLog,
67                       @Qualifier("errorLog") Logger errorLog,
68                       @Qualifier("incomingRequestsLogger") Logger incomingRequestsLogger,
69                       @Qualifier("inputQueue") LinkedBlockingQueue<JSONObject> inputQueue) {
70         this.applicationSettings = applicationSettings;
71         this.metricsLog = metricsLog;
72         this.errorLog = errorLog;
73         this.incomingRequestsLogger = incomingRequestsLogger;
74         this.inputQueue = inputQueue;
75     }
76
77     @GetMapping("/")
78     String mainPage() {
79         return "Welcome to VESCollector";
80     }
81
82     //refactor in next iteration
83     @PostMapping(value = {"/eventListener/v1",
84             "/eventListener/v1/eventBatch",
85             "/eventListener/v2",
86             "/eventListener/v2/eventBatch",
87             "/eventListener/v3",
88             "/eventListener/v3/eventBatch",
89             "/eventListener/v4",
90             "/eventListener/v4/eventBatch",
91             "/eventListener/v5",
92             "/eventListener/v5/eventBatch",
93             "/eventListener/v7",
94             "/eventListener/v7/eventBatch"}, consumes = "application/json")
95     ResponseEntity<String> receiveEvent(@RequestBody String jsonPayload, HttpServletRequest httpServletRequest) {
96         String request = httpServletRequest.getRequestURI();
97         String version = extractVersion(request);
98
99         JSONObject jsonObject;
100         try {
101             jsonObject = new JSONObject(jsonPayload);
102         } catch (Exception e) {
103             return ResponseEntity.badRequest().body(ApiException.INVALID_JSON_INPUT.toJSON().toString());
104         }
105
106         String uuid = setUpECOMPLoggingForRequest();
107         incomingRequestsLogger.info(String.format(
108                 "Received a VESEvent '%s', marked with unique identifier '%s', on api version '%s', from host: '%s'",
109                 jsonObject, uuid, version, httpServletRequest.getRemoteHost()));
110
111         if (applicationSettings.jsonSchemaValidationEnabled()) {
112             if (isBatchRequest(request) && (jsonObject.has("eventList") && (!jsonObject.has("event")))) {
113                 if (!conformsToSchema(jsonObject, version)) {
114                     return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
115                 }
116             } else if (!isBatchRequest(request) && (!jsonObject.has("eventList") && (jsonObject.has("event")))) {
117                 if (!conformsToSchema(jsonObject, version)) {
118                     return errorResponse(ApiException.SCHEMA_VALIDATION_FAILED);
119                 }
120             } else {
121                 return errorResponse(ApiException.INVALID_JSON_INPUT);
122             }
123         }
124
125         JSONArray commonlyFormatted = convertToJSONArrayCommonFormat(jsonObject, request, uuid, version);
126
127         if (!putEventsOnProcessingQueue(commonlyFormatted)) {
128             errorLog.error("EVENT_RECEIPT_FAILURE: QueueFull " + ApiException.NO_SERVER_RESOURCES);
129             return errorResponse(ApiException.NO_SERVER_RESOURCES);
130         }
131         return accepted()
132                 .contentType(MediaType.APPLICATION_JSON)
133                 .body("Accepted");
134     }
135
136     private String extractVersion(String httpServletRequest) {
137         return httpServletRequest.split("/")[2];
138     }
139
140     private ResponseEntity<String> errorResponse(ApiException noServerResources) {
141         return ResponseEntity.status(noServerResources.httpStatusCode)
142                 .body(noServerResources.toJSON().toString());
143     }
144
145     private boolean putEventsOnProcessingQueue(JSONArray arrayOfEvents) {
146         for (int i = 0; i < arrayOfEvents.length(); i++) {
147             metricsLog.info("EVENT_PUBLISH_START");
148             if (!inputQueue.offer((JSONObject) arrayOfEvents.get(i))) {
149                 return false;
150             }
151         }
152         log.debug("CommonStartup.handleEvents:EVENTS has been published successfully!");
153         metricsLog.info("EVENT_PUBLISH_END");
154         return true;
155     }
156
157     private boolean conformsToSchema(JSONObject payload, String version) {
158         try {
159             JsonSchema schema = applicationSettings.jsonSchema(version);
160             ProcessingReport report = schema.validate(JsonLoader.fromString(payload.toString()));
161             if (!report.isSuccess()) {
162                 log.warn("Schema validation failed for event: " + payload);
163                 stream(report.spliterator(), false).forEach(e -> log.warn(e.getMessage()));
164                 return false;
165             }
166             return report.isSuccess();
167         } catch (Exception e) {
168             throw new RuntimeException("Unable to validate against schema", e);
169         }
170     }
171
172     private static JSONArray convertToJSONArrayCommonFormat(JSONObject jsonObject, String request,
173                                                             String uuid, String version) {
174         JSONArray asArrayEvents = new JSONArray();
175         String vesUniqueIdKey = "VESuniqueId";
176         String vesVersionKey = "VESversion";
177         if (isBatchRequest(request)) {
178             JSONArray events = jsonObject.getJSONArray("eventList");
179             for (int i = 0; i < events.length(); i++) {
180                 JSONObject event = new JSONObject().put("event", events.getJSONObject(i));
181                 event.put(vesUniqueIdKey, uuid + "-" + i);
182                 event.put(vesVersionKey, version);
183                 asArrayEvents.put(event);
184             }
185         } else {
186             jsonObject.put(vesUniqueIdKey, uuid);
187             jsonObject.put(vesVersionKey, version);
188             asArrayEvents = new JSONArray().put(jsonObject);
189         }
190         return asArrayEvents;
191     }
192
193     private static String setUpECOMPLoggingForRequest() {
194         final UUID uuid = UUID.randomUUID();
195         LoggingContext localLC = VESLogger.getLoggingContextForThread(uuid);
196         localLC.put(EcompFields.kBeginTimestampMs, SaClock.now());
197         return uuid.toString();
198     }
199
200     private static boolean isBatchRequest(String request) {
201         return request.contains("eventBatch");
202     }
203 }