9efc7df1a1009e983c86fcaf324fb014f9c89df7
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Gizmo
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
23  */
24 package org.onap.crud.service;
25
26 import com.att.ecomp.event.api.EventConsumer;
27 import com.att.ecomp.event.api.EventPublisher;
28
29 import org.onap.aai.cl.api.LogFields;
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.cl.mdc.MdcOverride;
34 import org.onap.crud.dao.GraphDao;
35 import org.onap.crud.entity.Edge;
36 import org.onap.crud.entity.Vertex;
37 import org.onap.crud.event.GraphEvent;
38 import org.onap.crud.event.GraphEvent.GraphEventOperation;
39 import org.onap.crud.event.GraphEvent.GraphEventResult;
40 import org.onap.crud.event.GraphEventEdge;
41 import org.onap.crud.event.GraphEventVertex;
42 import org.onap.crud.exception.CrudException;
43 import org.onap.crud.logging.CrudServiceMsgs;
44 import org.onap.crud.parser.CrudResponseBuilder;
45 import org.onap.crud.util.CrudProperties;
46 import org.onap.crud.util.CrudServiceConstants;
47 import org.onap.crud.util.CrudServiceUtil;
48 import org.onap.schema.OxmModelValidator;
49 import org.onap.schema.RelationshipSchemaValidator;
50
51 import java.text.SimpleDateFormat;
52 import java.util.List;
53 import java.util.Map;
54 import java.util.Timer;
55 import java.util.concurrent.Callable;
56 import java.util.concurrent.CountDownLatch;
57 import java.util.concurrent.ExecutionException;
58 import java.util.concurrent.ExecutorService;
59 import java.util.concurrent.Executors;
60 import java.util.concurrent.Future;
61 import java.util.concurrent.TimeUnit;
62 import java.util.concurrent.TimeoutException;
63 import javax.annotation.PreDestroy;
64 import javax.ws.rs.core.Response.Status;
65
66 public class CrudAsyncGraphDataService {
67
68   private static Integer requestTimeOut;
69
70   private GraphDao dao;
71
72   private EventPublisher asyncRequestPublisher;
73
74   private Timer timer;
75
76   public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
77   private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
78
79   private static Logger logger = LoggerFactory.getInstance()
80     .getLogger(CrudAsyncGraphDataService.class.getName());
81   private static Logger metricsLogger = LoggerFactory.getInstance()
82     .getMetricsLogger(CrudAsyncGraphDataService.class.getName());
83   private static LogFields OK_FIELDS = new LogFields();
84
85   static {
86                 OK_FIELDS.setField(Status.OK, Status.OK.toString());
87   }
88
89   public static Integer getRequestTimeOut() {
90     return requestTimeOut;
91   }
92
93   public CrudAsyncGraphDataService(GraphDao dao, 
94                   EventPublisher asyncRequestPublisher,
95                   EventConsumer asyncResponseConsumer) throws CrudException {
96
97     requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
98     try {
99       requestTimeOut
100         = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
101     } catch (NumberFormatException ex) {
102       // Leave it as the default
103     }
104
105     Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
106     try {
107       responsePollInterval = Integer
108         .parseInt(CrudProperties
109                   .get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
110     } catch (Exception ex) {
111       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
112                    + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL
113                    + " error: " + ex.getMessage());
114     }
115
116     this.dao = dao;
117
118     // Start the Response Consumer timer
119     CrudAsyncResponseConsumer crudAsyncResponseConsumer
120       = new CrudAsyncResponseConsumer(asyncResponseConsumer);
121     timer = new Timer("crudAsyncResponseConsumer-1");
122     timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
123
124     this.asyncRequestPublisher = asyncRequestPublisher;
125
126     // load the schemas
127     CrudServiceUtil.loadModels();
128     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
129                 "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
130   }
131
132   public class CollectGraphResponse implements Callable<GraphEvent> {
133     private volatile GraphEvent graphEvent;
134     private volatile CountDownLatch latch = new CountDownLatch(1);
135
136     @Override
137     public GraphEvent call() throws TimeoutException {
138       try {
139         // Wait until graphEvent is available
140         latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
141       } catch (InterruptedException e) {
142         latch.countDown();
143         if (this.graphEvent != null) {
144           return this.graphEvent;
145         } else {
146           throw new TimeoutException();
147         }
148       }
149       return this.graphEvent;
150     }
151
152     public void populateGraphEvent(GraphEvent event) {
153       this.graphEvent = event;
154       latch.countDown();
155     }
156   }
157
158   private GraphEvent sendAndWait(GraphEvent event) throws Exception {
159
160     long startTimeInMs = System.currentTimeMillis();
161     SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
162     MdcOverride override = new MdcOverride();
163     override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
164
165     // publish to request queue
166     asyncRequestPublisher.sendSync(event.toJson());
167
168     logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
169                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
170                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
171                 + event.getOperation().toString());
172
173     ExecutorService executor = Executors
174       .newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
175     CollectGraphResponse collector = new CollectGraphResponse();
176     CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
177     GraphEvent response;
178     Future<GraphEvent> future = executor.submit(collector);
179     try {
180       response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
181
182     } catch (InterruptedException | ExecutionException | TimeoutException e) {
183       CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
184       logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
185                    "Request timed out for transactionId: " + event.getTransactionId());
186       future.cancel(true);
187       throw new CrudException("Timed out , transactionId: " + event.getTransactionId()
188                               + " , operation: " + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
189     } finally {      
190       //Kill the thread as the work is completed
191       executor.shutdownNow();
192     }
193     metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, OK_FIELDS, override,
194             "Total elapsed time for operation: " + event.getOperation().toString()
195             + " , transactionId: " + event.getTransactionId() + " is "
196             + Long.toString(System.currentTimeMillis() - startTimeInMs) + " ms");
197     return response;
198   }
199
200   public String addVertex(String version, String type, VertexPayload payload) throws Exception {
201     // Validate the incoming payload
202     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version,
203                                                                     type, payload.getProperties());
204     // Create graph request event
205     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
206       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
207
208     GraphEvent response = sendAndWait(event);
209     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
210       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
211                   "Event response received: " + response.getObjectType() + " with key: "
212                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
213                   + " , operation: " + event.getOperation().toString() + " , result: "
214                   + response.getResult());
215       return CrudResponseBuilder.buildUpsertVertexResponse(
216                                                            OxmModelValidator.validateOutgoingPayload(version,
217                                                                                                      response.getVertex().toVertex()), version);
218     } else {
219       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
220                   "Event response received: " + response.getObjectType() + " with key: "
221                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
222                   + " , operation: " + event.getOperation().toString() + " , result: "
223                   + response.getResult() + " , error: " + response.getErrorMessage());
224       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
225                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
226     }
227
228   }
229
230   public String addEdge(String version, String type, EdgePayload payload) throws Exception {
231     Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
232     // Create graph request event
233     GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
234       .edge(GraphEventEdge.fromEdge(edge, version)).build();
235
236     GraphEvent response = sendAndWait(event);
237     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
238       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
239                   "Event response received: " + response.getObjectType() + " with key: "
240                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
241                   + " , operation: " + event.getOperation().toString() + " , result: "
242                   + response.getResult());
243       return CrudResponseBuilder.buildUpsertEdgeResponse(
244                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
245                                                          version);
246     } else {
247       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
248                   "Event response received: " + response.getObjectType() + " with key: "
249                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
250                   + " , operation: " + event.getOperation().toString() + " , result: "
251                   + response.getResult() + " , error: " + response.getErrorMessage());
252       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
253                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
254     }
255   }
256
257   public String updateVertex(String version, String id, String type, VertexPayload payload)
258     throws Exception {
259     Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version,
260                                                                     type, payload.getProperties());
261     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
262       .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
263
264     GraphEvent response = sendAndWait(event);
265     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
266       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
267                   "Event response received: " + response.getObjectType() + " with key: "
268                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
269                   + " , operation: " + event.getOperation().toString() + " , result: "
270                   + response.getResult());
271       return CrudResponseBuilder.buildUpsertVertexResponse(
272                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
273                                                            version);
274     } else {
275       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
276                   "Event response received: " + response.getObjectType() + " with key: "
277                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
278                   + " , operation: " + event.getOperation().toString() + " , result: "
279                   + response.getResult() + " , error: " + response.getErrorMessage());
280       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
281                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
282     }
283
284   }
285
286   public String patchVertex(String version, String id, String type, VertexPayload payload)
287     throws Exception {
288     Vertex existingVertex
289       = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type));
290     Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version,
291                                                                           type, payload.getProperties(),
292                                                                           existingVertex);
293     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
294       .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
295
296     GraphEvent response = sendAndWait(event);
297     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
298       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
299                   "Event response received: " + response.getObjectType() + " with key: "
300                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
301                   + " , operation: " + event.getOperation().toString() + " , result: "
302                   + response.getResult());
303       return CrudResponseBuilder.buildUpsertVertexResponse(
304                                                            OxmModelValidator.validateOutgoingPayload(version, response.getVertex().toVertex()),
305                                                            version);
306     } else {
307       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
308                   "Event response received: " + response.getObjectType() + " with key: "
309                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
310                   + " , operation: " + event.getOperation().toString() + " , result: "
311                   + response.getResult() + " , error: " + response.getErrorMessage());
312       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
313                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
314     }
315
316   }
317
318   public String deleteVertex(String version, String id, String type) throws Exception {
319     type = OxmModelValidator.resolveCollectionType(version, type);
320     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
321       .vertex(new GraphEventVertex(id, version, type, null)).build();
322
323     GraphEvent response = sendAndWait(event);
324     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
325       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
326                   "Event response received: " + response.getObjectType() + " with key: "
327                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
328                   + " , operation: " + event.getOperation().toString() + " , result: "
329                   + response.getResult());
330       return "";
331     } else {
332       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
333                   "Event response received: " + response.getObjectType() + " with key: "
334                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
335                   + " , operation: " + event.getOperation().toString() + " , result: "
336                   + response.getResult() + " , error: " + response.getErrorMessage());
337       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
338                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
339     }
340
341   }
342
343   public String deleteEdge(String version, String id, String type) throws Exception {
344     RelationshipSchemaValidator.validateType(version, type);
345     GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
346       .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
347
348     GraphEvent response = sendAndWait(event);
349     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
350       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
351                   "Event response received: " + response.getObjectType() + " with key: "
352                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
353                   + " , operation: " + event.getOperation().toString() + " , result: "
354                   + response.getResult());
355       return "";
356     } else {
357       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
358                   "Event response received: " + response.getObjectType() + " with key: "
359                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
360                   + " , operation: " + event.getOperation().toString() + " , result: "
361                   + response.getResult() + " , error: " + response.getErrorMessage());
362       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
363                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
364     }
365
366   }
367
368   public String updateEdge(String version, String id, String type, EdgePayload payload)
369     throws Exception {
370     Edge edge = dao.getEdge(id, type);
371     Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version,
372                                                                                    payload);
373     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
374       .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
375
376     GraphEvent response = sendAndWait(event);
377     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
378       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
379                   "Event response received: " + response.getObjectType() + " with key: "
380                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
381                   + " , operation: " + event.getOperation().toString() + " , result: "
382                   + response.getResult());
383       return CrudResponseBuilder.buildUpsertEdgeResponse(
384                                                          RelationshipSchemaValidator.validateOutgoingPayload(version,
385                                                                                                              response.getEdge().toEdge()), version);
386     } else {
387       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
388                   "Event response received: " + response.getObjectType() + " with key: "
389                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
390                   + " , operation: " + event.getOperation().toString() + " , result: "
391                   + response.getResult() + " , error: " + response.getErrorMessage());
392       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
393                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
394     }
395
396   }
397
398   public String patchEdge(String version, String id, String type, EdgePayload payload)
399     throws Exception {
400     Edge edge = dao.getEdge(id, type);
401     Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version,
402                                                                                 payload);
403     GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
404       .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
405
406     GraphEvent response = sendAndWait(event);
407     if (response.getResult().equals(GraphEventResult.SUCCESS)) {
408       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
409                   "Event response received: " + response.getObjectType() + " with key: "
410                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
411                   + " , operation: " + event.getOperation().toString() + " , result: "
412                   + response.getResult());
413       return CrudResponseBuilder.buildUpsertEdgeResponse(
414                                                          RelationshipSchemaValidator.validateOutgoingPayload(version, response.getEdge().toEdge()),
415                                                          version);
416     } else {
417       logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
418                   "Event response received: " + response.getObjectType() + " with key: "
419                   + response.getObjectKey() + " , transaction-id: " + response.getTransactionId()
420                   + " , operation: " + event.getOperation().toString() + " , result: "
421                   + response.getResult() + " , error: " + response.getErrorMessage());
422       throw new CrudException("Operation Failed with transaction-id: " + response.getTransactionId()
423                               + " Error: " + response.getErrorMessage(), response.getHttpErrorStatus());
424     }
425
426   }
427
428   public String getEdge(String version, String id, String type) throws CrudException {
429     RelationshipSchemaValidator.validateType(version, type);
430     Edge edge = dao.getEdge(id, type);
431
432     return CrudResponseBuilder.buildGetEdgeResponse(RelationshipSchemaValidator
433                                                     .validateOutgoingPayload(version, edge),
434                                                     version);
435   }
436
437   public String getEdges(String version, String type, Map<String, String> filter)
438     throws CrudException {
439     RelationshipSchemaValidator.validateType(version, type);
440     List<Edge> items = dao.getEdges(type,
441                                     RelationshipSchemaValidator.resolveCollectionfilter(version, type, filter));
442     return CrudResponseBuilder.buildGetEdgesResponse(items, version);
443   }
444
445   public String getVertex(String version, String id, String type) throws CrudException {
446     type = OxmModelValidator.resolveCollectionType(version, type);
447     Vertex vertex = dao.getVertex(id, type);
448     List<Edge> edges = dao.getVertexEdges(id);
449     return CrudResponseBuilder.buildGetVertexResponse(OxmModelValidator
450                                                       .validateOutgoingPayload(version, vertex), edges,
451                                                       version);
452   }
453
454   public String getVertices(String version, String type, Map<String, String> filter)
455     throws CrudException {
456     type = OxmModelValidator.resolveCollectionType(version, type);
457     List<Vertex> items = dao.getVertices(type,
458                                          OxmModelValidator.resolveCollectionfilter(version, type, filter));
459     return CrudResponseBuilder.buildGetVerticesResponse(items, version);
460   }
461
462   @PreDestroy
463   protected void preShutdown() {
464     timer.cancel();
465
466   }
467
468
469 }