Auto-resolve edge type
[aai/gizmo.git] / src / main / java / org / onap / crud / service / CrudAsyncGraphDataService.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.crud.service;
22
23 import java.io.IOException;
24 import java.security.NoSuchAlgorithmException;
25 import java.text.SimpleDateFormat;
26 import java.util.HashMap;
27 import java.util.Timer;
28 import java.util.concurrent.Callable;
29 import java.util.concurrent.CountDownLatch;
30 import java.util.concurrent.ExecutionException;
31 import java.util.concurrent.ExecutorService;
32 import java.util.concurrent.Executors;
33 import java.util.concurrent.Future;
34 import java.util.concurrent.TimeUnit;
35 import java.util.concurrent.TimeoutException;
36 import javax.annotation.PreDestroy;
37 import javax.ws.rs.core.EntityTag;
38 import javax.ws.rs.core.Response.Status;
39 import org.apache.commons.lang3.tuple.ImmutablePair;
40 import org.onap.aai.cl.api.LogFields;
41 import org.onap.aai.cl.api.Logger;
42 import org.onap.aai.cl.eelf.LoggerFactory;
43 import org.onap.aai.cl.mdc.MdcContext;
44 import org.onap.aai.cl.mdc.MdcOverride;
45 import org.onap.aai.event.api.EventConsumer;
46 import org.onap.aai.event.api.EventPublisher;
47 import org.onap.aai.restclient.client.OperationResult;
48 import org.onap.crud.dao.GraphDao;
49 import org.onap.crud.entity.Edge;
50 import org.onap.crud.entity.Vertex;
51 import org.onap.crud.event.GraphEvent;
52 import org.onap.crud.event.GraphEvent.GraphEventOperation;
53 import org.onap.crud.event.GraphEventEdge;
54 import org.onap.crud.event.GraphEventVertex;
55 import org.onap.crud.event.envelope.GraphEventEnvelope;
56 import org.onap.crud.event.response.GraphEventResponseHandler;
57 import org.onap.crud.exception.CrudException;
58 import org.onap.crud.logging.CrudServiceMsgs;
59 import org.onap.crud.parser.EdgePayload;
60 import org.onap.crud.parser.VertexPayload;
61 import org.onap.crud.util.CrudProperties;
62 import org.onap.crud.util.CrudServiceConstants;
63 import org.onap.crud.util.etag.EtagGenerator;
64 import org.onap.schema.validation.OxmModelValidator;
65 import org.onap.schema.validation.RelationshipSchemaValidator;
66
67 public class CrudAsyncGraphDataService extends AbstractGraphDataService {
68
69     private static Integer requestTimeOut;
70
71     private EventPublisher asyncRequestPublisher;
72
73     private Timer timer;
74
75     public static final Integer DEFAULT_REQUEST_TIMEOUT = 30000;
76     private static final Integer DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL = 1000;
77
78     private static Logger logger = LoggerFactory.getInstance().getLogger(CrudAsyncGraphDataService.class.getName());
79     private static Logger metricsLogger =
80             LoggerFactory.getInstance().getMetricsLogger(CrudAsyncGraphDataService.class.getName());
81     private static LogFields okFields = new LogFields();
82     private EtagGenerator etagGenerator;
83
84     static {
85         okFields.setField(Status.OK, Status.OK.toString());
86     }
87
88     private GraphEventResponseHandler responseHandler = new GraphEventResponseHandler();
89
90     public static Integer getRequestTimeOut() {
91         return requestTimeOut;
92     }
93
94     public CrudAsyncGraphDataService(GraphDao dao, EventPublisher asyncRequestPublisher,
95             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
96         this(dao, dao, asyncRequestPublisher, asyncResponseConsumer);
97     }
98
99     public CrudAsyncGraphDataService(GraphDao dao, GraphDao daoForGet, EventPublisher asyncRequestPublisher,
100             EventConsumer asyncResponseConsumer) throws CrudException, NoSuchAlgorithmException {
101
102         super();
103         this.dao = dao;
104         this.daoForGet = daoForGet;
105
106         requestTimeOut = DEFAULT_REQUEST_TIMEOUT;
107         try {
108             requestTimeOut = Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_REQUEST_TIMEOUT));
109         } catch (NumberFormatException ex) {
110             // Leave it as the default
111         }
112
113         Integer responsePollInterval = DEFAULT_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL;
114         try {
115             responsePollInterval =
116                     Integer.parseInt(CrudProperties.get(CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL));
117         } catch (Exception ex) {
118             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR, "Unable to parse "
119                     + CrudServiceConstants.CRD_ASYNC_RESPONSE_PROCESS_POLL_INTERVAL + " error: " + ex.getMessage());
120         }
121
122         // Start the Response Consumer timer
123         CrudAsyncResponseConsumer crudAsyncResponseConsumer = new CrudAsyncResponseConsumer(
124             asyncResponseConsumer, new GraphEventUpdater()
125         );
126         timer = new Timer("crudAsyncResponseConsumer-1");
127         timer.schedule(crudAsyncResponseConsumer, responsePollInterval, responsePollInterval);
128
129         this.asyncRequestPublisher = asyncRequestPublisher;
130         this.etagGenerator = new EtagGenerator();
131
132         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "CrudAsyncGraphDataService initialized SUCCESSFULLY!");
133     }
134
135     public class CollectGraphResponse implements Callable<GraphEventEnvelope> {
136         private volatile GraphEventEnvelope graphEventEnvelope;
137         private volatile CountDownLatch latch = new CountDownLatch(1);
138
139         @Override
140         public GraphEventEnvelope call() throws TimeoutException {
141             try {
142                 // Wait until graphEvent is available
143                 latch.await(CrudAsyncGraphDataService.getRequestTimeOut(), TimeUnit.MILLISECONDS);
144             } catch (InterruptedException e) {
145                 latch.countDown();
146                 if (this.graphEventEnvelope != null) {
147                     return this.graphEventEnvelope;
148                 } else {
149                     throw new TimeoutException();
150                 }
151             }
152             return this.graphEventEnvelope;
153         }
154
155         public void populateGraphEventEnvelope(GraphEventEnvelope eventEnvelope) {
156             this.graphEventEnvelope = eventEnvelope;
157             latch.countDown();
158         }
159     }
160
161     private GraphEventEnvelope sendAndWait(GraphEvent event) throws CrudException {
162
163         long startTimeInMs = System.currentTimeMillis();
164         SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");
165         MdcOverride override = new MdcOverride();
166         override.addAttribute(MdcContext.MDC_START_TIME, formatter.format(startTimeInMs));
167
168         String eventEnvelopeJson = new GraphEventEnvelope(event).toJson();
169
170         // publish to request queue
171         try {
172             asyncRequestPublisher.sendSync(eventEnvelopeJson);
173         } catch (Exception e) {
174             throw new CrudException(
175                     "Error publishing request " + event.getTransactionId() + "  Cause: " + e.getMessage(),
176                     Status.INTERNAL_SERVER_ERROR);
177         }
178
179         logger.debug(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, "Event Sent =" + eventEnvelopeJson);
180
181         logger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO,
182                 "Event submitted of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
183                         + " , transaction-id: " + event.getTransactionId() + " , operation: "
184                         + event.getOperation().toString());
185
186         ExecutorService executor =
187                 Executors.newSingleThreadExecutor(new CrudThreadFactory("TX-" + event.getTransactionId()));
188         CollectGraphResponse collector = new CollectGraphResponse();
189         CrudAsyncGraphEventCache.put(event.getTransactionId(), collector);
190         GraphEventEnvelope response;
191         Future<GraphEventEnvelope> future = executor.submit(collector);
192         try {
193             response = future.get(requestTimeOut, TimeUnit.MILLISECONDS);
194
195         } catch (InterruptedException | ExecutionException | TimeoutException e) {
196             CrudAsyncGraphEventCache.invalidate(event.getTransactionId());
197             logger.error(CrudServiceMsgs.ASYNC_DATA_SERVICE_ERROR,
198                     "Request timed out for transactionId: " + event.getTransactionId());
199             future.cancel(true);
200             throw new CrudException("Timed out , transactionId: " + event.getTransactionId() + " , operation: "
201                     + event.getOperation().toString(), Status.INTERNAL_SERVER_ERROR);
202         } finally {
203             // Kill the thread as the work is completed
204             executor.shutdownNow();
205         }
206         metricsLogger.info(CrudServiceMsgs.ASYNC_DATA_SERVICE_INFO, okFields, override,
207                 "Total elapsed time for operation: " + event.getOperation().toString() + " , transactionId: "
208                         + event.getTransactionId() + " is " + Long.toString(System.currentTimeMillis() - startTimeInMs)
209                         + " ms");
210         return response;
211     }
212
213     @Override
214     public ImmutablePair<EntityTag, String> addVertex(String version, String type, VertexPayload payload)
215             throws CrudException {
216         // Validate the incoming payload
217         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(null, version, type, payload.getProperties());
218         vertex.getProperties().put(OxmModelValidator.Metadata.NODE_TYPE.propertyName(), type);
219         // Create graph request event
220         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
221                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
222
223         GraphEventEnvelope response = sendAndWait(event);
224
225         EntityTag entityTag;
226         try {
227             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
228         } catch (IOException e) {
229             throw new CrudException(e);
230         }
231         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
232
233         return new ImmutablePair<>(entityTag, responsePayload);
234     }
235
236     @Override
237     public ImmutablePair<EntityTag, String> addEdge(String version, String type, EdgePayload payload)
238             throws CrudException {
239         Edge edge = RelationshipSchemaValidator.validateIncomingAddPayload(version, type, payload);
240
241         // Create graph request event
242         GraphEvent event =
243                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
244
245         GraphEventEnvelope response = sendAndWait(event);
246
247         EntityTag entityTag;
248         try {
249             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
250         } catch (IOException e) {
251             throw new CrudException(e);
252         }
253         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
254
255         return new ImmutablePair<>(entityTag, responsePayload);
256     }
257
258     @Override
259     public ImmutablePair<EntityTag, String> updateVertex(String version, String id, String type, VertexPayload payload)
260             throws CrudException {
261         Vertex vertex = OxmModelValidator.validateIncomingUpsertPayload(id, version, type, payload.getProperties());
262         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
263                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
264
265         GraphEventEnvelope response = sendAndWait(event);
266
267         EntityTag entityTag;
268         try {
269             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
270         } catch (IOException e) {
271             throw new CrudException(e);
272         }
273         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
274
275         return new ImmutablePair<>(entityTag, responsePayload);
276     }
277
278     @Override
279     public ImmutablePair<EntityTag, String> patchVertex(String version, String id, String type, VertexPayload payload)
280             throws CrudException {
281         OperationResult existingVertexOpResult = dao.getVertex(id, OxmModelValidator.resolveCollectionType(version, type), version,
282                 new HashMap<String, String>());
283         Vertex existingVertex = Vertex.fromJson(existingVertexOpResult.getResult(), version);
284         Vertex patchedVertex = OxmModelValidator.validateIncomingPatchPayload(id, version, type,
285                 payload.getProperties(), existingVertex);
286         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
287                 .vertex(GraphEventVertex.fromVertex(patchedVertex, version)).build();
288
289         GraphEventEnvelope response = sendAndWait(event);
290
291         EntityTag entityTag;
292         try {
293             entityTag = new EntityTag(etagGenerator.computeHashForVertex(response.getBody().getVertex()));
294         } catch (IOException e) {
295             throw new CrudException(e);
296         }
297         String responsePayload = responseHandler.handleVertexResponse(version, event, response);
298
299         return new ImmutablePair<>(entityTag, responsePayload);
300     }
301
302     @Override
303     public String deleteVertex(String version, String id, String type) throws CrudException {
304         type = OxmModelValidator.resolveCollectionType(version, type);
305         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
306                 .vertex(new GraphEventVertex(id, version, type, null)).build();
307
308         GraphEventEnvelope response = sendAndWait(event);
309         return responseHandler.handleDeletionResponse(event, response);
310     }
311
312     @Override
313     public String deleteEdge(String version, String id, String type) throws CrudException {
314         RelationshipSchemaValidator.validateType(version, type);
315         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
316                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
317
318         GraphEventEnvelope response = sendAndWait(event);
319         return responseHandler.handleDeletionResponse(event, response);
320     }
321
322     @Override
323     public ImmutablePair<EntityTag, String> updateEdge(String version, String id, String type, EdgePayload payload)
324             throws CrudException {
325         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
326         Edge edge = Edge.fromJson(operationResult.getResult());
327         Edge validatedEdge = RelationshipSchemaValidator.validateIncomingUpdatePayload(edge, version, payload);
328
329         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
330                 .edge(GraphEventEdge.fromEdge(validatedEdge, version)).build();
331
332         GraphEventEnvelope response = sendAndWait(event);
333
334         EntityTag entityTag;
335         try {
336             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
337         } catch (IOException e) {
338             throw new CrudException(e);
339         }
340         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
341
342         return new ImmutablePair<>(entityTag, responsePayload);
343     }
344
345     @Override
346     public ImmutablePair<EntityTag, String> patchEdge(String version, String id, String type, EdgePayload payload)
347             throws CrudException {
348         OperationResult operationResult = dao.getEdge(id, type, new HashMap<String, String>());
349         Edge edge = Edge.fromJson(operationResult.getResult());
350         Edge patchedEdge = RelationshipSchemaValidator.validateIncomingPatchPayload(edge, version, payload);
351         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
352                 .edge(GraphEventEdge.fromEdge(patchedEdge, version)).build();
353
354         GraphEventEnvelope response = sendAndWait(event);
355
356         EntityTag entityTag;
357         try {
358             entityTag = new EntityTag(etagGenerator.computeHashForEdge(response.getBody().getEdge()));
359         } catch (IOException e) {
360             throw new CrudException(e);
361         }
362         String responsePayload = responseHandler.handleEdgeResponse(version, event, response);
363
364         return new ImmutablePair<>(entityTag, responsePayload);
365     }
366
367     @PreDestroy
368     protected void preShutdown() {
369         timer.cancel();
370     }
371
372     @Override
373     protected Vertex addBulkVertex(Vertex vertex, String version, String dbTransId) throws CrudException {
374         GraphEvent event = GraphEvent.builder(GraphEventOperation.CREATE)
375                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
376         event.setDbTransactionId(dbTransId);
377         GraphEvent response = publishEvent(event);
378         return response.getVertex().toVertex();
379     }
380
381     @Override
382     protected Vertex updateBulkVertex(Vertex vertex, String id, String version, String dbTransId) throws CrudException {
383         GraphEvent event = GraphEvent.builder(GraphEventOperation.UPDATE)
384                 .vertex(GraphEventVertex.fromVertex(vertex, version)).build();
385         event.setDbTransactionId(dbTransId);
386         GraphEvent response = publishEvent(event);
387         return response.getVertex().toVertex();
388     }
389
390     @Override
391     protected void deleteBulkVertex(String id, String version, String type, String dbTransId) throws CrudException {
392         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
393                 .vertex(new GraphEventVertex(id, version, type, null)).build();
394         event.setDbTransactionId(dbTransId);
395         publishEvent(event);
396     }
397
398     @Override
399     protected Edge addBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
400         GraphEvent event =
401                 GraphEvent.builder(GraphEventOperation.CREATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
402         event.setDbTransactionId(dbTransId);
403         GraphEvent response = publishEvent(event);
404         return response.getEdge().toEdge();
405     }
406
407     @Override
408     protected Edge updateBulkEdge(Edge edge, String version, String dbTransId) throws CrudException {
409         GraphEvent event =
410                 GraphEvent.builder(GraphEventOperation.UPDATE).edge(GraphEventEdge.fromEdge(edge, version)).build();
411         event.setDbTransactionId(dbTransId);
412         GraphEvent response = publishEvent(event);
413         return response.getEdge().toEdge();
414     }
415
416     @Override
417     protected void deleteBulkEdge(String id, String version, String dbTransId) throws CrudException {
418         // Get the edge type
419         String type = null;
420         try {
421             Edge edge = daoForGet.getEdge(id);
422             type = edge.getType();
423         }
424         catch (CrudException ex) {
425             // Likely the client is trying to delete an edge which isn't present.  Just swallow the exception
426             // and let the bulk request fail via the normal path.
427         }
428         
429         GraphEvent event = GraphEvent.builder(GraphEventOperation.DELETE)
430                 .edge(new GraphEventEdge(id, version, type, null, null, null)).build();
431         event.setDbTransactionId(dbTransId);
432         publishEvent(event);
433     }
434
435     private GraphEvent publishEvent(GraphEvent event) throws CrudException {
436         GraphEventEnvelope response = sendAndWait(event);
437         responseHandler.handleBulkEventResponse(event, response);
438         return response.getBody();
439     }
440 }