14ba7f22311f0a904fced13f0e6e9f7296dba114
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.crud.service;
22
23 import java.io.IOException;
24 import java.security.NoSuchAlgorithmException;
25 import java.text.SimpleDateFormat;
26 import java.util.HashMap;
27 import java.util.Timer;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import javax.annotation.PreDestroy;
37 import javax.ws.rs.core.EntityTag;
38 import javax.ws.rs.core.Response.Status;
39 import org.apache.commons.lang3.tuple.ImmutablePair;
40 import org.onap.aai.cl.api.LogFields;
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.cl.mdc.MdcOverride;
45 import org.onap.aai.event.api.EventConsumer;
46 import org.onap.aai.event.api.EventPublisher;
47 import org.onap.aai.restclient.client.OperationResult;
48 import org.onap.crud.dao.GraphDao;
49 import org.onap.crud.entity.Edge;
50 import org.onap.crud.entity.Vertex;
51 import org.onap.crud.event.GraphEvent;
52 import org.onap.crud.event.GraphEvent.GraphEventOperation;
53 import org.onap.crud.event.GraphEventEdge;
54 import org.onap.crud.event.GraphEventVertex;
55 import org.onap.crud.event.envelope.GraphEventEnvelope;
56 import org.onap.crud.event.response.GraphEventResponseHandler;
57 import org.onap.crud.exception.CrudException;
58 import org.onap.crud.logging.CrudServiceMsgs;
59 import org.onap.crud.util.CrudProperties;
60 import org.onap.crud.util.CrudServiceConstants;
61 import org.onap.crud.util.etag.EtagGenerator;
62 import org.onap.schema.OxmModelValidator;
63 import org.onap.schema.RelationshipSchemaValidator;
64
65 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
66
67     private static Integer requestTimeOut;
68
69     private EventPublisher asyncRequestPublisher;
70
71     private Timer timer;
72
73     public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
74     private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
75
76     private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
77     private static Logger metricsLogger =
78             LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
79     private static LogFields okFields = new LogFields();
80     private EtagGenerator etagGenerator;
81
82     static {
83         okFields.setField(Status.OK, Status.OK.toString());
84     }
85
86     private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
87
88     public static Integer getRequestTimeOut() {
89         return requestTimeOut;
90     }
91
92     public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
93             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
94         this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
95     }
96
97     public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
98             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
99
100         super();
101         this.dao = dao;
102         this.daoForGet = daoForGet;
103
104         requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
105         try {
106             requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
107         } catch (NumberFormatException ex) {
108             // Leave it as the default
109         }
110
111         Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
112         try {
113             responsePollInterval =
114                     Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
115         } catch (Exception ex) {
116             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
117                     + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
118         }
119
120         // Start the Response Consumer timer
121         CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
122             asyncResponseConsumer, new GraphEventUpdater()
123         );
124         timer = new Timer("crudAsyncResponseConsumer-1");
125         timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
126
127         this.asyncRequestPublisher = asyncRequestPublisher;
128         this.etagGenerator = new EtagGenerator();
129
130         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
131     }
132
133     public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
134         private volatile GraphEventEnvelope graphEventEnvelope;
135         private volatile CountDownLatch latch = new CountDownLatch(1);
136
137         @Override
138         public GraphEventEnvelope call() throws TimeoutException {
139             try {
140                 // Wait until graphEvent is available
141                 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
142             } catch (InterruptedException e) {
143                 latch.countDown();
144                 if (this.graphEventEnvelope != null) {
145                     return this.graphEventEnvelope;
146                 } else {
147                     throw new TimeoutException();
148                 }
149             }
150             return this.graphEventEnvelope;
151         }
152
153         public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
154             this.graphEventEnvelope = eventEnvelope;
155             latch.countDown();
156         }
157     }
158
159     private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
160
161         long startTimeInMs = System.currentTimeMillis();
162         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
163         MdcOverride override = new MdcOverride();
164         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
165
166         String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
167
168         // publish to request queue
169         try {
170             asyncRequestPublisher.sendSync(eventEnvelopeJson);
171         } catch (Exception e) {
172             throw new CrudException(
173                     "Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(),
174                     Status.INTERNAL_SERVER_ERROR);
175         }
176
177         logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
178
179         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
180                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
181                         + " , transaction-id: " + event.getTransactionId() + " , operation: "
182                         + event.getOperation().toString());
183
184         ExecutorService executor =
185                 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
186         CollectGraphResponse collector = new CollectGraphResponse();
187         CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
188         GraphEventEnvelope response;
189         Future<GraphEventEnvelope> future = executor.submit(collector);
190         try {
191             response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
192
193         } catch (InterruptedException | ExecutionException | TimeoutException e) {
194             CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
195             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
196                     "Request timed out for transactionId: " + event.getTransactionId());
197             future.cancel(true);
198             throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
199                     + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
200         } finally {
201             // Kill the thread as the work is completed
202             executor.shutdownNow();
203         }
204         metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
205                 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
206                         + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
207                         + " ms");
208         return response;
209     }
210
211     @Override
212     public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
213             throws CrudException {
214         // Validate the incoming payload
215         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
216         vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
217         // Create graph request event
218         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
219                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
220
221         GraphEventEnvelope response = sendAndWait(event);
222
223         EntityTag entityTag;
224         try {
225             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
226         } catch (IOException e) {
227             throw new CrudException(e);
228         }
229         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
230
231         return new ImmutablePair<>(entityTag, responsePayload);
232     }
233
234     @Override
235     public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
236             throws CrudException {
237         Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
238
239         // Create graph request event
240         GraphEvent event =
241                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
242
243         GraphEventEnvelope response = sendAndWait(event);
244
245         EntityTag entityTag;
246         try {
247             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
248         } catch (IOException e) {
249             throw new CrudException(e);
250         }
251         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
252
253         return new ImmutablePair<>(entityTag, responsePayload);
254     }
255
256     @Override
257     public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
258             throws CrudException {
259         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
260         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
261                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
262
263         GraphEventEnvelope response = sendAndWait(event);
264
265         EntityTag entityTag;
266         try {
267             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
268         } catch (IOException e) {
269             throw new CrudException(e);
270         }
271         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
272
273         return new ImmutablePair<>(entityTag, responsePayload);
274     }
275
276     @Override
277     public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
278             throws CrudException {
279         OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
280                 new HashMap<String, String>());
281         Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
282         Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
283                 payload.getProperties(), existingVertex);
284         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
285                 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
286
287         GraphEventEnvelope response = sendAndWait(event);
288
289         EntityTag entityTag;
290         try {
291             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
292         } catch (IOException e) {
293             throw new CrudException(e);
294         }
295         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
296
297         return new ImmutablePair<>(entityTag, responsePayload);
298     }
299
300     @Override
301     public String deleteVertex(String version, String id, String type) throws CrudException {
302         type = OxmModelValidator.resolveCollectionType(version, type);
303         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
304                 .vertex(new GraphEventVertex(id, version, type, null)).build();
305
306         GraphEventEnvelope response = sendAndWait(event);
307         return responseHandler.handleDeletionResponse(event, response);
308     }
309
310     @Override
311     public String deleteEdge(String version, String id, String type) throws CrudException {
312         RelationshipSchemaValidator.validateType(version, type);
313         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
314                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
315
316         GraphEventEnvelope response = sendAndWait(event);
317         return responseHandler.handleDeletionResponse(event, response);
318     }
319
320     @Override
321     public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
322             throws CrudException {
323         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
324         Edge edge = Edge.fromJson(operationResult.getResult());
325         Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
326
327         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
328                 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
329
330         GraphEventEnvelope response = sendAndWait(event);
331
332         EntityTag entityTag;
333         try {
334             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
335         } catch (IOException e) {
336             throw new CrudException(e);
337         }
338         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
339
340         return new ImmutablePair<>(entityTag, responsePayload);
341     }
342
343     @Override
344     public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
345             throws CrudException {
346         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
347         Edge edge = Edge.fromJson(operationResult.getResult());
348         Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
349         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
350                 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
351
352         GraphEventEnvelope response = sendAndWait(event);
353
354         EntityTag entityTag;
355         try {
356             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
357         } catch (IOException e) {
358             throw new CrudException(e);
359         }
360         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
361
362         return new ImmutablePair<>(entityTag, responsePayload);
363     }
364
365     @PreDestroy
366     protected void preShutdown() {
367         timer.cancel();
368     }
369
370     @Override
371     protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
372         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
373                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
374         event.setDbTransactionId(dbTransId);
375         GraphEvent response = publishEvent(event);
376         return response.getVertex().toVertex();
377     }
378
379     @Override
380     protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
381         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
382                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
383         event.setDbTransactionId(dbTransId);
384         GraphEvent response = publishEvent(event);
385         return response.getVertex().toVertex();
386     }
387
388     @Override
389     protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
390         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
391                 .vertex(new GraphEventVertex(id, version, type, null)).build();
392         event.setDbTransactionId(dbTransId);
393         publishEvent(event);
394     }
395
396     @Override
397     protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
398         GraphEvent event =
399                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
400         event.setDbTransactionId(dbTransId);
401         GraphEvent response = publishEvent(event);
402         return response.getEdge().toEdge();
403     }
404
405     @Override
406     protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
407         GraphEvent event =
408                 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
409         event.setDbTransactionId(dbTransId);
410         GraphEvent response = publishEvent(event);
411         return response.getEdge().toEdge();
412     }
413
414     @Override
415     protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
416         // Get the edge type
417         String type = null;
418         try {
419             Edge edge = daoForGet.getEdge(id);
420             type = edge.getType();
421         }
422         catch (CrudException ex) {
423             // Likely the client is trying to delete an edge which isn't present.  Just swallow the exception
424             // and let the bulk request fail via the normal path.
425         }
426         
427         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
428                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
429         event.setDbTransactionId(dbTransId);
430         publishEvent(event);
431     }
432
433     private GraphEvent publishEvent(GraphEvent event) throws CrudException {
434         GraphEventEnvelope response = sendAndWait(event);
435         responseHandler.handleBulkEventResponse(event, response);
436         return response.getBody();
437     }
438 }