/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property.
 * Copyright © 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
24 package org.onap.crud.service;
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
50 import java.text.SimpleDateFormat;
51 import java.util.Timer;
52 import java.util.concurrent.Callable;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.ExecutionException;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import javax.annotation.PreDestroy;
61 import javax.ws.rs.core.Response.Status;
63 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
// Timeout applied while waiting for a response event. It is assigned by the constructor and
// is passed with TimeUnit.MILLISECONDS wherever this class waits on it.
65 private static Integer requestTimeOut;
// Publisher that sendAndWait() uses to send request events.
67 private EventPublisher asyncRequestPublisher;
// Value requestTimeOut is initialised with before the configured value is parsed.
71 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
// Initial value of the delay and period handed to Timer.schedule() for the response consumer.
72 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
// General-purpose logger for this class.
74 private static Logger logger = LoggerFactory.getInstance()
75 .getLogger(CrudAsyncGraphDataService.class.getName());
// Metrics logger; sendAndWait() writes the elapsed time of each operation to it.
76 private static Logger metricsLogger = LoggerFactory.getInstance()
77 .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
// Log fields passed along with the metrics records.
78 private static LogFields OK_FIELDS = new LogFields();
// Records the OK status in OK_FIELDS.
81 OK_FIELDS.setField(Status.OK, Status.OK.toString());
84 public static Integer getRequestTimeOut() {
85 return requestTimeOut;
/**
 * Creates the asynchronous graph data service.
 *
 * Reads the request timeout and the response poll interval from CrudProperties (the defaults
 * are kept when a value cannot be parsed), schedules a CrudAsyncResponseConsumer on a Timer
 * and stores the publisher used to send request events.
 *
 * NOTE(review): requestTimeOut is a static field, so every construction overwrites the value
 * seen through getRequestTimeOut().
 * NOTE(review): how dao is stored is not visible in these lines.
 *
 * @param dao graph DAO handed to the service
 * @param asyncRequestPublisher publisher used to send request events
 * @param asyncResponseConsumer consumer wrapped by the scheduled CrudAsyncResponseConsumer
 * @throws CrudException declared by the constructor; no throw is visible in these lines
 */
88 public CrudAsyncGraphDataService(GraphDao dao,
89 EventPublisher asyncRequestPublisher,
90 EventConsumer asyncResponseConsumer) throws CrudException {
// Start from the default; it is replaced only if the configured value parses as an integer.
94 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
97 = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
98 } catch (NumberFormatException ex) {
99 // Leave it as the default
// Same pattern for the poll interval, except that a parse failure is logged as an error.
102 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
104 responsePollInterval = Integer
105 .parseInt(CrudProperties
106 .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
107 } catch (Exception ex) {
108 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
109 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
110 + " error: " + ex.getMessage());
113 // Start the Response Consumer timer
114 CrudAsyncResponseConsumer crudAsyncResponseConsumer
115 = new CrudAsyncResponseConsumer(asyncResponseConsumer);
116 timer = new Timer("crudAsyncResponseConsumer-1");
// The poll interval is used both as the initial delay and as the repeat period.
117 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
119 this.asyncRequestPublisher = asyncRequestPublisher;
121 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
122 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
125 public class CollectGraphResponse implements Callable<GraphEvent> {
126 private volatile GraphEvent graphEvent;
127 private volatile CountDownLatch latch = new CountDownLatch(1);
130 public GraphEvent call() throws TimeoutException {
132 // Wait until graphEvent is available
133 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
134 } catch (InterruptedException e) {
136 if (this.graphEvent != null) {
137 return this.graphEvent;
139 throw new TimeoutException();
142 return this.graphEvent;
145 public void populateGraphEvent(GraphEvent event) {
146 this.graphEvent = event;
151 private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
153 long startTimeInMs = System.currentTimeMillis();
154 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
155 MdcOverride override = new MdcOverride();
156 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
158 // publish to request queue
160 asyncRequestPublisher.sendSync(event.toJson());
161 } catch (Exception e) {
162 throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
165 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
167 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
168 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
169 + " , transaction-id: " + event.getTransactionId() + " , operation: "
170 + event.getOperation().toString());
172 ExecutorService executor = Executors
173 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
174 CollectGraphResponse collector = new CollectGraphResponse();
175 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
177 Future<GraphEvent> future = executor.submit(collector);
179 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
181 } catch (InterruptedException | ExecutionException | TimeoutException e) {
182 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
183 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
184 "Request timed out for transactionId: " + event.getTransactionId());
186 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
187 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
189 //Kill the thread as the work is completed
190 executor.shutdownNow();
192 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
193 "Total elapsed time for operation: " + event.getOperation().toString()
194 + " , transactionId: " + event.getTransactionId() + " is "
195 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
199 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
200 // Validate the incoming payload
201 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
202 type, payload.getProperties());
203 // Create graph request event
204 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
205 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
207 GraphEvent response = sendAndWait(event);
208 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
209 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
210 "Event response received: " + response.getObjectType() + " with key: "
211 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
212 + " , operation: " + event.getOperation().toString() + " , result: "
213 + response.getResult());
214 return CrudResponseBuilder.buildUpsertVertexResponse(
215 OxmModelValidator.validateOutgoingPayload(version,
216 response.getVertex().toVertex()), version);
218 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
219 "Event response received: " + response.getObjectType() + " with key: "
220 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
221 + " , operation: " + event.getOperation().toString() + " , result: "
222 + response.getResult() + " , error: " + response.getErrorMessage());
223 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
224 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
229 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
230 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
231 // Create graph request event
232 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
233 .edge(GraphEventEdge.fromEdge(edge, version)).build();
235 GraphEvent response = sendAndWait(event);
236 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
237 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
238 "Event response received: " + response.getObjectType() + " with key: "
239 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
240 + " , operation: " + event.getOperation().toString() + " , result: "
241 + response.getResult());
242 return CrudResponseBuilder.buildUpsertEdgeResponse(
243 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
246 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
247 "Event response received: " + response.getObjectType() + " with key: "
248 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
249 + " , operation: " + event.getOperation().toString() + " , result: "
250 + response.getResult() + " , error: " + response.getErrorMessage());
251 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
252 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
256 public String updateVertex(String version, String id, String type, VertexPayload payload)
257 throws CrudException {
258 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
259 type, payload.getProperties());
260 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
261 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
263 GraphEvent response = sendAndWait(event);
264 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
265 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
266 "Event response received: " + response.getObjectType() + " with key: "
267 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
268 + " , operation: " + event.getOperation().toString() + " , result: "
269 + response.getResult());
270 return CrudResponseBuilder.buildUpsertVertexResponse(
271 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
274 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
275 "Event response received: " + response.getObjectType() + " with key: "
276 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
277 + " , operation: " + event.getOperation().toString() + " , result: "
278 + response.getResult() + " , error: " + response.getErrorMessage());
279 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
280 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
285 public String patchVertex(String version, String id, String type, VertexPayload payload)
286 throws CrudException {
287 Vertex existingVertex
288 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
289 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
290 type, payload.getProperties(),
292 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
293 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
295 GraphEvent response = sendAndWait(event);
296 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
297 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
298 "Event response received: " + response.getObjectType() + " with key: "
299 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
300 + " , operation: " + event.getOperation().toString() + " , result: "
301 + response.getResult());
302 return CrudResponseBuilder.buildUpsertVertexResponse(
303 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
306 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
307 "Event response received: " + response.getObjectType() + " with key: "
308 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
309 + " , operation: " + event.getOperation().toString() + " , result: "
310 + response.getResult() + " , error: " + response.getErrorMessage());
311 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
312 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Requests deletion of a vertex and waits for the outcome.
 *
 * Resolves the collection type through OxmModelValidator, sends a DELETE graph event carrying
 * the vertex id, version and resolved type, and blocks in sendAndWait() until the response
 * event is available.
 *
 * NOTE(review): the statement that returns from the success branch is not visible in these
 * lines, so the returned value is not documented here.
 *
 * @param version version used for type resolution and placed in the event
 * @param id id of the vertex to delete
 * @param type vertex type; replaced by the resolved collection type before use
 * @throws CrudException when the response result is not SUCCESS (carrying the response's
 *         error message and HTTP status), or when propagated from sendAndWait()
 */
