/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017 AT&T Intellectual Property.
 * Copyright © 2017 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 *
 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
 */
24 package org.onap.crud.service;
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.crud.util.CrudServiceUtil;
48 import org.onap.schema.OxmModelValidator;
49 import org.onap.schema.RelationshipSchemaValidator;
51 import java.text.SimpleDateFormat;
52 import java.util.List;
54 import java.util.Timer;
55 import java.util.concurrent.Callable;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.Future;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.TimeoutException;
63 import javax.annotation.PreDestroy;
64 import javax.ws.rs.core.Response.Status;
66 public class CrudAsyncGraphDataService {
68 private static Integer requestTimeOut;
72 private EventPublisher asyncRequestPublisher;
76 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
77 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
79 private static Logger logger = LoggerFactory.getInstance()
80 .getLogger(CrudAsyncGraphDataService.class.getName());
81 private static Logger metricsLogger = LoggerFactory.getInstance()
82 .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
83 private static LogFields OK_FIELDS = new LogFields();
86 OK_FIELDS.setField(Status.OK, Status.OK.toString());
89 public static Integer getRequestTimeOut() {
90 return requestTimeOut;
93 public CrudAsyncGraphDataService(GraphDao dao,
94 EventPublisher asyncRequestPublisher,
95 EventConsumer asyncResponseConsumer) throws CrudException {
97 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
100 = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
101 } catch (NumberFormatException ex) {
102 // Leave it as the default
105 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
107 responsePollInterval = Integer
108 .parseInt(CrudProperties
109 .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
110 } catch (Exception ex) {
111 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
112 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
113 + " error: " + ex.getMessage());
118 // Start the Response Consumer timer
119 CrudAsyncResponseConsumer crudAsyncResponseConsumer
120 = new CrudAsyncResponseConsumer(asyncResponseConsumer);
121 timer = new Timer("crudAsyncResponseConsumer-1");
122 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
124 this.asyncRequestPublisher = asyncRequestPublisher;
127 CrudServiceUtil.loadModels();
128 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
129 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
132 public class CollectGraphResponse implements Callable<GraphEvent> {
133 private volatile GraphEvent graphEvent;
134 private volatile CountDownLatch latch = new CountDownLatch(1);
137 public GraphEvent call() throws TimeoutException {
139 // Wait until graphEvent is available
140 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
141 } catch (InterruptedException e) {
143 if (this.graphEvent != null) {
144 return this.graphEvent;
146 throw new TimeoutException();
149 return this.graphEvent;
152 public void populateGraphEvent(GraphEvent event) {
153 this.graphEvent = event;
158 private GraphEvent sendAndWait(GraphEvent event) throws Exception {
160 long startTimeInMs = System.currentTimeMillis();
161 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
162 MdcOverride override = new MdcOverride();
163 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
165 // publish to request queue
166 asyncRequestPublisher.sendSync(event.toJson());
168 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
169 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
170 + " , transaction-id: " + event.getTransactionId() + " , operation: "
171 + event.getOperation().toString());
173 ExecutorService executor = Executors
174 .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
175 CollectGraphResponse collector = new CollectGraphResponse();
176 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
178 Future<GraphEvent> future = executor.submit(collector);
180 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
182 } catch (InterruptedException | ExecutionException | TimeoutException e) {
183 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
184 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
185 "Request timed out for transactionId: " + event.getTransactionId());
187 throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
188 + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
190 //Kill the thread as the work is completed
191 executor.shutdownNow();
193 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
194 "Total elapsed time for operation: " + event.getOperation().toString()
195 + " , transactionId: " + event.getTransactionId() + " is "
196 + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
200 public String addVertex(String version, String type, VertexPayload payload) throws Exception {
201 // Validate the incoming payload
202 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
203 type, payload.getProperties());
204 // Create graph request event
205 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
206 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
208 GraphEvent response = sendAndWait(event);
209 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
210 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
211 "Event response received: " + response.getObjectType() + " with key: "
212 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
213 + " , operation: " + event.getOperation().toString() + " , result: "
214 + response.getResult());
215 return CrudResponseBuilder.buildUpsertVertexResponse(
216 OxmModelValidator.validateOutgoingPayload(version,
217 response.getVertex().toVertex()), version);
219 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
220 "Event response received: " + response.getObjectType() + " with key: "
221 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
222 + " , operation: " + event.getOperation().toString() + " , result: "
223 + response.getResult() + " , error: " + response.getErrorMessage());
224 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
225 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
230 public String addEdge(String version, String type, EdgePayload payload) throws Exception {
231 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
232 // Create graph request event
233 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
234 .edge(GraphEventEdge.fromEdge(edge, version)).build();
236 GraphEvent response = sendAndWait(event);
237 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
238 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
239 "Event response received: " + response.getObjectType() + " with key: "
240 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
241 + " , operation: " + event.getOperation().toString() + " , result: "
242 + response.getResult());
243 return CrudResponseBuilder.buildUpsertEdgeResponse(
244 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
247 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
248 "Event response received: " + response.getObjectType() + " with key: "
249 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
250 + " , operation: " + event.getOperation().toString() + " , result: "
251 + response.getResult() + " , error: " + response.getErrorMessage());
252 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
253 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
257 public String updateVertex(String version, String id, String type, VertexPayload payload)
259 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
260 type, payload.getProperties());
261 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
262 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
264 GraphEvent response = sendAndWait(event);
265 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
266 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
267 "Event response received: " + response.getObjectType() + " with key: "
268 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
269 + " , operation: " + event.getOperation().toString() + " , result: "
270 + response.getResult());
271 return CrudResponseBuilder.buildUpsertVertexResponse(
272 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
275 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
276 "Event response received: " + response.getObjectType() + " with key: "
277 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
278 + " , operation: " + event.getOperation().toString() + " , result: "
279 + response.getResult() + " , error: " + response.getErrorMessage());
280 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
281 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
286 public String patchVertex(String version, String id, String type, VertexPayload payload)
288 Vertex existingVertex
289 = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
290 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
291 type, payload.getProperties(),
293 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
294 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
296 GraphEvent response = sendAndWait(event);
297 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
298 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
299 "Event response received: " + response.getObjectType() + " with key: "
300 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
301 + " , operation: " + event.getOperation().toString() + " , result: "
302 + response.getResult());
303 return CrudResponseBuilder.buildUpsertVertexResponse(
304 OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
307 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
308 "Event response received: " + response.getObjectType() + " with key: "
309 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
310 + " , operation: " + event.getOperation().toString() + " , result: "
311 + response.getResult() + " , error: " + response.getErrorMessage());
312 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
313 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
318 public String deleteVertex(String version, String id, String type) throws Exception {
319 type = OxmModelValidator.resolveCollectionType(version, type);
320 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
321 .vertex(new GraphEventVertex(id, version, type, null)).build();
323 GraphEvent response = sendAndWait(event);
324 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
325 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
326 "Event response received: " + response.getObjectType() + " with key: "
327 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
328 + " , operation: " + event.getOperation().toString() + " , result: "
329 + response.getResult());
332 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
333 "Event response received: " + response.getObjectType() + " with key: "
334 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
335 + " , operation: " + event.getOperation().toString() + " , result: "
336 + response.getResult() + " , error: " + response.getErrorMessage());
337 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
338 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
343 public String deleteEdge(String version, String id, String type) throws Exception {
344 RelationshipSchemaValidator.validateType(version, type);
345 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
346 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
348 GraphEvent response = sendAndWait(event);
349 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
350 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
351 "Event response received: " + response.getObjectType() + " with key: "
352 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
353 + " , operation: " + event.getOperation().toString() + " , result: "
354 + response.getResult());
357 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
358 "Event response received: " + response.getObjectType() + " with key: "
359 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
360 + " , operation: " + event.getOperation().toString() + " , result: "
361 + response.getResult() + " , error: " + response.getErrorMessage());
362 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
363 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
368 public String updateEdge(String version, String id, String type, EdgePayload payload)
370 Edge edge = dao.getEdge(id, type);
371 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
373 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
374 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
376 GraphEvent response = sendAndWait(event);
377 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
378 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
379 "Event response received: " + response.getObjectType() + " with key: "
380 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
381 + " , operation: " + event.getOperation().toString() + " , result: "
382 + response.getResult());
383 return CrudResponseBuilder.buildUpsertEdgeResponse(
384 RelationshipSchemaValidator.validateOutgoingPayload(version,
385 response.getEdge().toEdge()), version);
387 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
388 "Event response received: " + response.getObjectType() + " with key: "
389 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
390 + " , operation: " + event.getOperation().toString() + " , result: "
391 + response.getResult() + " , error: " + response.getErrorMessage());
392 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
393 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
398 public String patchEdge(String version, String id, String type, EdgePayload payload)
400 Edge edge = dao.getEdge(id, type);
401 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
403 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
404 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
406 GraphEvent response = sendAndWait(event);
407 if (response.getResult().equals(GraphEventResult.SUCCESS)) {
408 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
409 "Event response received: " + response.getObjectType() + " with key: "
410 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
411 + " , operation: " + event.getOperation().toString() + " , result: "
412 + response.getResult());
413 return CrudResponseBuilder.buildUpsertEdgeResponse(
414 RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
417 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
418 "Event response received: " + response.getObjectType() + " with key: "
419 + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
420 + " , operation: " + event.getOperation().toString() + " , result: "
421 + response.getResult() + " , error: " + response.getErrorMessage());
422 throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
423 + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
428 public String getEdge(String version, String id, String type) throws CrudException {
429 RelationshipSchemaValidator.validateType(version, type);
430 Edge edge = dao.getEdge(id, type);
432 return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
433 .validateOutgoingPayload(version, edge),
437 public String getEdges(String version, String type, Map<String, String> filter)
438 throws CrudException {
439 RelationshipSchemaValidator.validateType(version, type);
440 List<Edge> items = dao.getEdges(type,
441 RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
442 return CrudResponseBuilder.buildGetEdgesResponse(items, version);
445 public String getVertex(String version, String id, String type) throws CrudException {
446 type = OxmModelValidator.resolveCollectionType(version, type);
447 Vertex vertex = dao.getVertex(id, type);
448 List<Edge> edges = dao.getVertexEdges(id);
449 return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
450 .validateOutgoingPayload(version, vertex), edges,
454 public String getVertices(String version, String type, Map<String, String> filter)
455 throws CrudException {
456 type = OxmModelValidator.resolveCollectionType(version, type);
457 List<Vertex> items = dao.getVertices(type,
458 OxmModelValidator.resolveCollectionfilter(version, type, filter));
459 return CrudResponseBuilder.buildGetVerticesResponse(items, version);
463 protected void preShutdown() {