2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 package org.onap.crud.service;
26 import org.onap.aai.event.api.EventConsumer;
27 import org.onap.aai.event.api.EventPublisher;
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
50 import java.text.SimpleDateFormat;
51 import java.util.HashMap;
52 import java.util.Timer;
53 import java.util.concurrent.Callable;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import javax.annotation.PreDestroy;
62 import javax.ws.rs.core.Response.Status;
/**
 * Graph data service whose write operations are asynchronous: each request is
 * published as a {@link GraphEvent} through an {@link EventPublisher}, and the
 * calling thread then waits for the matching response event (see sendAndWait).
 *
 * NOTE(review): some member declarations used further down (for example the
 * timer assigned in the constructor) are not visible in this view.
 */
64 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
// Timeout, in milliseconds, used while waiting for a response event; assigned in the constructor.
66 private static Integer requestTimeOut;
// Publisher used to send request events.
68 private EventPublisher asyncRequestPublisher;
// Default request timeout in milliseconds; the constructor starts from this value.
72 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
// Default delay and period, in milliseconds, of the response-consumer timer task.
73 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
// General-purpose logger for this service.
75 private static Logger logger = LoggerFactory.getInstance()
76 .getLogger(CrudAsyncGraphDataService.class.getName());
// Metrics logger; sendAndWait uses it to record the elapsed time of each operation.
77 private static Logger metricsLogger = LoggerFactory.getInstance()
78 .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
// Log fields passed along with every metrics record.
79 private static LogFields OK_FIELDS = new LogFields();
// Store the OK status under the Status.OK key of the metrics log fields.
82 OK_FIELDS.setField(Status.OK, Status.OK.toString());
85 public static Integer getRequestTimeOut() {
86 return requestTimeOut;
/**
 * Creates a service that uses a single DAO: {@code dao} is handed to the
 * four-argument constructor for both of its DAO arguments.
 *
 * @param dao DAO used for graph access
 * @param asyncRequestPublisher publisher for request events
 * @param asyncResponseConsumer consumer from which response events are read
 * @throws CrudException propagated from the four-argument constructor
 */
public CrudAsyncGraphDataService(
    GraphDao dao,
    EventPublisher asyncRequestPublisher,
    EventConsumer asyncResponseConsumer) throws CrudException {
  this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
}
/**
 * Creates the service: resolves the request timeout and the response poll
 * interval from {@link CrudProperties}, starts the timer task that consumes
 * asynchronous responses, and stores the request publisher.
 *
 * NOTE(review): parts of this constructor are not visible in this view (for
 * example the declaration of daoForGet, which is assigned below); confirm the
 * full parameter list and field assignments against the complete file.
 *
 * @param dao DAO used for graph access
 * @param asyncRequestPublisher publisher for request events
 * @param asyncResponseConsumer consumer wrapped by the response-consumer timer task
 * @throws CrudException declared by the signature; no throwing statement is visible here
 */
95 public CrudAsyncGraphDataService(GraphDao dao,
97 EventPublisher asyncRequestPublisher,
98 EventConsumer asyncResponseConsumer) throws CrudException {
102 this.daoForGet = daoForGet;
// Start from the default; replaced below when the configured value parses as an integer.
104 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
107 = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
108 } catch (NumberFormatException ex) {
109 // Leave it as the default
// Poll interval in milliseconds; stays at the default if the configured value cannot be parsed.
112 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
114 responsePollInterval = Integer
115 .parseInt(CrudProperties
116 .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
117 } catch (Exception ex) {
118 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
119 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
120 + " error: " + ex.getMessage());
123 // Start the Response Consumer timer
124 CrudAsyncResponseConsumer crudAsyncResponseConsumer
125 = new CrudAsyncResponseConsumer(asyncResponseConsumer);
126 timer = new Timer("crudAsyncResponseConsumer-1");
// The poll interval is used both as the initial delay and as the period between runs.
127 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
129 this.asyncRequestPublisher = asyncRequestPublisher;
131 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
132 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
135 public class CollectGraphResponse implements Callable<GraphEvent> {
136 private volatile GraphEvent graphEvent;
137 private volatile CountDownLatch latch = new CountDownLatch(1);
140 public GraphEvent call() throws TimeoutException {
142 // Wait until graphEvent is available
143 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
144 } catch (InterruptedException e) {
146 if (this.graphEvent != null) {
147 return this.graphEvent;
149 throw new TimeoutException();
152 return this.graphEvent;
155 public void populateGraphEvent(GraphEvent event) {
156 this.graphEvent = event;
161 private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
163 long startTimeInMs = System.currentTimeMillis();
164 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
165 MdcOverride override = new MdcOverride();
166 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
168 // publish to request queue
170 asyncRequestPublisher.sendSync(event.toJson());
171 } catch (Exception e) {
172 throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
175 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
177 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
178 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
179 + " , transaction-id: " + event.getTransactionId() + " , operation: "
180 + event.getOperation().toString());
182 ExecutorService executor = Executors
183 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
184 CollectGraphResponse collector = new CollectGraphResponse();
185 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
187 Future<GraphEvent> future = executor.submit(collector);
189 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
191 } catch (InterruptedException | ExecutionException | TimeoutException e) {
192 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
193 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
194 "Request timed out for transactionId: " + event.getTransactionId());
196 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
197 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
199 //Kill the thread as the work is completed
200 executor.shutdownNow();
202 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
203 "Total elapsed time for operation: " + event.getOperation().toString()
204 + " , transactionId: " + event.getTransactionId() + " is "
205 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
209 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
210 // Validate the incoming payload
211 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
212 type, payload.getProperties());
213 // Create graph request event
214 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
215 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
217 GraphEvent response = sendAndWait(event);
218 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
219 logSuccessResponse(event, response);
220 return CrudResponseBuilder.buildUpsertVertexResponse(
221 OxmModelValidator.validateOutgoingPayload(version,
222 response.getVertex().toVertex()), version);
224 logErrorResponse(event, response);
225 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
226 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
231 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
232 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
233 // Create graph request event
234 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
235 .edge(GraphEventEdge.fromEdge(edge, version)).build();
237 GraphEvent response = sendAndWait(event);
238 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
239 logSuccessResponse(event, response);
240 return CrudResponseBuilder.buildUpsertEdgeResponse(
241 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
244 logErrorResponse(event, response);
245 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
246 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
250 public String updateVertex(String version, String id, String type, VertexPayload payload)
251 throws CrudException {
252 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
253 type, payload.getProperties());
254 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
255 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
257 GraphEvent response = sendAndWait(event);
258 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
259 logSuccessResponse(event, response);
260 return CrudResponseBuilder.buildUpsertVertexResponse(
261 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
264 logErrorResponse(event, response);
265 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
266 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Partially updates a vertex: reads the current vertex through the DAO,
 * validates the patch payload, publishes an UPDATE event and waits for its
 * response.
 *
 * NOTE(review): the end of the validateIncomingPatchPayload argument list and
 * of the buildUpsertVertexResponse call are not visible in this view.
 *
 * @param version schema version used for validation and type resolution
 * @param id id of the vertex to patch
 * @param type vertex type; resolved to a collection type for the DAO lookup
 * @param payload payload carrying the properties to patch
 * @return the upsert-vertex response built from the vertex in the response event
 * @throws CrudException if the response event does not report success, or
 *         propagated from the DAO, the validators or sendAndWait
 */
271 public String patchVertex(String version, String id, String type, VertexPayload payload)
272 throws CrudException {
// Read the stored vertex; an empty map is passed as the last DAO argument.
273 Vertex existingVertex
274 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
275 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
276 type, payload.getProperties(),
// A patch is sent as an UPDATE event carrying the patched vertex.
278 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
279 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
281 GraphEvent response = sendAndWait(event);
282 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
283 logSuccessResponse(event, response);
284 return CrudResponseBuilder.buildUpsertVertexResponse(
285 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
// Failure path: log the response, then report it with the HTTP status it carries.
288 logErrorResponse(event, response);
289 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
290 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Deletes a vertex: publishes a DELETE event for the given id and waits for
 * its response.
 *
 * NOTE(review): the value returned on success is not visible in this view.
 *
 * @param version schema version used for type resolution
 * @param id id of the vertex to delete
 * @param type vertex type; replaced by its resolved collection type
 * @return NOTE(review): not visible here — confirm against the full file
 * @throws CrudException if the response event does not report success, or
 *         propagated from type resolution or sendAndWait
 */
295 public String deleteVertex(String version, String id, String type) throws CrudException {
296 type = OxmModelValidator.resolveCollectionType(version, type);
// Only id, version and type are set on the event vertex; the last argument is null.
297 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
298 .vertex(new GraphEventVertex(id, version, type, null)).build();
300 GraphEvent response = sendAndWait(event);
301 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
302 logSuccessResponse(event, response);
// Failure path: log the response, then report it with the HTTP status it carries.
305 logErrorResponse(event, response);
306 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
307 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Deletes an edge: validates the edge type, publishes a DELETE event for the
 * given id and waits for its response.
 *
 * NOTE(review): the value returned on success is not visible in this view.
 *
 * @param version schema version used for type validation
 * @param id id of the edge to delete
 * @param type edge type
 * @return NOTE(review): not visible here — confirm against the full file
 * @throws CrudException if the response event does not report success, or
 *         propagated from type validation or sendAndWait
 */
312 public String deleteEdge(String version, String id, String type) throws CrudException {
313 RelationshipSchemaValidator.validateType(version, type);
// Only id, version and type are set on the event edge; the remaining arguments are null.
314 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
315 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
317 GraphEvent response = sendAndWait(event);
318 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
319 logSuccessResponse(event, response);
// Failure path: log the response, then report it with the HTTP status it carries.
322 logErrorResponse(event, response);
323 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
324 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Updates an edge: reads the current edge through the DAO, validates the
 * update, publishes an UPDATE event and waits for its response.
 *
 * NOTE(review): the end of the validateIncomingUpdatePayload argument list is
 * not visible in this view.
 *
 * @param version schema version used for validation and for the response
 * @param id id of the edge to update
 * @param type edge type
 * @param payload payload describing the updated edge
 * @return the upsert-edge response built from the edge in the response event
 * @throws CrudException if the response event does not report success, or
 *         propagated from the DAO, the validators or sendAndWait
 */
329 public String updateEdge(String version, String id, String type, EdgePayload payload)
330 throws CrudException {
// Read the stored edge; an empty map is passed as the last DAO argument.
331 Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
332 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
334 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
335 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
337 GraphEvent response = sendAndWait(event);
338 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
339 logSuccessResponse(event, response);
340 return CrudResponseBuilder.buildUpsertEdgeResponse(
341 RelationshipSchemaValidator.validateOutgoingPayload(version,
342 response.getEdge().toEdge()), version);
// Failure path: log the response, then report it with the HTTP status it carries.
344 logErrorResponse(event, response);
345 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
346 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
/**
 * Partially updates an edge: reads the current edge through the DAO,
 * validates the patch, publishes an UPDATE event and waits for its response.
 *
 * NOTE(review): the end of the validateIncomingPatchPayload argument list and
 * of the buildUpsertEdgeResponse call are not visible in this view.
 *
 * @param version schema version used for validation
 * @param id id of the edge to patch
 * @param type edge type
 * @param payload payload describing the patch
 * @return the upsert-edge response built from the edge in the response event
 * @throws CrudException if the response event does not report success, or
 *         propagated from the DAO, the validators or sendAndWait
 */
351 public String patchEdge(String version, String id, String type, EdgePayload payload)
352 throws CrudException {
// Read the stored edge; an empty map is passed as the last DAO argument.
353 Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
354 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
// A patch is sent as an UPDATE event carrying the patched edge.
356 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
357 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
359 GraphEvent response = sendAndWait(event);
360 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
361 logSuccessResponse(event, response);
362 return CrudResponseBuilder.buildUpsertEdgeResponse(
363 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
// Failure path: log the response, then report it with the HTTP status it carries.
366 logErrorResponse(event, response);
367 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
368 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
// Judging by its name, a hook run before the service shuts down.
// NOTE(review): the body is not visible in this view — confirm what it releases
// (presumably the response-consumer timer started in the constructor).
374 protected void preShutdown() {
380 protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
381 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
382 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
383 event.setDbTransactionId(dbTransId);
384 GraphEvent response = publishEvent(event);
385 return response.getVertex().toVertex();
389 protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
390 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
391 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
392 event.setDbTransactionId(dbTransId);
393 GraphEvent response = publishEvent(event);
394 return response.getVertex().toVertex();
/**
 * Builds a DELETE event for a vertex as part of a bulk operation and tags it
 * with the database transaction id.
 *
 * NOTE(review): the statement that publishes the event is not visible in this
 * view; confirm against the full file.
 *
 * @param id id of the vertex to delete
 * @param version schema version carried by the event
 * @param type vertex type carried by the event
 * @param dbTransId database transaction id set on the event
 * @throws CrudException declared by the signature
 */
398 protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
399 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
400 event.setDbTransactionId(dbTransId);
405 protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
406 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
407 event.setDbTransactionId(dbTransId);
408 GraphEvent response = publishEvent(event);
409 return response.getEdge().toEdge();
413 protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
414 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
415 event.setDbTransactionId(dbTransId);
416 GraphEvent response = publishEvent(event);
417 return response.getEdge().toEdge();
/**
 * Builds a DELETE event for an edge as part of a bulk operation and tags it
 * with the database transaction id.
 *
 * NOTE(review): the statement that publishes the event is not visible in this
 * view; confirm against the full file.
 *
 * @param id id of the edge to delete
 * @param version schema version carried by the event
 * @param type edge type carried by the event
 * @param dbTransId database transaction id set on the event
 * @throws CrudException declared by the signature
 */
421 protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
// Only id, version and type are set on the event edge; the remaining arguments are null.
422 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
423 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
424 event.setDbTransactionId(dbTransId);
428 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
429 GraphEvent response = sendAndWait(event);
430 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
431 logSuccessResponse(event, response);
433 logErrorResponse(event, response);
434 throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
435 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
441 private void logSuccessResponse(GraphEvent event, GraphEvent response) {
442 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
443 "Event response received: " + response.getObjectType() + " with key: "
444 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
445 + " , operation: " + event.getOperation().toString() + " , result: "
446 + response.getResult());
449 private void logErrorResponse(GraphEvent event, GraphEvent response) {
450 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
451 "Event response received: " + response.getObjectType() + " with key: "
452 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
453 + " , operation: " + event.getOperation().toString() + " , result: "
454 + response.getResult() + " , error: " + response.getErrorMessage());