Fix GET ALL for vertices created asynchronously
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.crud.service;
22
23 import java.text.SimpleDateFormat;
24 import java.util.HashMap;
25 import java.util.Timer;
26 import java.util.concurrent.Callable;
27 import java.util.concurrent.CountDownLatch;
28 import java.util.concurrent.ExecutionException;
29 import java.util.concurrent.ExecutorService;
30 import java.util.concurrent.Executors;
31 import java.util.concurrent.Future;
32 import java.util.concurrent.TimeUnit;
33 import java.util.concurrent.TimeoutException;
34 import javax.annotation.PreDestroy;
35 import javax.ws.rs.core.Response.Status;
36 import org.onap.aai.cl.api.LogFields;
37 import org.onap.aai.cl.api.Logger;
38 import org.onap.aai.cl.eelf.LoggerFactory;
39 import org.onap.aai.cl.mdc.MdcContext;
40 import org.onap.aai.cl.mdc.MdcOverride;
41 import org.onap.aai.event.api.EventConsumer;
42 import org.onap.aai.event.api.EventPublisher;
43 import org.onap.crud.dao.GraphDao;
44 import org.onap.crud.entity.Edge;
45 import org.onap.crud.entity.Vertex;
46 import org.onap.crud.event.GraphEvent;
47 import org.onap.crud.event.GraphEvent.GraphEventOperation;
48 import org.onap.crud.event.GraphEventEdge;
49 import org.onap.crud.event.GraphEventVertex;
50 import org.onap.crud.event.envelope.GraphEventEnvelope;
51 import org.onap.crud.event.response.GraphEventResponseHandler;
52 import org.onap.crud.exception.CrudException;
53 import org.onap.crud.logging.CrudServiceMsgs;
54 import org.onap.crud.util.CrudProperties;
55 import org.onap.crud.util.CrudServiceConstants;
56 import org.onap.schema.OxmModelValidator;
57 import org.onap.schema.RelationshipSchemaValidator;
58
59 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
60
61     private static Integer requestTimeOut;
62
63     private EventPublisher asyncRequestPublisher;
64
65     private Timer timer;
66
67     public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
68     private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
69
70     private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
71     private static Logger metricsLogger =
72             LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
73     private static LogFields okFields = new LogFields();
74
75     static {
76         okFields.setField(Status.OK, Status.OK.toString());
77     }
78
79     private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
80
81     public static Integer getRequestTimeOut() {
82         return requestTimeOut;
83     }
84
85     public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
86             EventConsumer asyncResponseConsumer) throws CrudException {
87         this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
88     }
89
90     public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
91             EventConsumer asyncResponseConsumer) throws CrudException {
92
93         super();
94         this.dao = dao;
95         this.daoForGet = daoForGet;
96
97         requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
98         try {
99             requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
100         } catch (NumberFormatException ex) {
101             // Leave it as the default
102         }
103
104         Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
105         try {
106             responsePollInterval =
107                     Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
108         } catch (Exception ex) {
109             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
110                     + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
111         }
112
113         // Start the Response Consumer timer
114         CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(asyncResponseConsumer);
115         timer = new Timer("crudAsyncResponseConsumer-1");
116         timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
117
118         this.asyncRequestPublisher = asyncRequestPublisher;
119
120         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
121     }
122
123     public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
124         private volatile GraphEventEnvelope graphEventEnvelope;
125         private volatile CountDownLatch latch = new CountDownLatch(1);
126
127         @Override
128         public GraphEventEnvelope call() throws TimeoutException {
129             try {
130                 // Wait until graphEvent is available
131                 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
132             } catch (InterruptedException e) {
133                 latch.countDown();
134                 if (this.graphEventEnvelope != null) {
135                     return this.graphEventEnvelope;
136                 } else {
137                     throw new TimeoutException();
138                 }
139             }
140             return this.graphEventEnvelope;
141         }
142
143         public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
144             this.graphEventEnvelope = eventEnvelope;
145             latch.countDown();
146         }
147     }
148
149     private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
150
151         long startTimeInMs = System.currentTimeMillis();
152         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
153         MdcOverride override = new MdcOverride();
154         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
155
156         String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
157
158         // publish to request queue
159         try {
160             asyncRequestPublisher.sendSync(eventEnvelopeJson);
161         } catch (Exception e) {
162             throw new CrudException(
163                     "Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(),
164                     Status.INTERNAL_SERVER_ERROR);
165         }
166
167         logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
168
169         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
170                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
171                         + " , transaction-id: " + event.getTransactionId() + " , operation: "
172                         + event.getOperation().toString());
173
174         ExecutorService executor =
175                 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
176         CollectGraphResponse collector = new CollectGraphResponse();
177         CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
178         GraphEventEnvelope response;
179         Future<GraphEventEnvelope> future = executor.submit(collector);
180         try {
181             response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
182
183         } catch (InterruptedException | ExecutionException | TimeoutException e) {
184             CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
185             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
186                     "Request timed out for transactionId: " + event.getTransactionId());
187             future.cancel(true);
188             throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
189                     + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
190         } finally {
191             // Kill the thread as the work is completed
192             executor.shutdownNow();
193         }
194         metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
195                 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
196                         + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
197                         + " ms");
198         return response;
199     }
200
201     @Override
202     public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
203         // Validate the incoming payload
204         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
205         vertex.getProperties().put(org.onap.schema.OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
206         // Create graph request event
207         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
208                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
209
210         GraphEventEnvelope response = sendAndWait(event);
211         return responseHandler.handleVertexResponse(version, event, response);
212     }
213
214     @Override
215     public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
216         Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
217         // Create graph request event
218         GraphEvent event =
219                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
220
221         GraphEventEnvelope response = sendAndWait(event);
222         return responseHandler.handleEdgeResponse(version, event, response);
223     }
224
225     @Override
226     public String updateVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
227         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
228         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
229                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
230
231         GraphEventEnvelope response = sendAndWait(event);
232         return responseHandler.handleVertexResponse(version, event, response);
233     }
234
235     @Override
236     public String patchVertex(String version, String id, String type, VertexPayload payload) throws CrudException {
237         Vertex existingVertex = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
238                 new HashMap<String, String>());
239         Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
240                 payload.getProperties(), existingVertex);
241         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
242                 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
243
244         GraphEventEnvelope response = sendAndWait(event);
245         return responseHandler.handleVertexResponse(version, event, response);
246     }
247
248     @Override
249     public String deleteVertex(String version, String id, String type) throws CrudException {
250         type = OxmModelValidator.resolveCollectionType(version, type);
251         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
252                 .vertex(new GraphEventVertex(id, version, type, null)).build();
253
254         GraphEventEnvelope response = sendAndWait(event);
255         return responseHandler.handleDeletionResponse(event, response);
256     }
257
258     @Override
259     public String deleteEdge(String version, String id, String type) throws CrudException {
260         RelationshipSchemaValidator.validateType(version, type);
261         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
262                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
263
264         GraphEventEnvelope response = sendAndWait(event);
265         return responseHandler.handleDeletionResponse(event, response);
266     }
267
268     @Override
269     public String updateEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
270         Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
271         Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
272         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
273                 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
274
275         GraphEventEnvelope response = sendAndWait(event);
276         return responseHandler.handleEdgeResponse(version, event, response);
277     }
278
279     @Override
280     public String patchEdge(String version, String id, String type, EdgePayload payload) throws CrudException {
281         Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
282         Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
283         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
284                 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
285
286         GraphEventEnvelope response = sendAndWait(event);
287         return responseHandler.handleEdgeResponse(version, event, response);
288     }
289
290     @PreDestroy
291     protected void preShutdown() {
292         timer.cancel();
293     }
294
295     @Override
296     protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
297         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
298                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
299         event.setDbTransactionId(dbTransId);
300         GraphEvent response = publishEvent(event);
301         return response.getVertex().toVertex();
302     }
303
304     @Override
305     protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
306         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
307                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
308         event.setDbTransactionId(dbTransId);
309         GraphEvent response = publishEvent(event);
310         return response.getVertex().toVertex();
311     }
312
313     @Override
314     protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
315         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
316                 .vertex(new GraphEventVertex(id, version, type, null)).build();
317         event.setDbTransactionId(dbTransId);
318         publishEvent(event);
319     }
320
321     @Override
322     protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
323         GraphEvent event =
324                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
325         event.setDbTransactionId(dbTransId);
326         GraphEvent response = publishEvent(event);
327         return response.getEdge().toEdge();
328     }
329
330     @Override
331     protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
332         GraphEvent event =
333                 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
334         event.setDbTransactionId(dbTransId);
335         GraphEvent response = publishEvent(event);
336         return response.getEdge().toEdge();
337     }
338
339     @Override
340     protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
341         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
342                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
343         event.setDbTransactionId(dbTransId);
344         publishEvent(event);
345     }
346
347     private GraphEvent publishEvent(GraphEvent event) throws CrudException {
348         GraphEventEnvelope response = sendAndWait(event);
349         responseHandler.handleBulkEventResponse(event, response);
350         return response.getBody();
351     }
352 }