317 public String deleteVertex(String version, String id, String type) throws CrudException {
318 type = OxmModelValidator.resolveCollectionType(version, type);
// No properties are attached to a delete request (last constructor argument is null).
319 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
320 .vertex(new GraphEventVertex(id, version, type, null)).build();
322 GraphEvent response = sendAndWait(event);
// Both outcomes are logged at info level; only the failure log adds the error text.
323 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
324 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
325 "Event response received: " + response.getObjectType() + " with key: "
326 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
327 + " , operation: " + event.getOperation().toString() + " , result: "
328 + response.getResult());
// Failure path: log the error and surface it with the HTTP status from the response.
331 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
332 "Event response received: " + response.getObjectType() + " with key: "
333 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
334 + " , operation: " + event.getOperation().toString() + " , result: "
335 + response.getResult() + " , error: " + response.getErrorMessage());
336 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
337 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Requests deletion of an edge and waits for the outcome.
 *
 * Validates the edge type through RelationshipSchemaValidator, sends a DELETE graph event
 * carrying the edge id, version and type, and blocks in sendAndWait() until the response
 * event is available.
 *
 * NOTE(review): the statement that returns from the success branch is not visible in these
 * lines, so the returned value is not documented here.
 *
 * @param version version used for type validation and placed in the event
 * @param id id of the edge to delete
 * @param type edge type
 * @throws CrudException when the response result is not SUCCESS (carrying the response's
 *         error message and HTTP status), or when propagated from sendAndWait()
 */
