2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
21 package org.onap.crud.service;
23 import java.text.SimpleDateFormat;
24 import java.util.HashMap;
25 import java.util.Timer;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import javax.annotation.PreDestroy;
35 import javax.ws.rs.core.Response.Status;
36 import org.onap.aai.cl.api.LogFields;
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.cl.mdc.MdcOverride;
41 import org.onap.aai.event.api.EventConsumer;
42 import org.onap.aai.event.api.EventPublisher;
43 import org.onap.crud.dao.GraphDao;
44 import org.onap.crud.entity.Edge;
45 import org.onap.crud.entity.Vertex;
46 import org.onap.crud.event.GraphEvent;
47 import org.onap.crud.event.GraphEvent.GraphEventOperation;
48 import org.onap.crud.event.GraphEventEdge;
49 import org.onap.crud.event.GraphEventVertex;
50 import org.onap.crud.event.envelope.GraphEventEnvelope;
51 import org.onap.crud.event.response.GraphEventResponseHandler;
52 import org.onap.crud.exception.CrudException;
53 import org.onap.crud.logging.CrudServiceMsgs;
54 import org.onap.crud.util.CrudProperties;
55 import org.onap.crud.util.CrudServiceConstants;
56 import org.onap.schema.OxmModelValidator;
57 import org.onap.schema.RelationshipSchemaValidator;
59 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
61 private static Integer requestTimeOut;
63 private EventPublisher asyncRequestPublisher;
67 public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
68 private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
70 private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
71 private static Logger metricsLogger =
72 LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
73 private static LogFields okFields = new LogFields();
76 okFields.setField(Status.OK, Status.OK.toString());
79 private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
81 public static Integer getRequestTimeOut() {
82 return requestTimeOut;
85 public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
86 EventConsumer asyncResponseConsumer) throws CrudException {
87 this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
90 public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
91 EventConsumer asyncResponseConsumer) throws CrudException {
95 this.daoForGet = daoForGet;
97 requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
99 requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
100 } catch (NumberFormatException ex) {
101 // Leave it as the default
104 Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
106 responsePollInterval =
107 Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
108 } catch (Exception ex) {
109 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
110 + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
113 // Start the Response Consumer timer
114 CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer);
115 timer = new Timer("crudAsyncResponseConsumer-1");
116 timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
118 this.asyncRequestPublisher = asyncRequestPublisher;
120 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
123 public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
124 private volatile GraphEventEnvelope graphEventEnvelope;
125 private volatile CountDownLatch latch = new CountDownLatch(1);
128 public GraphEventEnvelope call() throws TimeoutException {
130 // Wait until graphEvent is available
131 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
132 } catch (InterruptedException e) {
134 if (this.graphEventEnvelope != null) {
135 return this.graphEventEnvelope;
137 throw new TimeoutException();
140 return this.graphEventEnvelope;
143 public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
144 this.graphEventEnvelope = eventEnvelope;
149 private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
151 long startTimeInMs = System.currentTimeMillis();
152 SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
153 MdcOverride override = new MdcOverride();
154 override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
156 String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
158 // publish to request queue
160 asyncRequestPublisher.sendSync(eventEnvelopeJson);
161 } catch (Exception e) {
162 throw new CrudException(
163 "Error publishing request " + event.getTransactionId() + " Cause: " + e.getMessage(),
164 Status.INTERNAL_SERVER_ERROR);
167 logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
169 logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
170 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
171 + " , transaction-id: " + event.getTransactionId() + " , operation: "
172 + event.getOperation().toString());
174 ExecutorService executor =
175 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
176 CollectGraphResponse collector = new CollectGraphResponse();
177 CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
178 GraphEventEnvelope response;
179 Future<GraphEventEnvelope> future = executor.submit(collector);
181 response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
183 } catch (InterruptedException | ExecutionException | TimeoutException e) {
184 CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
185 logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
186 "Request timed out for transactionId: " + event.getTransactionId());
188 throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
189 + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
191 // Kill the thread as the work is completed
192 executor.shutdownNow();
194 metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
195 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
196 + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
202 public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
203 // Validate the incoming payload
204 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
205 vertex.getProperties().put(org.onap.schema.OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
206 // Create graph request event
207 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
208 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
210 GraphEventEnvelope response = sendAndWait(event);
211 return responseHandler.handleVertexResponse(version, event, response);
215 public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
216 Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
217 // Create graph request event
219 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
221 GraphEventEnvelope response = sendAndWait(event);
222 return responseHandler.handleEdgeResponse(version, event, response);
226 public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
227 Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
228 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
229 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
231 GraphEventEnvelope response = sendAndWait(event);
232 return responseHandler.handleVertexResponse(version, event, response);
236 public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
237 Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
238 new HashMap<String, String>());
239 Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
240 payload.getProperties(), existingVertex);
241 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
242 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
244 GraphEventEnvelope response = sendAndWait(event);
245 return responseHandler.handleVertexResponse(version, event, response);
249 public String deleteVertex(String version, String id, String type) throws CrudException {
250 type = OxmModelValidator.resolveCollectionType(version, type);
251 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
252 .vertex(new GraphEventVertex(id, version, type, null)).build();
254 GraphEventEnvelope response = sendAndWait(event);
255 return responseHandler.handleDeletionResponse(event, response);
259 public String deleteEdge(String version, String id, String type) throws CrudException {
260 RelationshipSchemaValidator.validateType(version, type);
261 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
262 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
264 GraphEventEnvelope response = sendAndWait(event);
265 return responseHandler.handleDeletionResponse(event, response);
269 public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
270 Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
271 Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
272 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
273 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
275 GraphEventEnvelope response = sendAndWait(event);
276 return responseHandler.handleEdgeResponse(version, event, response);
280 public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
281 Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
282 Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
283 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
284 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
286 GraphEventEnvelope response = sendAndWait(event);
287 return responseHandler.handleEdgeResponse(version, event, response);
291 protected void preShutdown() {
296 protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
297 GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
298 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
299 event.setDbTransactionId(dbTransId);
300 GraphEvent response = publishEvent(event);
301 return response.getVertex().toVertex();
305 protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
306 GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
307 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
308 event.setDbTransactionId(dbTransId);
309 GraphEvent response = publishEvent(event);
310 return response.getVertex().toVertex();
314 protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
315 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
316 .vertex(new GraphEventVertex(id, version, type, null)).build();
317 event.setDbTransactionId(dbTransId);
322 protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
324 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
325 event.setDbTransactionId(dbTransId);
326 GraphEvent response = publishEvent(event);
327 return response.getEdge().toEdge();
331 protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
333 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
334 event.setDbTransactionId(dbTransId);
335 GraphEvent response = publishEvent(event);
336 return response.getEdge().toEdge();
340 protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
341 GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
342 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
343 event.setDbTransactionId(dbTransId);
347 private GraphEvent publishEvent(GraphEvent event) throws CrudException {
348 GraphEventEnvelope response = sendAndWait(event);
349 responseHandler.handleBulkEventResponse(event, response);
350 return response.getBody();