2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
24 package org.onap.crud.service;
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
50 import java.text.SimpleDateFormat;
51 import java.util.Timer;
52 import java.util.concurrent.Callable;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.ExecutionException;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import javax.annotation.PreDestroy;
61 import javax.ws.rs.core.Response.Status;
63 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
65 private static Integer requestTimeOut;
67 private EventPublisher asyncRequestPublisher;
71 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
72 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
74 private static Logger logger = LoggerFactory.getInstance()
75 .getLogger(CrudAsyncGraphDataService.class.getName());
76 private static Logger metricsLogger = LoggerFactory.getInstance()
77 .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
78 private static LogFields OK_FIELDS = new LogFields();
81 OK_FIELDS.setField(Status.OK, Status.OK.toString());
84 public static Integer getRequestTimeOut() {
85 return requestTimeOut;
88 public CrudAsyncGraphDataService(GraphDao dao,
89 EventPublisher asyncRequestPublisher,
90 EventConsumer asyncResponseConsumer) throws CrudException {
91 this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
94 public CrudAsyncGraphDataService(GraphDao dao,
96 EventPublisher asyncRequestPublisher,
97 EventConsumer asyncResponseConsumer) throws CrudException {
101 this.daoForGet = daoForGet;
103 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
106 = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
107 } catch (NumberFormatException ex) {
108 // Leave it as the default
111 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
113 responsePollInterval = Integer
114 .parseInt(CrudProperties
115 .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
116 } catch (Exception ex) {
117 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
118 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
119 + " error: " + ex.getMessage());
122 // Start the Response Consumer timer
123 CrudAsyncResponseConsumer crudAsyncResponseConsumer
124 = new CrudAsyncResponseConsumer(asyncResponseConsumer);
125 timer = new Timer("crudAsyncResponseConsumer-1");
126 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
128 this.asyncRequestPublisher = asyncRequestPublisher;
130 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
131 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
134 public class CollectGraphResponse implements Callable<GraphEvent> {
135 private volatile GraphEvent graphEvent;
136 private volatile CountDownLatch latch = new CountDownLatch(1);
139 public GraphEvent call() throws TimeoutException {
141 // Wait until graphEvent is available
142 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
143 } catch (InterruptedException e) {
145 if (this.graphEvent != null) {
146 return this.graphEvent;
148 throw new TimeoutException();
151 return this.graphEvent;
154 public void populateGraphEvent(GraphEvent event) {
155 this.graphEvent = event;
160 private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
162 long startTimeInMs = System.currentTimeMillis();
163 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
164 MdcOverride override = new MdcOverride();
165 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
167 // publish to request queue
169 asyncRequestPublisher.sendSync(event.toJson());
170 } catch (Exception e) {
171 throw new CrudException("Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
174 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
176 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
177 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
178 + " , transaction-id: " + event.getTransactionId() + " , operation: "
179 + event.getOperation().toString());
181 ExecutorService executor = Executors
182 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
183 CollectGraphResponse collector = new CollectGraphResponse();
184 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
186 Future<GraphEvent> future = executor.submit(collector);
188 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
190 } catch (InterruptedException | ExecutionException | TimeoutException e) {
191 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
192 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
193 "Request timed out for transactionId: " + event.getTransactionId());
195 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
196 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
198 //Kill the thread as the work is completed
199 executor.shutdownNow();
201 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
202 "Total elapsed time for operation: " + event.getOperation().toString()
203 + " , transactionId: " + event.getTransactionId() + " is "
204 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
208 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
209 // Validate the incoming payload
210 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
211 type, payload.getProperties());
212 // Create graph request event
213 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
214 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
216 GraphEvent response = sendAndWait(event);
217 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
218 logSuccessResponse(event, response);
219 return CrudResponseBuilder.buildUpsertVertexResponse(
220 OxmModelValidator.validateOutgoingPayload(version,
221 response.getVertex().toVertex()), version);
223 logErrorResponse(event, response);
224 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
225 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
230 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
231 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
232 // Create graph request event
233 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
234 .edge(GraphEventEdge.fromEdge(edge, version)).build();
236 GraphEvent response = sendAndWait(event);
237 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
238 logSuccessResponse(event, response);
239 return CrudResponseBuilder.buildUpsertEdgeResponse(
240 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
243 logErrorResponse(event, response);
244 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
245 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
249 public String updateVertex(String version, String id, String type, VertexPayload payload)
250 throws CrudException {
251 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
252 type, payload.getProperties());
253 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
254 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
256 GraphEvent response = sendAndWait(event);
257 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
258 logSuccessResponse(event, response);
259 return CrudResponseBuilder.buildUpsertVertexResponse(
260 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
263 logErrorResponse(event, response);
264 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
265 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
270 public String patchVertex(String version, String id, String type, VertexPayload payload)
271 throws CrudException {
272 Vertex existingVertex
273 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version);
274 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
275 type, payload.getProperties(),
277 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
278 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
280 GraphEvent response = sendAndWait(event);
281 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
282 logSuccessResponse(event, response);
283 return CrudResponseBuilder.buildUpsertVertexResponse(
284 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
287 logErrorResponse(event, response);
288 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
289 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
294 public String deleteVertex(String version, String id, String type) throws CrudException {
295 type = OxmModelValidator.resolveCollectionType(version, type);
296 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
297 .vertex(new GraphEventVertex(id, version, type, null)).build();
299 GraphEvent response = sendAndWait(event);
300 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
301 logSuccessResponse(event, response);
304 logErrorResponse(event, response);
305 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
306 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
311 public String deleteEdge(String version, String id, String type) throws CrudException {
312 RelationshipSchemaValidator.validateType(version, type);
313 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
314 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
316 GraphEvent response = sendAndWait(event);
317 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
318 logSuccessResponse(event, response);
321 logErrorResponse(event, response);
322 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
323 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
328 public String updateEdge(String version, String id, String type, EdgePayload payload)
329 throws CrudException {
330 Edge edge = dao.getEdge(id, type);
331 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
333 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
334 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
336 GraphEvent response = sendAndWait(event);
337 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
338 logSuccessResponse(event, response);
339 return CrudResponseBuilder.buildUpsertEdgeResponse(
340 RelationshipSchemaValidator.validateOutgoingPayload(version,
341 response.getEdge().toEdge()), version);
343 logErrorResponse(event, response);
344 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
345 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
350 public String patchEdge(String version, String id, String type, EdgePayload payload)
351 throws CrudException {
352 Edge edge = dao.getEdge(id, type);
353 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
355 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
356 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
358 GraphEvent response = sendAndWait(event);
359 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
360 logSuccessResponse(event, response);
361 return CrudResponseBuilder.buildUpsertEdgeResponse(
362 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
365 logErrorResponse(event, response);
366 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
367 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
373 protected void preShutdown() {
379 protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
380 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
381 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
382 event.setDbTransactionId(dbTransId);
383 GraphEvent response = publishEvent(event);
384 return response.getVertex().toVertex();
388 protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
389 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
390 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
391 event.setDbTransactionId(dbTransId);
392 GraphEvent response = publishEvent(event);
393 return response.getVertex().toVertex();
397 protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
398 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
399 event.setDbTransactionId(dbTransId);
404 protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
405 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
406 event.setDbTransactionId(dbTransId);
407 GraphEvent response = publishEvent(event);
408 return response.getEdge().toEdge();
412 protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
413 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
414 event.setDbTransactionId(dbTransId);
415 GraphEvent response = publishEvent(event);
416 return response.getEdge().toEdge();
420 protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
421 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
422 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
423 event.setDbTransactionId(dbTransId);
427 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
428 GraphEvent response = sendAndWait(event);
429 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
430 logSuccessResponse(event, response);
432 logErrorResponse(event, response);
433 throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
434 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
440 private void logSuccessResponse(GraphEvent event, GraphEvent response) {
441 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
442 "Event response received: " + response.getObjectType() + " with key: "
443 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
444 + " , operation: " + event.getOperation().toString() + " , result: "
445 + response.getResult());
448 private void logErrorResponse(GraphEvent event, GraphEvent response) {
449 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
450 "Event response received: " + response.getObjectType() + " with key: "
451 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
452 + " , operation: " + event.getOperation().toString() + " , result: "
453 + response.getResult() + " , error: " + response.getErrorMessage());