342 public String deleteEdge(String version, String id, String type) throws CrudException {
343 RelationshipSchemaValidator.validateType(version, type);
// Only id, version and type are filled in; the remaining constructor arguments are null.
344 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
345 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
347 GraphEvent response = sendAndWait(event);
// Both outcomes are logged at info level; only the failure log adds the error text.
348 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
349 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
350 "Event response received: " + response.getObjectType() + " with key: "
351 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
352 + " , operation: " + event.getOperation().toString() + " , result: "
353 + response.getResult());
// Failure path: log the error and surface it with the HTTP status from the response.
356 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
357 "Event response received: " + response.getObjectType() + " with key: "
358 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
359 + " , operation: " + event.getOperation().toString() + " , result: "
360 + response.getResult() + " , error: " + response.getErrorMessage());
361 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
362 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Updates an existing edge and waits for the outcome.
 *
 * Loads the current edge from the DAO, validates the update against it with
 * RelationshipSchemaValidator, sends the validated edge as an UPDATE graph event and blocks
 * in sendAndWait() until the response event is available.
 *
 * NOTE(review): the trailing arguments of validateIncomingUpdatePayload are not visible in
 * these lines.
 *
 * @param version version passed to validation, the request event and the response builder
 * @param id id of the edge, used for the DAO lookup
 * @param type edge type, used for the DAO lookup
 * @param payload edge payload (presumably passed to the validator; confirm)
 * @return on success, the response built by CrudResponseBuilder.buildUpsertEdgeResponse from
 *         the validated edge of the response event
 * @throws CrudException when the response result is not SUCCESS (carrying the response's
 *         error message and HTTP status), or when propagated from sendAndWait()
 */
367 public String updateEdge(String version, String id, String type, EdgePayload payload)
368 throws CrudException {
369 Edge edge = dao.getEdge(id, type);
370 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
372 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
373 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
375 GraphEvent response = sendAndWait(event);
// Both outcomes are logged at info level; only the failure log adds the error text.
376 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
377 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
378 "Event response received: " + response.getObjectType() + " with key: "
379 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
380 + " , operation: " + event.getOperation().toString() + " , result: "
381 + response.getResult());
// The edge from the response is validated again before the reply is built.
382 return CrudResponseBuilder.buildUpsertEdgeResponse(
383 RelationshipSchemaValidator.validateOutgoingPayload(version,
384 response.getEdge().toEdge()), version);
// Failure path: log the error and surface it with the HTTP status from the response.
386 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
387 "Event response received: " + response.getObjectType() + " with key: "
388 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
389 + " , operation: " + event.getOperation().toString() + " , result: "
390 + response.getResult() + " , error: " + response.getErrorMessage());
391 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
392 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Patches an existing edge and waits for the outcome.
 *
 * Loads the current edge from the DAO, validates the patch against it with
 * RelationshipSchemaValidator, sends the patched edge as an UPDATE graph event and blocks in
 * sendAndWait() until the response event is available.
 *
 * NOTE(review): the trailing arguments of validateIncomingPatchPayload and of
 * buildUpsertEdgeResponse are not visible in these lines.
 *
 * @param version version passed to validation and the request event
 * @param id id of the edge, used for the DAO lookup
 * @param type edge type, used for the DAO lookup
 * @param payload edge payload (presumably passed to the validator; confirm)
 * @return on success, the response built by CrudResponseBuilder.buildUpsertEdgeResponse from
 *         the validated edge of the response event
 * @throws CrudException when the response result is not SUCCESS (carrying the response's
 *         error message and HTTP status), or when propagated from sendAndWait()
 */
397 public String patchEdge(String version, String id, String type, EdgePayload payload)
398 throws CrudException {
399 Edge edge = dao.getEdge(id, type);
400 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
// A patch is sent with the same UPDATE operation as a full update.
402 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
403 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
405 GraphEvent response = sendAndWait(event);
// Both outcomes are logged at info level; only the failure log adds the error text.
406 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
407 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
408 "Event response received: " + response.getObjectType() + " with key: "
409 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
410 + " , operation: " + event.getOperation().toString() + " , result: "
411 + response.getResult());
// The edge from the response is validated again before the reply is built.
412 return CrudResponseBuilder.buildUpsertEdgeResponse(
413 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
// Failure path: log the error and surface it with the HTTP status from the response.
416 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
417 "Event response received: " + response.getObjectType() + " with key: "
418 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
419 + " , operation: " + event.getOperation().toString() + " , result: "
420 + response.getResult() + " , error: " + response.getErrorMessage());
421 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
422 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Pre-shutdown step for this service.
 *
 * NOTE(review): the method body is not visible in this view; presumably it stops the
 * response-consumer Timer started by the constructor. Confirm against the full file.
 */
428 protected void preShutdown() {
434 public String addBulk(String version, BulkPayload payload) throws CrudException {
435 throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);