Better handling for exception logs
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Gizmo
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  */
24 package org.onap.crud.service;
25
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
28
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
49
50 import java.text.SimpleDateFormat;
51 import java.util.Timer;
52 import java.util.concurrent.Callable;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.ExecutionException;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import javax.annotation.PreDestroy;
61 import javax.ws.rs.core.Response.Status;
62
63 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
64
65   private static Integer requestTimeOut;
66
67   private EventPublisher asyncRequestPublisher;
68
69   private Timer timer;
70
71   public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
72   private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
73
74   private static Logger logger = LoggerFactory.getInstance()
75     .getLogger(CrudAsyncGraphDataService.class.getName());
76   private static Logger metricsLogger = LoggerFactory.getInstance()
77     .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
78   private static LogFields OK_FIELDS = new LogFields();
79
80   static {
81                 OK_FIELDS.setField(Status.OK, Status.OK.toString());
82   }
83
84   public static Integer getRequestTimeOut() {
85     return requestTimeOut;
86   }
87
88   public CrudAsyncGraphDataService(GraphDao dao, 
89                   EventPublisher asyncRequestPublisher,
90                   EventConsumer asyncResponseConsumer) throws CrudException {
91
92      super(dao);
93      
94     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
95     try {
96       requestTimeOut
97         = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
98     } catch (NumberFormatException ex) {
99       // Leave it as the default
100     }
101
102     Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
103     try {
104       responsePollInterval = Integer
105         .parseInt(CrudProperties
106                   .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
107     } catch (Exception ex) {
108       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
109                    + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
110                    + " error: " + ex.getMessage());
111     }
112
113     // Start the Response Consumer timer
114     CrudAsyncResponseConsumer crudAsyncResponseConsumer
115       = new CrudAsyncResponseConsumer(asyncResponseConsumer);
116     timer = new Timer("crudAsyncResponseConsumer-1");
117     timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
118
119     this.asyncRequestPublisher = asyncRequestPublisher;
120     
121     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
122                 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
123   }
124
125   public class CollectGraphResponse implements Callable<GraphEvent> {
126     private volatile GraphEvent graphEvent;
127     private volatile CountDownLatch latch = new CountDownLatch(1);
128
129     @Override
130     public GraphEvent call() throws TimeoutException {
131       try {
132         // Wait until graphEvent is available
133         latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
134       } catch (InterruptedException e) {
135         latch.countDown();
136         if (this.graphEvent != null) {
137           return this.graphEvent;
138         } else {
139           throw new TimeoutException();
140         }
141       }
142       return this.graphEvent;
143     }
144
145     public void populateGraphEvent(GraphEvent event) {
146       this.graphEvent = event;
147       latch.countDown();
148     }
149   }
150
151   private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
152
153     long startTimeInMs = System.currentTimeMillis();
154     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
155     MdcOverride override = new MdcOverride();
156     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
157
158     // publish to request queue
159     try {
160       asyncRequestPublisher.sendSync(event.toJson());
161     } catch (Exception e) {
162       throw new CrudException("Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
163     }
164     
165     logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
166
167     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
168                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
169                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
170                 + event.getOperation().toString());
171
172     ExecutorService executor = Executors
173       .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
174     CollectGraphResponse collector = new CollectGraphResponse();
175     CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
176     GraphEvent response;
177     Future<GraphEvent> future = executor.submit(collector);
178     try {
179       response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
180
181     } catch (InterruptedException | ExecutionException | TimeoutException e) {
182       CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
183       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
184                    "Request timed out for transactionId: " + event.getTransactionId());
185       future.cancel(true);
186       throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
187                               + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
188     } finally {      
189       //Kill the thread as the work is completed
190       executor.shutdownNow();
191     }
192     metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
193             "Total elapsed time for operation: " + event.getOperation().toString()
194             + " , transactionId: " + event.getTransactionId() + " is "
195             + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
196     return response;
197   }
198
199   public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
200     // Validate the incoming payload
201     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
202                                                                     type, payload.getProperties());
203     // Create graph request event
204     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
205       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
206
207     GraphEvent response = sendAndWait(event);
208     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
209       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
210                   "Event response received: " + response.getObjectType() + " with key: "
211                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
212                   + " , operation: " + event.getOperation().toString() + " , result: "
213                   + response.getResult());
214       return CrudResponseBuilder.buildUpsertVertexResponse(
215                                                            OxmModelValidator.validateOutgoingPayload(version,
216                                                                                                      response.getVertex().toVertex()), version);
217     } else {
218       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
219                   "Event response received: " + response.getObjectType() + " with key: "
220                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
221                   + " , operation: " + event.getOperation().toString() + " , result: "
222                   + response.getResult() + " , error: " + response.getErrorMessage());
223       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
224                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
225     }
226
227   }
228
229   public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
230     Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
231     // Create graph request event
232     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
233       .edge(GraphEventEdge.fromEdge(edge, version)).build();
234
235     GraphEvent response = sendAndWait(event);
236     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
237       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
238                   "Event response received: " + response.getObjectType() + " with key: "
239                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
240                   + " , operation: " + event.getOperation().toString() + " , result: "
241                   + response.getResult());
242       return CrudResponseBuilder.buildUpsertEdgeResponse(
243                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
244                                                          version);
245     } else {
246       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
247                   "Event response received: " + response.getObjectType() + " with key: "
248                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
249                   + " , operation: " + event.getOperation().toString() + " , result: "
250                   + response.getResult() + " , error: " + response.getErrorMessage());
251       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
252                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
253     }
254   }
255
256   public String updateVertex(String version, String id, String type, VertexPayload payload)
257     throws CrudException {
258     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
259                                                                     type, payload.getProperties());
260     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
261       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
262
263     GraphEvent response = sendAndWait(event);
264     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
265       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
266                   "Event response received: " + response.getObjectType() + " with key: "
267                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
268                   + " , operation: " + event.getOperation().toString() + " , result: "
269                   + response.getResult());
270       return CrudResponseBuilder.buildUpsertVertexResponse(
271                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
272                                                            version);
273     } else {
274       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
275                   "Event response received: " + response.getObjectType() + " with key: "
276                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
277                   + " , operation: " + event.getOperation().toString() + " , result: "
278                   + response.getResult() + " , error: " + response.getErrorMessage());
279       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
280                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
281     }
282
283   }
284
285   public String patchVertex(String version, String id, String type, VertexPayload payload)
286     throws CrudException {
287     Vertex existingVertex
288       = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
289     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
290                                                                           type, payload.getProperties(),
291                                                                           existingVertex);
292     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
293       .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
294
295     GraphEvent response = sendAndWait(event);
296     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
297       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
298                   "Event response received: " + response.getObjectType() + " with key: "
299                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
300                   + " , operation: " + event.getOperation().toString() + " , result: "
301                   + response.getResult());
302       return CrudResponseBuilder.buildUpsertVertexResponse(
303                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
304                                                            version);
305     } else {
306       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
307                   "Event response received: " + response.getObjectType() + " with key: "
308                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
309                   + " , operation: " + event.getOperation().toString() + " , result: "
310                   + response.getResult() + " , error: " + response.getErrorMessage());
311       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
312                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
313     }
314
315   }
316
317   public String deleteVertex(String version, String id, String type) throws CrudException {
318     type = OxmModelValidator.resolveCollectionType(version, type);
319     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
320       .vertex(new GraphEventVertex(id, version, type, null)).build();
321
322     GraphEvent response = sendAndWait(event);
323     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
324       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
325                   "Event response received: " + response.getObjectType() + " with key: "
326                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
327                   + " , operation: " + event.getOperation().toString() + " , result: "
328                   + response.getResult());
329       return "";
330     } else {
331       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
332                   "Event response received: " + response.getObjectType() + " with key: "
333                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
334                   + " , operation: " + event.getOperation().toString() + " , result: "
335                   + response.getResult() + " , error: " + response.getErrorMessage());
336       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
337                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
338     }
339
340   }
341
342   public String deleteEdge(String version, String id, String type) throws CrudException {
343     RelationshipSchemaValidator.validateType(version, type);
344     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
345       .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
346
347     GraphEvent response = sendAndWait(event);
348     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
349       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
350                   "Event response received: " + response.getObjectType() + " with key: "
351                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
352                   + " , operation: " + event.getOperation().toString() + " , result: "
353                   + response.getResult());
354       return "";
355     } else {
356       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
357                   "Event response received: " + response.getObjectType() + " with key: "
358                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
359                   + " , operation: " + event.getOperation().toString() + " , result: "
360                   + response.getResult() + " , error: " + response.getErrorMessage());
361       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
362                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
363     }
364
365   }
366
367   public String updateEdge(String version, String id, String type, EdgePayload payload)
368     throws CrudException {
369     Edge edge = dao.getEdge(id, type);
370     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
371                                                                                    payload);
372     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
373       .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
374
375     GraphEvent response = sendAndWait(event);
376     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
377       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
378                   "Event response received: " + response.getObjectType() + " with key: "
379                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
380                   + " , operation: " + event.getOperation().toString() + " , result: "
381                   + response.getResult());
382       return CrudResponseBuilder.buildUpsertEdgeResponse(
383                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
384                                                                                                              response.getEdge().toEdge()), version);
385     } else {
386       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
387                   "Event response received: " + response.getObjectType() + " with key: "
388                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
389                   + " , operation: " + event.getOperation().toString() + " , result: "
390                   + response.getResult() + " , error: " + response.getErrorMessage());
391       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
392                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
393     }
394
395   }
396
397   public String patchEdge(String version, String id, String type, EdgePayload payload)
398     throws CrudException {
399     Edge edge = dao.getEdge(id, type);
400     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
401                                                                                 payload);
402     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
403       .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
404
405     GraphEvent response = sendAndWait(event);
406     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
407       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
408                   "Event response received: " + response.getObjectType() + " with key: "
409                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
410                   + " , operation: " + event.getOperation().toString() + " , result: "
411                   + response.getResult());
412       return CrudResponseBuilder.buildUpsertEdgeResponse(
413                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
414                                                          version);
415     } else {
416       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
417                   "Event response received: " + response.getObjectType() + " with key: "
418                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
419                   + " , operation: " + event.getOperation().toString() + " , result: "
420                   + response.getResult() + " , error: " + response.getErrorMessage());
421       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
422                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
423     }
424
425   }
426
427   @PreDestroy
428   protected void preShutdown() {
429     timer.cancel();
430
431   }
432
433   @Override
434   public String addBulk(String version, BulkPayload payload) throws CrudException {
435     throw new CrudException("Bulk operation not supported in async mode", Status.BAD_REQUEST);
436   }
437
438
439 }