2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 package org.onap.crud.service;
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
28 import com.google.gson.JsonElement;
30 import org.onap.aai.cl.api.LogFields;
31 import org.onap.aai.cl.api.Logger;
32 import org.onap.aai.cl.eelf.LoggerFactory;
33 import org.onap.aai.cl.mdc.MdcContext;
34 import org.onap.aai.cl.mdc.MdcOverride;
35 import org.onap.crud.dao.GraphDao;
36 import org.onap.crud.entity.Edge;
37 import org.onap.crud.entity.Vertex;
38 import org.onap.crud.event.GraphEvent;
39 import org.onap.crud.event.GraphEvent.GraphEventOperation;
40 import org.onap.crud.event.GraphEvent.GraphEventResult;
41 import org.onap.crud.event.GraphEventEdge;
42 import org.onap.crud.event.GraphEventVertex;
43 import org.onap.crud.exception.CrudException;
44 import org.onap.crud.logging.CrudServiceMsgs;
45 import org.onap.crud.parser.CrudResponseBuilder;
46 import org.onap.crud.util.CrudProperties;
47 import org.onap.crud.util.CrudServiceConstants;
48 import org.onap.schema.OxmModelValidator;
49 import org.onap.schema.RelationshipSchemaValidator;
51 import java.text.SimpleDateFormat;
52 import java.util.ArrayList;
53 import java.util.HashMap;
54 import java.util.List;
56 import java.util.Timer;
57 import java.util.concurrent.Callable;
58 import java.util.concurrent.CountDownLatch;
59 import java.util.concurrent.ExecutionException;
60 import java.util.concurrent.ExecutorService;
61 import java.util.concurrent.Executors;
62 import java.util.concurrent.Future;
63 import java.util.concurrent.TimeUnit;
64 import java.util.concurrent.TimeoutException;
65 import javax.annotation.PreDestroy;
66 import javax.ws.rs.core.Response.Status;
68 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
/**
 * Timeout applied while waiting for an asynchronous response; this class passes it to waits
 * measured in milliseconds. Assigned by the constructor, so it is {@code null} until the first
 * instance has been created.
 */
private static Integer requestTimeOut;

/** Publisher used to send graph request events. */
private EventPublisher asyncRequestPublisher;

// NOTE(review): declaration reconstructed - the constructor assigns `timer`, but the line
// declaring it is missing from the reviewed view. Confirm type and modifiers.
private Timer timer;

/** Request timeout used when none is configured. */
public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
/** Delay and period of the response-consumer timer used when none is configured. */
private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;

// The loggers and the metric fields are never reassigned, so they are declared final.
private static final Logger logger = LoggerFactory.getInstance()
    .getLogger(CrudAsyncGraphDataService.class.getName());
private static final Logger metricsLogger = LoggerFactory.getInstance()
    .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
/** Log fields attached to the elapsed-time metric entry; carries the OK status. */
private static final LogFields OK_FIELDS = new LogFields();

