Config to be able to route GET through datarouter
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Gizmo
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  */
24 package org.onap.crud.service;
25
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
28
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.schema.OxmModelValidator;
48 import org.onap.schema.RelationshipSchemaValidator;
49
50 import java.text.SimpleDateFormat;
51 import java.util.Timer;
52 import java.util.concurrent.Callable;
53 import java.util.concurrent.CountDownLatch;
54 import java.util.concurrent.ExecutionException;
55 import java.util.concurrent.ExecutorService;
56 import java.util.concurrent.Executors;
57 import java.util.concurrent.Future;
58 import java.util.concurrent.TimeUnit;
59 import java.util.concurrent.TimeoutException;
60 import javax.annotation.PreDestroy;
61 import javax.ws.rs.core.Response.Status;
62
63 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
64
65   private static Integer requestTimeOut;
66
67   private EventPublisher asyncRequestPublisher;
68
69   private Timer timer;
70
71   public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
72   private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
73
74   private static Logger logger = LoggerFactory.getInstance()
75     .getLogger(CrudAsyncGraphDataService.class.getName());
76   private static Logger metricsLogger = LoggerFactory.getInstance()
77     .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
78   private static LogFields OK_FIELDS = new LogFields();
79
80   static {
81                 OK_FIELDS.setField(Status.OK, Status.OK.toString());
82   }
83
84   public static Integer getRequestTimeOut() {
85     return requestTimeOut;
86   }
87   
88   public CrudAsyncGraphDataService(GraphDao dao, 
89           EventPublisher asyncRequestPublisher,
90           EventConsumer asyncResponseConsumer) throws CrudException {
91       this(dao,dao,asyncRequestPublisher,asyncResponseConsumer);
92   }
93
94   public CrudAsyncGraphDataService(GraphDao dao, 
95           GraphDao daoForGet, 
96                   EventPublisher asyncRequestPublisher,
97                   EventConsumer asyncResponseConsumer) throws CrudException {
98
99     super();
100     this.dao = dao;
101     this.daoForGet = daoForGet;
102      
103     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
104     try {
105       requestTimeOut
106         = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
107     } catch (NumberFormatException ex) {
108       // Leave it as the default
109     }
110
111     Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
112     try {
113       responsePollInterval = Integer
114         .parseInt(CrudProperties
115                   .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
116     } catch (Exception ex) {
117       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
118                    + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
119                    + " error: " + ex.getMessage());
120     }
121
122     // Start the Response Consumer timer
123     CrudAsyncResponseConsumer crudAsyncResponseConsumer
124       = new CrudAsyncResponseConsumer(asyncResponseConsumer);
125     timer = new Timer("crudAsyncResponseConsumer-1");
126     timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
127
128     this.asyncRequestPublisher = asyncRequestPublisher;
129     
130     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
131                 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
132   }
133
134   public class CollectGraphResponse implements Callable<GraphEvent> {
135     private volatile GraphEvent graphEvent;
136     private volatile CountDownLatch latch = new CountDownLatch(1);
137
138     @Override
139     public GraphEvent call() throws TimeoutException {
140       try {
141         // Wait until graphEvent is available
142         latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
143       } catch (InterruptedException e) {
144         latch.countDown();
145         if (this.graphEvent != null) {
146           return this.graphEvent;
147         } else {
148           throw new TimeoutException();
149         }
150       }
151       return this.graphEvent;
152     }
153
154     public void populateGraphEvent(GraphEvent event) {
155       this.graphEvent = event;
156       latch.countDown();
157     }
158   }
159
160   private GraphEvent sendAndWait(GraphEvent event) throws CrudException {
161
162     long startTimeInMs = System.currentTimeMillis();
163     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
164     MdcOverride override = new MdcOverride();
165     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
166
167     // publish to request queue
168     try {
169       asyncRequestPublisher.sendSync(event.toJson());
170     } catch (Exception e) {
171       throw new CrudException("Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(), Status.INTERNAL_SERVER_ERROR);
172     }
173     
174     logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent ="+event.toJson());
175
176     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
177                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
178                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
179                 + event.getOperation().toString());
180
181     ExecutorService executor = Executors
182       .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
183     CollectGraphResponse collector = new CollectGraphResponse();
184     CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
185     GraphEvent response;
186     Future<GraphEvent> future = executor.submit(collector);
187     try {
188       response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
189
190     } catch (InterruptedException | ExecutionException | TimeoutException e) {
191       CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
192       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
193                    "Request timed out for transactionId: " + event.getTransactionId());
194       future.cancel(true);
195       throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
196                               + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
197     } finally {      
198       //Kill the thread as the work is completed
199       executor.shutdownNow();
200     }
201     metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
202             "Total elapsed time for operation: " + event.getOperation().toString()
203             + " , transactionId: " + event.getTransactionId() + " is "
204             + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
205     return response;
206   }
207
208   public String addVertex(String version, String type, VertexPayload payload) throws CrudException {
209     // Validate the incoming payload
210     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
211                                                                     type, payload.getProperties());
212     // Create graph request event
213     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
214       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
215
216     GraphEvent response = sendAndWait(event);
217     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
218       logSuccessResponse(event, response);
219       return CrudResponseBuilder.buildUpsertVertexResponse(
220                                                            OxmModelValidator.validateOutgoingPayload(version,
221                                                                                                      response.getVertex().toVertex()), version);
222     } else {
223       logErrorResponse(event, response);
224       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
225                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
226     }
227
228   }
229
230   public String addEdge(String version, String type, EdgePayload payload) throws CrudException {
231     Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
232     // Create graph request event
233     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
234       .edge(GraphEventEdge.fromEdge(edge, version)).build();
235
236     GraphEvent response = sendAndWait(event);
237     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
238       logSuccessResponse(event, response);
239       return CrudResponseBuilder.buildUpsertEdgeResponse(
240                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
241                                                          version);
242     } else {
243       logErrorResponse(event, response);
244       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
245                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
246     }
247   }
248
249   public String updateVertex(String version, String id, String type, VertexPayload payload)
250     throws CrudException {
251     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
252                                                                     type, payload.getProperties());
253     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
254       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
255
256     GraphEvent response = sendAndWait(event);
257     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
258       logSuccessResponse(event, response);
259       return CrudResponseBuilder.buildUpsertVertexResponse(
260                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
261                                                            version);
262     } else {
263       logErrorResponse(event, response);
264       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
265                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
266     }
267
268   }
269
270   public String patchVertex(String version, String id, String type, VertexPayload payload)
271     throws CrudException {
272     Vertex existingVertex
273       = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version);
274     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
275                                                                           type, payload.getProperties(),
276                                                                           existingVertex);
277     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
278       .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
279
280     GraphEvent response = sendAndWait(event);
281     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
282       logSuccessResponse(event, response);
283       return CrudResponseBuilder.buildUpsertVertexResponse(
284                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
285                                                            version);
286     } else {
287       logErrorResponse(event, response);
288       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
289                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
290     }
291
292   }
293
294   public String deleteVertex(String version, String id, String type) throws CrudException {
295     type = OxmModelValidator.resolveCollectionType(version, type);
296     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
297       .vertex(new GraphEventVertex(id, version, type, null)).build();
298
299     GraphEvent response = sendAndWait(event);
300     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
301       logSuccessResponse(event, response);
302       return "";
303     } else {
304       logErrorResponse(event, response);
305       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
306                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
307     }
308
309   }
310
311   public String deleteEdge(String version, String id, String type) throws CrudException {
312     RelationshipSchemaValidator.validateType(version, type);
313     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
314       .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
315
316     GraphEvent response = sendAndWait(event);
317     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
318       logSuccessResponse(event, response);
319       return "";
320     } else {
321       logErrorResponse(event, response);
322       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
323                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
324     }
325
326   }
327
328   public String updateEdge(String version, String id, String type, EdgePayload payload)
329     throws CrudException {
330     Edge edge = dao.getEdge(id, type);
331     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
332                                                                                    payload);
333     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
334       .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
335
336     GraphEvent response = sendAndWait(event);
337     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
338       logSuccessResponse(event, response);
339       return CrudResponseBuilder.buildUpsertEdgeResponse(
340                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
341                                                                                                              response.getEdge().toEdge()), version);
342     } else {
343       logErrorResponse(event, response);
344       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
345                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
346     }
347
348   }
349
350   public String patchEdge(String version, String id, String type, EdgePayload payload)
351     throws CrudException {
352     Edge edge = dao.getEdge(id, type);
353     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
354                                                                                 payload);
355     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
356       .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
357
358     GraphEvent response = sendAndWait(event);
359     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
360       logSuccessResponse(event, response);
361       return CrudResponseBuilder.buildUpsertEdgeResponse(
362                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
363                                                          version);
364     } else {
365       logErrorResponse(event, response);
366       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
367                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
368     }
369
370   }
371
372   @PreDestroy
373   protected void preShutdown() {
374     timer.cancel();
375
376   }
377   
378   @Override
379   protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
380     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
381         .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
382     event.setDbTransactionId(dbTransId);
383     GraphEvent response = publishEvent(event); 
384     return response.getVertex().toVertex();
385   }
386   
387   @Override
388   protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
389     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
390         .vertex(GraphEventVertex.fromVertex(vertex, version)).build();    
391     event.setDbTransactionId(dbTransId);
392     GraphEvent response = publishEvent(event);
393     return response.getVertex().toVertex();
394   }
395   
396   @Override
397   protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
398     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE).vertex(new GraphEventVertex(id, version, type, null)).build();
399     event.setDbTransactionId(dbTransId);
400     publishEvent(event); 
401   }
402   
403   @Override
404   protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
405     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
406     event.setDbTransactionId(dbTransId);
407     GraphEvent response = publishEvent(event);
408     return response.getEdge().toEdge();
409   }
410   
411   @Override
412   protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
413     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
414     event.setDbTransactionId(dbTransId);
415     GraphEvent response = publishEvent(event);
416     return response.getEdge().toEdge();
417   }
418   
419   @Override
420   protected void deleteBulkEdge(String id, String version, String type, String dbTransId) throws CrudException {
421     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
422         .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
423     event.setDbTransactionId(dbTransId);
424     publishEvent(event);
425   }
426   
427   private GraphEvent publishEvent(GraphEvent event) throws CrudException {
428     GraphEvent response = sendAndWait(event);
429     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
430       logSuccessResponse(event, response);
431     } else {
432       logErrorResponse(event, response);
433       throw new CrudException("Operation failed with transaction-id: " + response.getTransactionId()
434                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
435     } 
436     
437     return response;
438   }
439
440   private void logSuccessResponse(GraphEvent event, GraphEvent response) {
441     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
442         "Event response received: " + response.getObjectType() + " with key: "
443         + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
444         + " , operation: " + event.getOperation().toString() + " , result: "
445         + response.getResult());
446   }
447   
448   private void logErrorResponse(GraphEvent event, GraphEvent response) {
449     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
450         "Event response received: " + response.getObjectType() + " with key: "
451         + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
452         + " , operation: " + event.getOperation().toString() + " , result: "
453         + response.getResult() + " , error: " + response.getErrorMessage());
454   }
455 }