/*
 * ============LICENSE_START=======================================================
 * ================================================================================
 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
 * Copyright © 2017-2018 Amdocs
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
21 package org.onap.crud.service;
23 import java.io.IOException;
24 import java.security.NoSuchAlgorithmException;
25 import java.text.SimpleDateFormat;
26 import java.util.HashMap;
27 import java.util.Timer;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import javax.annotation.PreDestroy;
37 import javax.ws.rs.core.EntityTag;
38 import javax.ws.rs.core.Response.Status;
39 import org.apache.commons.lang3.tuple.ImmutablePair;
40 import org.onap.aai.cl.api.LogFields;
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.cl.mdc.MdcOverride;
45 import org.onap.aai.event.api.EventConsumer;
46 import org.onap.aai.event.api.EventPublisher;
47 import org.onap.aai.restclient.client.OperationResult;
48 import org.onap.crud.dao.GraphDao;
49 import org.onap.crud.entity.Edge;
50 import org.onap.crud.entity.Vertex;
51 import org.onap.crud.event.GraphEvent;
52 import org.onap.crud.event.GraphEvent.GraphEventOperation;
53 import org.onap.crud.event.GraphEventEdge;
54 import org.onap.crud.event.GraphEventVertex;
55 import org.onap.crud.event.envelope.GraphEventEnvelope;
56 import org.onap.crud.event.response.GraphEventResponseHandler;
57 import org.onap.crud.exception.CrudException;
58 import org.onap.crud.logging.CrudServiceMsgs;
59 import org.onap.crud.parser.EdgePayload;
60 import org.onap.crud.parser.VertexPayload;
61 import org.onap.crud.util.CrudProperties;
62 import org.onap.crud.util.CrudServiceConstants;
63 import org.onap.crud.util.etag.EtagGenerator;
64 import org.onap.schema.validation.OxmModelValidator;
65 import org.onap.schema.validation.RelationshipSchemaValidator;
67 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
/**
 * Request timeout in milliseconds. It is static but assigned by the constructor, so it is
 * {@code null} until the first instance has been created.
 */
private static Integer requestTimeOut;

/** Publisher used to send request events. */
private EventPublisher asyncRequestPublisher;

/** Timer that periodically runs the asynchronous response consumer task. */
private Timer timer;

/** Request timeout (milliseconds) used when no valid value is configured. */
public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;

/** Response poll interval (milliseconds) used when no valid value is configured. */
private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;

// Loggers and the shared log fields are assigned exactly once, so they are declared final.
private static final Logger logger =
    LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
private static final Logger metricsLogger =
    LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
private static final LogFields okFields = new LogFields();

/** Computes the hashes that are wrapped into entity tags for vertex and edge responses. */
private EtagGenerator etagGenerator;

static {
    okFields.setField(Status.OK, Status.OK.toString());
}