static {
  OK_FIELDS.setField(Status.OK, Status.OK.toString());
}
89 public static Integer getRequestTimeOut() {
90 return requestTimeOut;
/**
 * Creates the asynchronous graph data service and starts polling for responses.
 *
 * <p>The request timeout and the response poll interval are read from {@link CrudProperties};
 * a missing or unparsable value falls back to the corresponding default. A poll interval that
 * is not strictly positive also falls back to the default, because {@link Timer} rejects
 * such a period with an {@link IllegalArgumentException}.
 *
 * @param dao graph DAO handed to the parent service
 * @param asyncRequestPublisher publisher used to send graph request events
 * @param asyncResponseConsumer consumer wrapped by the scheduled response-consumer task
 * @throws CrudException declared by the constructor signature
 */
public CrudAsyncGraphDataService(GraphDao dao,
    EventPublisher asyncRequestPublisher,
    EventConsumer asyncResponseConsumer) throws CrudException {

  // NOTE(review): reconstructed - this call is missing from the reviewed view; `dao` is
  // otherwise unused here while the other methods rely on an inherited `dao`. Confirm.
  super(dao);

  requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
  try {
    requestTimeOut
        = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
  } catch (NumberFormatException ex) {
    // Leave it as the default, but say so instead of failing silently
    logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Unable to parse "
        + CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT
        + ", using default: " + DEFAULT_REQUEST_TIMEOUT);
  }

  Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
  try {
    responsePollInterval = Integer
        .parseInt(CrudProperties
            .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
  } catch (Exception ex) {
    logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
        + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
        + " error: " + ex.getMessage());
  }
  if (responsePollInterval <= 0) {
    // Timer.schedule throws IllegalArgumentException for a negative delay or a period <= 0
    logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Invalid value " + responsePollInterval
        + " for " + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
        + ", using default: " + DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL);
    responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
  }

  // Start the Response Consumer timer
  CrudAsyncResponseConsumer crudAsyncResponseConsumer
      = new CrudAsyncResponseConsumer(asyncResponseConsumer);
  timer = new Timer("crudAsyncResponseConsumer-1");
  timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);

  this.asyncRequestPublisher = asyncRequestPublisher;

  logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
      "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
}
130 public class CollectGraphResponse implements Callable<GraphEvent> {
131 private volatile GraphEvent graphEvent;
132 private volatile CountDownLatch latch = new CountDownLatch(1);
135 public GraphEvent call() throws TimeoutException {
137 // Wait until graphEvent is available
138 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
139 } catch (InterruptedException e) {
141 if (this.graphEvent != null) {
142 return this.graphEvent;
144 throw new TimeoutException();
147 return this.graphEvent;
150 public void populateGraphEvent(GraphEvent event) {
151 this.graphEvent = event;
156 private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
158 long startTimeInMs = System.currentTimeMillis();
159 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
160 MdcOverride override = new MdcOverride();
161 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
163 // publish to request queue
165 asyncRequestPublisher.sendSync(event.toJson());
166 } catch (Exception e) {
167 throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
170 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
172 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
173 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
174 + " , transaction-id: " + event.getTransactionId() + " , operation: "
175 + event.getOperation().toString());
177 ExecutorService executor = Executors
178 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
179 CollectGraphResponse collector = new CollectGraphResponse();
180 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
182 Future<GraphEvent> future = executor.submit(collector);
184 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
186 } catch (InterruptedException | ExecutionException | TimeoutException e) {
187 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
188 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
189 "Request timed out for transactionId: " + event.getTransactionId());
191 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
192 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
194 //Kill the thread as the work is completed
195 executor.shutdownNow();
197 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
198 "Total elapsed time for operation: " + event.getOperation().toString()
199 + " , transactionId: " + event.getTransactionId() + " is "
200 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
204 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
205 // Validate the incoming payload
206 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
207 type, payload.getProperties());
208 // Create graph request event
209 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
210 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
212 GraphEvent response = sendAndWait(event);
213 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
214 logSuccessResponse(event, response);
215 return CrudResponseBuilder.buildUpsertVertexResponse(
216 OxmModelValidator.validateOutgoingPayload(version,
217 response.getVertex().toVertex()), version);
219 logErrorResponse(event, response);
220 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
221 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
226 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
227 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
228 // Create graph request event
229 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
230 .edge(GraphEventEdge.fromEdge(edge, version)).build();
232 GraphEvent response = sendAndWait(event);
233 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
234 logSuccessResponse(event, response);
235 return CrudResponseBuilder.buildUpsertEdgeResponse(
236 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
239 logErrorResponse(event, response);
240 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
241 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
245 public String updateVertex(String version, String id, String type, VertexPayload payload)
246 throws CrudException {
247 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
248 type, payload.getProperties());
249 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
250 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
252 GraphEvent response = sendAndWait(event);
253 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
254 logSuccessResponse(event, response);
255 return CrudResponseBuilder.buildUpsertVertexResponse(
256 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
259 logErrorResponse(event, response);
260 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
261 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
266 public String patchVertex(String version, String id, String type, VertexPayload payload)
267 throws CrudException {
268 Vertex existingVertex
269 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
270 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
271 type, payload.getProperties(),
273 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
274 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
276 GraphEvent response = sendAndWait(event);
277 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
278 logSuccessResponse(event, response);
279 return CrudResponseBuilder.buildUpsertVertexResponse(
280 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
283 logErrorResponse(event, response);
284 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
285 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Publishes a DELETE event for a vertex and waits for the response.
 *
 * @param version schema version; used to resolve the collection type and stored in the event
 * @param id identifier of the vertex to delete
 * @param type vertex type; replaced by its resolved collection type before the event is built
 * @throws CrudException raised with the response's error message and HTTP status on the
 *         failure path; {@code sendAndWait} may also raise it
 */
290 public String deleteVertex(String version, String id, String type) throws CrudException {
// The event carries the resolved collection type, not the caller-supplied one
291 type = OxmModelValidator.resolveCollectionType(version, type);
// A delete event carries no properties, hence the trailing null
292 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
293 .vertex(new GraphEventVertex(id, version, type, null)).build();
// Blocks until the response for this event is available
295 GraphEvent response = sendAndWait(event);
296 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
297 logSuccessResponse(event, response);
// Failure path: log the response, then surface its error message and HTTP status
300 logErrorResponse(event, response);
301 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
302 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Publishes a DELETE event for an edge and waits for the response.
 *
 * @param version schema version; used to validate the type and stored in the event
 * @param id identifier of the edge to delete
 * @param type edge type, checked by {@code RelationshipSchemaValidator.validateType}
 * @throws CrudException raised with the response's error message and HTTP status on the
 *         failure path; {@code sendAndWait} may also raise it
 */
307 public String deleteEdge(String version, String id, String type) throws CrudException {
// Check the edge type before anything is published
308 RelationshipSchemaValidator.validateType(version, type);
// Only id, version and type are set on the event edge; the remaining arguments are null
309 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
310 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
// Blocks until the response for this event is available
312 GraphEvent response = sendAndWait(event);
313 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
314 logSuccessResponse(event, response);
// Failure path: log the response, then surface its error message and HTTP status
317 logErrorResponse(event, response);
318 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
319 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
324 public String updateEdge(String version, String id, String type, EdgePayload payload)
325 throws CrudException {
326 Edge edge = dao.getEdge(id, type);
327 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
329 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
330 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
332 GraphEvent response = sendAndWait(event);
333 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
334 logSuccessResponse(event, response);
335 return CrudResponseBuilder.buildUpsertEdgeResponse(
336 RelationshipSchemaValidator.validateOutgoingPayload(version,
337 response.getEdge().toEdge()), version);
339 logErrorResponse(event, response);
340 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
341 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
346 public String patchEdge(String version, String id, String type, EdgePayload payload)
347 throws CrudException {
348 Edge edge = dao.getEdge(id, type);
349 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
351 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
352 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
354 GraphEvent response = sendAndWait(event);
355 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
356 logSuccessResponse(event, response);
357 return CrudResponseBuilder.buildUpsertEdgeResponse(
358 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
361 logErrorResponse(event, response);
362 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
363 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
// Pre-shutdown hook of this service.
// NOTE(review): the body is not visible in the reviewed view; presumably it stops the
// response-consumer timer started by the constructor - confirm.
369 protected void preShutdown() {
/**
 * Applies a bulk payload of vertex and relationship operations inside one DAO transaction.
 *
 * <p>Every element of {@code payload.getObjects()} and {@code payload.getRelationships()} must
 * be a JSON object with exactly two entries: the first is read as the operation ("add",
 * "modify" or "delete", compared case-insensitively), the second as the item. Results of add
 * and modify operations are collected under the item's key and used to build the response.
 *
 * @param version schema version used for validation, event creation and the response
 * @param payload bulk payload providing the objects and relationships to process
 * @return the response built by {@code CrudResponseBuilder.buildUpsertBulkResponse}
 * @throws CrudException with BAD_REQUEST when an element does not have exactly two entries,
 *         with INTERNAL_SERVER_ERROR when a "$"-reference names an unknown vertex, or as
 *         raised by {@code publishEvent}
 */
375 public String addBulk(String version, BulkPayload payload) throws CrudException {
// Processed vertices and edges, keyed by the item's key in the payload
376 HashMap<String, Vertex> vertices = new HashMap<String, Vertex>();
377 HashMap<String, Edge> edges = new HashMap<String, Edge>();
// Every event below is tagged with this transaction id
378 String txId = dao.openTransaction();
// Step 1: vertices
382 for (JsonElement v : payload.getObjects()) {
383 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
384 v.getAsJsonObject().entrySet());
// Each element must consist of exactly an operation entry and an item entry
386 if (entries.size() != 2) {
387 throw new CrudException("", Status.BAD_REQUEST);
// Position decides the role: first entry is the operation, second the item
389 Map.Entry<String, JsonElement> opr = entries.get(0);
390 Map.Entry<String, JsonElement> item = entries.get(1);
392 VertexPayload vertexPayload = VertexPayload.fromJson(item.getValue().getAsJsonObject().toString());
394 if (opr.getValue().getAsString().equalsIgnoreCase("add")
395 || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
396 Vertex validatedVertex;
399 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
400 // Publish add-vertex event
// "add" validates with a null id, unlike "modify" below
401 validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, vertexPayload.getType(),
402 vertexPayload.getProperties());
403 event = GraphEvent.builder(GraphEventOperation.CREATE)
404 .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
406 // Publish update-vertex event
// "modify" validates with the id taken from the payload
407 validatedVertex = OxmModelValidator.validateIncomingUpsertPayload(vertexPayload.getId(), version,
408 vertexPayload.getType(), vertexPayload.getProperties());
409 event = GraphEvent.builder(GraphEventOperation.UPDATE)
410 .vertex(GraphEventVertex.fromVertex(validatedVertex, version)).build();
// Tag the event with the open transaction, publish it and keep the validated result
413 event.setDbTransactionId(txId);
414 GraphEvent response = publishEvent(event);
415 Vertex persistedVertex = response.getVertex().toVertex();
416 Vertex outgoingVertex = OxmModelValidator.validateOutgoingPayload(version, persistedVertex);
// Stored under the item's key so that relationships can refer to it as "$<key>"
417 vertices.put(item.getKey(), outgoingVertex);
418 } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
419 // Publish delete-vertex event
420 String type = OxmModelValidator.resolveCollectionType(version, vertexPayload.getType());
421 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
422 .vertex(new GraphEventVertex(vertexPayload.getId(), version, type, null)).build();
423 event.setDbTransactionId(txId);
// Step 2: relationships, using the same operation/item layout as the vertices
429 for (JsonElement v : payload.getRelationships()) {
430 List<Map.Entry<String, JsonElement>> entries = new ArrayList<Map.Entry<String, JsonElement>>(
431 v.getAsJsonObject().entrySet());
433 if (entries.size() != 2) {
434 throw new CrudException("", Status.BAD_REQUEST);
436 Map.Entry<String, JsonElement> opr = entries.get(0);
437 Map.Entry<String, JsonElement> item = entries.get(1);
439 EdgePayload edgePayload = EdgePayload.fromJson(item.getValue().getAsJsonObject().toString());
441 if (opr.getValue().getAsString().equalsIgnoreCase("add")
442 || opr.getValue().getAsString().equalsIgnoreCase("modify")) {
445 if (opr.getValue().getAsString().equalsIgnoreCase("add")) {
446 // Fix the source/destination
// A source starting with "$" names a vertex processed in step 1; the text after "$" is its key
447 if (edgePayload.getSource().startsWith("$")) {
448 Vertex source = vertices.get(edgePayload.getSource().substring(1));
449 if (source == null) {
450 throw new CrudException("Not able to find vertex: " + edgePayload.getSource().substring(1),
451 Status.INTERNAL_SERVER_ERROR);
// The reference is replaced by a path built from the version, vertex type and vertex id
454 .setSource("services/inventory/" + version + "/" + source.getType() + "/" + source.getId().get());
// Same resolution for the target
456 if (edgePayload.getTarget().startsWith("$")) {
457 Vertex target = vertices.get(edgePayload.getTarget().substring(1));
458 if (target == null) {
459 throw new CrudException("Not able to find vertex: " + edgePayload.getTarget().substring(1),
460 Status.INTERNAL_SERVER_ERROR);
463 .setTarget("services/inventory/" + version + "/" + target.getType() + "/" + target.getId().get());
465 validatedEdge = RelationshipSchemaValidator.validateIncomingAddPayload(version, edgePayload.getType(),
467 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
468 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
469 event.setDbTransactionId(txId);
470 GraphEvent response = publishEvent(event);
471 persistedEdge = response.getEdge().toEdge();
// Otherwise: read the edge within the open transaction and validate the update against it
473 Edge edge = dao.getEdge(edgePayload.getId(), edgePayload.getType(), txId);
474 validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, edgePayload);
475 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
476 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
477 event.setDbTransactionId(txId);
478 GraphEvent response = publishEvent(event);
479 persistedEdge = response.getEdge().toEdge();
// Keep the validated outgoing edge under the item's key
482 Edge outgoingEdge = RelationshipSchemaValidator.validateOutgoingPayload(version, persistedEdge);
483 edges.put(item.getKey(), outgoingEdge);
484 } else if (opr.getValue().getAsString().equalsIgnoreCase("delete")) {
485 RelationshipSchemaValidator.validateType(version, edgePayload.getType());
486 // Publish delete-edge event
487 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
488 .edge(new GraphEventEdge(edgePayload.getId(), version, edgePayload.getType(), null, null, null)).build();
489 event.setDbTransactionId(txId);
494 // commit transaction
495 dao.commitTransaction(txId);
// Any failure rolls the transaction back
496 } catch (CrudException ex) {
497 dao.rollbackTransaction(txId);
499 } catch (Exception ex) {
500 dao.rollbackTransaction(txId);
// Safety net: roll back if the transaction is still open at this point
503 if (dao.transactionExists(txId)) {
504 dao.rollbackTransaction(txId);
508 return CrudResponseBuilder.buildUpsertBulkResponse(vertices, edges, version, payload);
511 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
512 GraphEvent response = sendAndWait(event);
513 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
514 logSuccessResponse(event, response);
516 logErrorResponse(event, response);
517 throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
518 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
524 private void logSuccessResponse(GraphEvent event, GraphEvent response) {
525 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
526 "Event response received: " + response.getObjectType() + " with key: "
527 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
528 + " , operation: " + event.getOperation().toString() + " , result: "
529 + response.getResult());
532 private void logErrorResponse(GraphEvent event, GraphEvent response) {
533 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
534 "Event response received: " + response.getObjectType() + " with key: "
535 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
536 + " , operation: " + event.getOperation().toString() + " , result: "
537 + response.getResult() + " , error: " + response.getErrorMessage());