2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 package org.onap.crud.service;
import com.att.ecomp.event.api.EventConsumer;
import com.att.ecomp.event.api.EventPublisher;
import com.google.gson.JsonElement;

import org.onap.aai.cl.api.LogFields;
import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.cl.mdc.MdcOverride;
import org.onap.crud.dao.GraphDao;
import org.onap.crud.entity.Edge;
import org.onap.crud.entity.Vertex;
import org.onap.crud.event.GraphEvent;
import org.onap.crud.event.GraphEvent.GraphEventOperation;
import org.onap.crud.event.GraphEvent.GraphEventResult;
import org.onap.crud.event.GraphEventEdge;
import org.onap.crud.event.GraphEventVertex;
import org.onap.crud.exception.CrudException;
import org.onap.crud.logging.CrudServiceMsgs;
import org.onap.crud.parser.CrudResponseBuilder;
import org.onap.crud.util.CrudProperties;
import org.onap.crud.util.CrudServiceConstants;
import org.onap.crud.util.CrudServiceUtil;
import org.onap.schema.OxmModelValidator;
import org.onap.schema.RelationshipSchemaValidator;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.PreDestroy;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response.Status;
70 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
/** Request timeout in milliseconds; assigned by the constructor and exposed by getRequestTimeOut(). */
private static Integer requestTimeOut;

/** Publisher used by sendAndWait to put request events on the request queue. */
private EventPublisher asyncRequestPublisher;

/** Timer that periodically runs the response consumer task scheduled by the constructor. */
private Timer timer;

/** Request timeout, in milliseconds, used when the configured value cannot be parsed. */
public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
/** Response poll interval, in milliseconds, used when the configured value cannot be parsed. */
private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;

private static final Logger logger = LoggerFactory.getInstance()
    .getLogger(CrudAsyncGraphDataService.class.getName());
private static final Logger metricsLogger = LoggerFactory.getInstance()
    .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
/** Log fields attached to the elapsed-time metrics entry written by sendAndWait. */
private static final LogFields OK_FIELDS = new LogFields();