/** Turns response envelopes into the payloads returned by the public operations. */
private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
90 public static Integer getRequestTimeOut() {
91 return requestTimeOut;
/**
 * Creates the service using the same DAO instance as both {@code dao} and {@code daoForGet}.
 * All work is delegated to the four-argument constructor.
 *
 * @param dao                   DAO passed on for both DAO roles
 * @param asyncRequestPublisher publisher used to send request events
 * @param asyncResponseConsumer consumer polled for response events
 * @throws CrudException            propagated from the delegated constructor
 * @throws NoSuchAlgorithmException propagated from the delegated constructor
 */
public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
        EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
    this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
}
99 public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
100 EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
104 this.daoForGet = daoForGet;
106 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
108 requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
109 } catch (NumberFormatException ex) {
110 // Leave it as the default
113 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
115 responsePollInterval =
116 Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
117 } catch (Exception ex) {
118 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
119 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
122 // Start the Response Consumer timer
123 CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
124 asyncResponseConsumer, new GraphEventUpdater()
126 timer = new Timer("crudAsyncResponseConsumer-1");
127 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
129 this.asyncRequestPublisher = asyncRequestPublisher;
130 this.etagGenerator = new EtagGenerator();
132 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
135 public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
136 private volatile GraphEventEnvelope graphEventEnvelope;
137 private volatile CountDownLatch latch = new CountDownLatch(1);
140 public GraphEventEnvelope call() throws TimeoutException {
142 // Wait until graphEvent is available
143 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
144 } catch (InterruptedException e) {
146 if (this.graphEventEnvelope != null) {
147 return this.graphEventEnvelope;
149 throw new TimeoutException();
152 return this.graphEventEnvelope;
155 public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
156 this.graphEventEnvelope = eventEnvelope;
161 private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
163 long startTimeInMs = System.currentTimeMillis();
164 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
165 MdcOverride override = new MdcOverride();
166 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
168 String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
170 // publish to request queue
172 asyncRequestPublisher.sendSync(eventEnvelopeJson);
173 } catch (Exception e) {
174 throw new CrudException(
175 "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
176 Status.INTERNAL_SERVER_ERROR);
179 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
181 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
182 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
183 + " , transaction-id: " + event.getTransactionId() + " , operation: "
184 + event.getOperation().toString());
186 ExecutorService executor =
187 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
188 CollectGraphResponse collector = new CollectGraphResponse();
189 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
190 GraphEventEnvelope response;
191 Future<GraphEventEnvelope> future = executor.submit(collector);
193 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
195 } catch (InterruptedException | ExecutionException | TimeoutException e) {
196 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
197 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
198 "Request timed out for transactionId: " + event.getTransactionId());
200 throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
201 + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
203 // Kill the thread as the work is completed
204 executor.shutdownNow();
206 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
207 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
208 + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
214 public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
215 throws CrudException {
216 // Validate the incoming payload
217 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
218 vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
219 // Create graph request event
220 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
221 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
223 GraphEventEnvelope response = sendAndWait(event);
227 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
228 } catch (IOException e) {
229 throw new CrudException(e);
231 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
233 return new ImmutablePair<>(entityTag, responsePayload);
237 public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
238 throws CrudException {
239 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
241 // Create graph request event
243 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
245 GraphEventEnvelope response = sendAndWait(event);
249 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
250 } catch (IOException e) {
251 throw new CrudException(e);
253 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
255 return new ImmutablePair<>(entityTag, responsePayload);
259 public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
260 throws CrudException {
261 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
262 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
263 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
265 GraphEventEnvelope response = sendAndWait(event);
269 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
270 } catch (IOException e) {
271 throw new CrudException(e);
273 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
275 return new ImmutablePair<>(entityTag, responsePayload);
279 public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
280 throws CrudException {
281 OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
282 new HashMap<String, String>());
283 Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
284 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
285 payload.getProperties(), existingVertex);
286 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
287 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
289 GraphEventEnvelope response = sendAndWait(event);
293 entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
294 } catch (IOException e) {
295 throw new CrudException(e);
297 String responsePayload = responseHandler.handleVertexResponse(version, event, response);
299 return new ImmutablePair<>(entityTag, responsePayload);
303 public String deleteVertex(String version, String id, String type) throws CrudException {
304 type = OxmModelValidator.resolveCollectionType(version, type);
305 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
306 .vertex(new GraphEventVertex(id, version, type, null)).build();
308 GraphEventEnvelope response = sendAndWait(event);
309 return responseHandler.handleDeletionResponse(event, response);
313 public String deleteEdge(String version, String id, String type) throws CrudException {
314 RelationshipSchemaValidator.validateType(version, type);
315 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
316 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
318 GraphEventEnvelope response = sendAndWait(event);
319 return responseHandler.handleDeletionResponse(event, response);
323 public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
324 throws CrudException {
325 OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
326 Edge edge = Edge.fromJson(operationResult.getResult());
327 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
329 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
330 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
332 GraphEventEnvelope response = sendAndWait(event);
336 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
337 } catch (IOException e) {
338 throw new CrudException(e);
340 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
342 return new ImmutablePair<>(entityTag, responsePayload);
346 public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
347 throws CrudException {
348 OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
349 Edge edge = Edge.fromJson(operationResult.getResult());
350 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
351 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
352 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
354 GraphEventEnvelope response = sendAndWait(event);
358 entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
359 } catch (IOException e) {
360 throw new CrudException(e);
362 String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
364 return new ImmutablePair<>(entityTag, responsePayload);
368 protected void preShutdown() {
373 protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
374 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
375 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
376 event.setDbTransactionId(dbTransId);
377 GraphEvent response = publishEvent(event);
378 return response.getVertex().toVertex();
382 protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
383 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
384 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
385 event.setDbTransactionId(dbTransId);
386 GraphEvent response = publishEvent(event);
387 return response.getVertex().toVertex();
391 protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
392 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
393 .vertex(new GraphEventVertex(id, version, type, null)).build();
394 event.setDbTransactionId(dbTransId);
399 protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
401 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
402 event.setDbTransactionId(dbTransId);
403 GraphEvent response = publishEvent(event);
404 return response.getEdge().toEdge();
408 protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
410 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
411 event.setDbTransactionId(dbTransId);
412 GraphEvent response = publishEvent(event);
413 return response.getEdge().toEdge();
417 protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
421 Edge edge = daoForGet.getEdge(id);
422 type = edge.getType();
424 catch (CrudException ex) {
425 // Likely the client is trying to delete an edge which isn't present. Just swallow the exception
426 // and let the bulk request fail via the normal path.
429 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
430 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
431 event.setDbTransactionId(dbTransId);
435 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
436 GraphEventEnvelope response = sendAndWait(event);
437 responseHandler.handleBulkEventResponse(event, response);
438 return response.getBody();