d674d0e39fbb298226246f508b8ff555cf2202de
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Gizmo
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  */
24 package org.onap.crud.service;
25
26 import org.onap.aai.event.api.EventConsumer;
27 import org.onap.aai.event.api.EventPublisher;
28
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
49
50 import java.text.SimpleDateFormat;
51 import java.util.HashMap;
52 import java.util.Timer;
53 import java.util.concurrent.Callable;
54 import java.util.concurrent.CountDownLatch;
55 import java.util.concurrent.ExecutionException;
56 import java.util.concurrent.ExecutorService;
57 import java.util.concurrent.Executors;
58 import java.util.concurrent.Future;
59 import java.util.concurrent.TimeUnit;
60 import java.util.concurrent.TimeoutException;
61 import javax.annotation.PreDestroy;
62 import javax.ws.rs.core.Response.Status;
63
64 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
65
66   private static Integer requestTimeOut;
67
68   private EventPublisher asyncRequestPublisher;
69
70   private Timer timer;
71
72   public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
73   private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
74
75   private static Logger logger = LoggerFactory.getInstance()
76     .getLogger(CrudAsyncGraphDataService.class.getName());
77   private static Logger metricsLogger = LoggerFactory.getInstance()
78     .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
79   private static LogFields OK_FIELDS = new LogFields();
80
81   static {
82                 OK_FIELDS.setField(Status.OK, Status.OK.toString());
83   }
84
85   public static Integer getRequestTimeOut() {
86     return requestTimeOut;
87   }
88   
89   public CrudAsyncGraphDataService(GraphDao dao, 
90           EventPublisher asyncRequestPublisher,
91           EventConsumer asyncResponseConsumer) throws CrudException {
92       this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
93   }
94
95   public CrudAsyncGraphDataService(GraphDao dao, 
96           GraphDao daoForGet, 
97                   EventPublisher asyncRequestPublisher,
98                   EventConsumer asyncResponseConsumer) throws CrudException {
99
100     super();
101     this.dao = dao;
102     this.daoForGet = daoForGet;
103      
104     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
105     try {
106       requestTimeOut
107         = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
108     } catch (NumberFormatException ex) {
109       // Leave it as the default
110     }
111
112     Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
113     try {
114       responsePollInterval = Integer
115         .parseInt(CrudProperties
116                   .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
117     } catch (Exception ex) {
118       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
119                    + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
120                    + " error: " + ex.getMessage());
121     }
122
123     // Start the Response Consumer timer
124     CrudAsyncResponseConsumer crudAsyncResponseConsumer
125       = new CrudAsyncResponseConsumer(asyncResponseConsumer);
126     timer = new Timer("crudAsyncResponseConsumer-1");
127     timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
128
129     this.asyncRequestPublisher = asyncRequestPublisher;
130     
131     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
132                 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
133   }
134
135   public class CollectGraphResponse implements Callable<GraphEvent> {
136     private volatile GraphEvent graphEvent;
137     private volatile CountDownLatch latch = new CountDownLatch(1);
138
139     @Override
140     public GraphEvent call() throws TimeoutException {
141       try {
142         // Wait until graphEvent is available
143         latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
144       } catch (InterruptedException e) {
145         latch.countDown();
146         if (this.graphEvent != null) {
147           return this.graphEvent;
148         } else {
149           throw new TimeoutException();
150         }
151       }
152       return this.graphEvent;
153     }
154
155     public void populateGraphEvent(GraphEvent event) {
156       this.graphEvent = event;
157       latch.countDown();
158     }
159   }
160
161   private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
162
163     long startTimeInMs = System.currentTimeMillis();
164     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
165     MdcOverride override = new MdcOverride();
166     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
167
168     // publish to request queue
169     try {
170       asyncRequestPublisher.sendSync(event.toJson());
171     } catch (Exception e) {
172       throw new CrudException("Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
173     }
174     
175     logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
176
177     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
178                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
179                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
180                 + event.getOperation().toString());
181
182     ExecutorService executor = Executors
183       .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
184     CollectGraphResponse collector = new CollectGraphResponse();
185     CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
186     GraphEvent response;
187     Future<GraphEvent> future = executor.submit(collector);
188     try {
189       response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
190
191     } catch (InterruptedException | ExecutionException | TimeoutException e) {
192       CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
193       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
194                    "Request timed out for transactionId: " + event.getTransactionId());
195       future.cancel(true);
196       throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
197                               + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
198     } finally {      
199       //Kill the thread as the work is completed
200       executor.shutdownNow();
201     }
202     metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
203             "Total elapsed time for operation: " + event.getOperation().toString()
204             + " , transactionId: " + event.getTransactionId() + " is "
205             + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
206     return response;
207   }
208
209   public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
210     // Validate the incoming payload
211     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
212                                                                     type, payload.getProperties());
213     // Create graph request event
214     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
215       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
216
217     GraphEvent response = sendAndWait(event);
218     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
219       logSuccessResponse(event, response);
220       return CrudResponseBuilder.buildUpsertVertexResponse(
221                                                            OxmModelValidator.validateOutgoingPayload(version,
222                                                                                                      response.getVertex().toVertex()), version);
223     } else {
224       logErrorResponse(event, response);
225       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
226                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
227     }
228
229   }
230
231   public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
232     Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
233     // Create graph request event
234     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
235       .edge(GraphEventEdge.fromEdge(edge, version)).build();
236
237     GraphEvent response = sendAndWait(event);
238     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
239       logSuccessResponse(event, response);
240       return CrudResponseBuilder.buildUpsertEdgeResponse(
241                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
242                                                          version);
243     } else {
244       logErrorResponse(event, response);
245       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
246                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
247     }
248   }
249
250   public String updateVertex(String version, String id, String type, VertexPayload payload)
251     throws CrudException {
252     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
253                                                                     type, payload.getProperties());
254     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
255       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
256
257     GraphEvent response = sendAndWait(event);
258     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
259       logSuccessResponse(event, response);
260       return CrudResponseBuilder.buildUpsertVertexResponse(
261                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
262                                                            version);
263     } else {
264       logErrorResponse(event, response);
265       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
266                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
267     }
268
269   }
270
271   public String patchVertex(String version, String id, String type, VertexPayload payload)
272     throws CrudException {
273     Vertex existingVertex
274       = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version, new HashMap<String, String>());
275     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
276                                                                           type, payload.getProperties(),
277                                                                           existingVertex);
278     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
279       .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
280
281     GraphEvent response = sendAndWait(event);
282     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
283       logSuccessResponse(event, response);
284       return CrudResponseBuilder.buildUpsertVertexResponse(
285                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
286                                                            version);
287     } else {
288       logErrorResponse(event, response);
289       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
290                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
291     }
292
293   }
294
295   public String deleteVertex(String version, String id, String type) throws CrudException {
296     type = OxmModelValidator.resolveCollectionType(version, type);
297     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
298       .vertex(new GraphEventVertex(id, version, type, null)).build();
299
300     GraphEvent response = sendAndWait(event);
301     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
302       logSuccessResponse(event, response);
303       return "";
304     } else {
305       logErrorResponse(event, response);
306       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
307                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
308     }
309
310   }
311
312   public String deleteEdge(String version, String id, String type) throws CrudException {
313     RelationshipSchemaValidator.validateType(version, type);
314     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
315       .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
316
317     GraphEvent response = sendAndWait(event);
318     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
319       logSuccessResponse(event, response);
320       return "";
321     } else {
322       logErrorResponse(event, response);
323       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
324                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
325     }
326
327   }
328
329   public String updateEdge(String version, String id, String type, EdgePayload payload)
330     throws CrudException {
331     Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
332     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
333                                                                                    payload);
334     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
335       .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
336
337     GraphEvent response = sendAndWait(event);
338     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
339       logSuccessResponse(event, response);
340       return CrudResponseBuilder.buildUpsertEdgeResponse(
341                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
342                                                                                                              response.getEdge().toEdge()), version);
343     } else {
344       logErrorResponse(event, response);
345       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
346                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
347     }
348
349   }
350
351   public String patchEdge(String version, String id, String type, EdgePayload payload)
352     throws CrudException {
353     Edge edge = dao.getEdge(id, type, new HashMap<String, String>());
354     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
355                                                                                 payload);
356     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
357       .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
358
359     GraphEvent response = sendAndWait(event);
360     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
361       logSuccessResponse(event, response);
362       return CrudResponseBuilder.buildUpsertEdgeResponse(
363                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
364                                                          version);
365     } else {
366       logErrorResponse(event, response);
367       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
368                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
369     }
370
371   }
372
373   @PreDestroy
374   protected void preShutdown() {
375     timer.cancel();
376
377   }
378   
379   @Override
380   protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
381     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
382         .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
383     event.setDbTransactionId(dbTransId);
384     GraphEvent response = publishEvent(event); 
385     return response.getVertex().toVertex();
386   }
387   
388   @Override
389   protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
390     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
391         .vertex(GraphEventVertex.fromVertex(vertex, version)).build();    
392     event.setDbTransactionId(dbTransId);
393     GraphEvent response = publishEvent(event);
394     return response.getVertex().toVertex();
395   }
396   
397   @Override
398   protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
399     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
400     event.setDbTransactionId(dbTransId);
401     publishEvent(event); 
402   }
403   
404   @Override
405   protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
406     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
407     event.setDbTransactionId(dbTransId);
408     GraphEvent response = publishEvent(event);
409     return response.getEdge().toEdge();
410   }
411   
412   @Override
413   protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
414     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
415     event.setDbTransactionId(dbTransId);
416     GraphEvent response = publishEvent(event);
417     return response.getEdge().toEdge();
418   }
419   
420   @Override
421   protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
422     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
423         .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
424     event.setDbTransactionId(dbTransId);
425     publishEvent(event);
426   }
427   
428   private GraphEvent publishEvent(GraphEvent event) throws CrudException {
429     GraphEvent response = sendAndWait(event);
430     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
431       logSuccessResponse(event, response);
432     } else {
433       logErrorResponse(event, response);
434       throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
435                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
436     } 
437     
438     return response;
439   }
440
441   private void logSuccessResponse(GraphEvent event, GraphEvent response) {
442     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
443         "Event response received: " + response.getObjectType() + " with key: "
444         + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
445         + " , operation: " + event.getOperation().toString() + " , result: "
446         + response.getResult());
447   }
448   
449   private void logErrorResponse(GraphEvent event, GraphEvent response) {
450     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
451         "Event response received: " + response.getObjectType() + " with key: "
452         + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
453         + " , operation: " + event.getOperation().toString() + " , result: "
454         + response.getResult() + " , error: " + response.getErrorMessage());
455   }
456 }