static {
  OK_FIELDS.setField(Status.OK, Status.OK.toString());
}
91 public static Integer getRequestTimeOut() {
92 return requestTimeOut;
/**
 * Creates the service, reads the request timeout and response poll interval from
 * {@link CrudProperties}, and starts the timer that runs the response consumer.
 *
 * @param dao graph DAO handed to the superclass
 * @param asyncRequestPublisher publisher used to send request events
 * @param asyncResponseConsumer consumer wrapped by the periodically scheduled
 *     {@code CrudAsyncResponseConsumer} task
 * @throws CrudException declared by the original signature; presumably raised by the
 *     superclass constructor - TODO confirm
 */
public CrudAsyncGraphDataService(GraphDao dao,
    EventPublisher asyncRequestPublisher,
    EventConsumer asyncResponseConsumer) throws CrudException {

  // NOTE(review): reconstructed call; the other methods read an inherited `dao` - confirm
  // against AbstractGraphDataService.
  super(dao);

  requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
  try {
    requestTimeOut
        = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
  } catch (NumberFormatException ex) {
    // Leave it as the default
  }

  Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
  try {
    responsePollInterval = Integer
        .parseInt(CrudProperties
            .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
  } catch (Exception ex) {
    // Keep the default interval, but report why the configured value was rejected
    logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
        + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
        + " error: " + ex.getMessage());
  }

  // Start the Response Consumer timer
  CrudAsyncResponseConsumer crudAsyncResponseConsumer
      = new CrudAsyncResponseConsumer(asyncResponseConsumer);
  timer = new Timer("crudAsyncResponseConsumer-1");
  // The poll interval is used both as the initial delay and as the repeat period
  timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);

  this.asyncRequestPublisher = asyncRequestPublisher;

  logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
      "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
132 public class CollectGraphResponse implements Callable<GraphEvent> {
133 private volatile GraphEvent graphEvent;
134 private volatile CountDownLatch latch = new CountDownLatch(1);
137 public GraphEvent call() throws TimeoutException {
139 // Wait until graphEvent is available
140 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
141 } catch (InterruptedException e) {
143 if (this.graphEvent != null) {
144 return this.graphEvent;
146 throw new TimeoutException();
149 return this.graphEvent;
152 public void populateGraphEvent(GraphEvent event) {
153 this.graphEvent = event;
158 private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
160 long startTimeInMs = System.currentTimeMillis();
161 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
162 MdcOverride override = new MdcOverride();
163 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
165 // publish to request queue
167 asyncRequestPublisher.sendSync(event.toJson());
168 } catch (Exception e) {
169 throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
172 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
174 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
175 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
176 + " , transaction-id: " + event.getTransactionId() + " , operation: "
177 + event.getOperation().toString());
179 ExecutorService executor = Executors
180 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
181 CollectGraphResponse collector = new CollectGraphResponse();
182 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
184 Future<GraphEvent> future = executor.submit(collector);
186 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
188 } catch (InterruptedException | ExecutionException | TimeoutException e) {
189 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
190 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
191 "Request timed out for transactionId: " + event.getTransactionId());
193 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
194 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
196 //Kill the thread as the work is completed
197 executor.shutdownNow();
199 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
200 "Total elapsed time for operation: " + event.getOperation().toString()
201 + " , transactionId: " + event.getTransactionId() + " is "
202 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
206 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
207 // Validate the incoming payload
208 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
209 type, payload.getProperties());
210 // Create graph request event
211 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
212 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
214 GraphEvent response = sendAndWait(event);
215 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
216 logSuccessResponse(event, response);
217 return CrudResponseBuilder.buildUpsertVertexResponse(
218 OxmModelValidator.validateOutgoingPayload(version,
219 response.getVertex().toVertex()), version);
221 logErrorResponse(event, response);
222 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
223 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
228 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
229 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
230 // Create graph request event
231 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
232 .edge(GraphEventEdge.fromEdge(edge, version)).build();
234 GraphEvent response = sendAndWait(event);
235 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
236 logSuccessResponse(event, response);
237 return CrudResponseBuilder.buildUpsertEdgeResponse(
238 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
241 logErrorResponse(event, response);
242 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
243 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
247 public String updateVertex(String version, String id, String type, VertexPayload payload)
248 throws CrudException {
249 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
250 type, payload.getProperties());
251 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
252 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
254 GraphEvent response = sendAndWait(event);
255 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
256 logSuccessResponse(event, response);
257 return CrudResponseBuilder.buildUpsertVertexResponse(
258 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
261 logErrorResponse(event, response);
262 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
263 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
268 public String patchVertex(String version, String id, String type, VertexPayload payload)
269 throws CrudException {
270 Vertex existingVertex
271 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
272 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
273 type, payload.getProperties(),
275 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
276 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
278 GraphEvent response = sendAndWait(event);
279 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
280 logSuccessResponse(event, response);
281 return CrudResponseBuilder.buildUpsertVertexResponse(
282 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
285 logErrorResponse(event, response);
286 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
287 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
292 public String deleteVertex(String version, String id, String type) throws CrudException {
293 type = OxmModelValidator.resolveCollectionType(version, type);
294 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
295 .vertex(new GraphEventVertex(id, version, type, null)).build();
297 GraphEvent response = sendAndWait(event);
298 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
299 logSuccessResponse(event, response);
302 logErrorResponse(event, response);
303 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
304 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
309 public String deleteEdge(String version, String id, String type) throws CrudException {
310 RelationshipSchemaValidator.validateType(version, type);
311 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
312 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
314 GraphEvent response = sendAndWait(event);
315 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
316 logSuccessResponse(event, response);
319 logErrorResponse(event, response);
320 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
321 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
326 public String updateEdge(String version, String id, String type, EdgePayload payload)
327 throws CrudException {
328 Edge edge = dao.getEdge(id, type);
329 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
331 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
332 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
334 GraphEvent response = sendAndWait(event);
335 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
336 logSuccessResponse(event, response);
337 return CrudResponseBuilder.buildUpsertEdgeResponse(
338 RelationshipSchemaValidator.validateOutgoingPayload(version,
339 response.getEdge().toEdge()), version);
341 logErrorResponse(event, response);
342 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
343 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
348 public String patchEdge(String version, String id, String type, EdgePayload payload)
349 throws CrudException {
350 Edge edge = dao.getEdge(id, type);
351 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
353 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
354 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
356 GraphEvent response = sendAndWait(event);
357 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
358 logSuccessResponse(event, response);
359 return CrudResponseBuilder.buildUpsertEdgeResponse(
360 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
363 logErrorResponse(event, response);
364 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
365 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
371 protected void preShutdown() {
377 public String addBulk(String version, BulkPayload payload, HttpHeaders headers) throws CrudException {
378 HashMap<String, Vertex> vertices = new HashMap<String, Vertex>();
379 HashMap<String, Edge> edges = new HashMap<String, Edge>();
380 String txId = dao.openTransaction();
384 for (JsonElement v : payload.getObjects()) {
385 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
386 v.getAsJsonObject().entrySet());
388 if (entries.size() != 2) {
389 throw new CrudException("", Status.BAD_REQUEST);
391 Map.Entry<String, JsonElement> opr = entries.get(0);
392 Map.Entry<String, JsonElement> item = entries.get(1);
394 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
396 if (opr.getValue().getAsString().equalsIgnoreCase("add")
397 || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
398 Vertex validatedVertex;
401 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
402 vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
404 // Publish add-vertex event
405 validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
406 vertexPayload.getProperties());
407 event = GraphEvent.builder(GraphEventOperation.CREATE)
408 .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
410 vertexPayload.setProperties(CrudServiceUtil.mergeHeaderInFoToPayload(vertexPayload.getProperties(),
412 // Publish update-vertex event
413 validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
414 vertexPayload.getType(), vertexPayload.getProperties());
415 event = GraphEvent.builder(GraphEventOperation.UPDATE)
416 .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
419 event.setDbTransactionId(txId);
420 GraphEvent response = publishEvent(event);
421 Vertex persistedVertex = response.getVertex().toVertex();
422 Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
423 vertices.put(item.getKey(), outgoingVertex);
424 } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
425 // Publish delete-vertex event
426 String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
427 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
428 .vertex(new GraphEventVertex(vertexPayload.getId(), version, type, null)).build();
429 event.setDbTransactionId(txId);
435 for (JsonElement v : payload.getRelationships()) {
436 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
437 v.getAsJsonObject().entrySet());
439 if (entries.size() != 2) {
440 throw new CrudException("", Status.BAD_REQUEST);
442 Map.Entry<String, JsonElement> opr = entries.get(0);
443 Map.Entry<String, JsonElement> item = entries.get(1);
445 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
447 if (opr.getValue().getAsString().equalsIgnoreCase("add")
448 || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
451 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
452 // Fix the source/destination
453 if (edgePayload.getSource().startsWith("$")) {
454 Vertex source = vertices.get(edgePayload.getSource().substring(1));
455 if (source == null) {
456 throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
457 Status.INTERNAL_SERVER_ERROR);
460 .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
462 if (edgePayload.getTarget().startsWith("$")) {
463 Vertex target = vertices.get(edgePayload.getTarget().substring(1));
464 if (target == null) {
465 throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
466 Status.INTERNAL_SERVER_ERROR);
469 .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
471 validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),
473 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
474 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
475 event.setDbTransactionId(txId);
476 GraphEvent response = publishEvent(event);
477 persistedEdge = response.getEdge().toEdge();
479 Edge edge = dao.getEdge(edgePayload.getId(), edgePayload.getType(), txId);
480 validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
481 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
482 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
483 event.setDbTransactionId(txId);
484 GraphEvent response = publishEvent(event);
485 persistedEdge = response.getEdge().toEdge();
488 Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
489 edges.put(item.getKey(), outgoingEdge);
490 } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
491 RelationshipSchemaValidator.validateType(version, edgePayload.getType());
492 // Publish delete-vertex event
493 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
494 .edge(new GraphEventEdge(edgePayload.getId(), version, edgePayload.getType(), null, null, null)).build();
495 event.setDbTransactionId(txId);
500 // commit transaction
501 dao.commitTransaction(txId);
502 } catch (CrudException ex) {
503 dao.rollbackTransaction(txId);
505 } catch (Exception ex) {
506 dao.rollbackTransaction(txId);
509 if (dao.transactionExists(txId)) {
510 dao.rollbackTransaction(txId);
514 return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
517 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
518 GraphEvent response = sendAndWait(event);
519 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
520 logSuccessResponse(event, response);
522 logErrorResponse(event, response);
523 throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
524 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
530 private void logSuccessResponse(GraphEvent event, GraphEvent response) {
531 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
532 "Event response received: " + response.getObjectType() + " with key: "
533 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
534 + " , operation: " + event.getOperation().toString() + " , result: "
535 + response.getResult());
538 private void logErrorResponse(GraphEvent event, GraphEvent response) {
539 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
540 "Event response received: " + response.getObjectType() + " with key: "
541 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
542 + " , operation: " + event.getOperation().toString() + " , result: "
543 + response.getResult() + " , error: " + response.getErrorMessage());