Update license date and text
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.champ.async;
22
23 import java.util.Optional;
24 import java.util.TimerTask;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadPoolExecutor;
29
30 import javax.naming.OperationNotSupportedException;
31 import javax.ws.rs.core.Response.Status;
32
33 import org.onap.aai.champcore.ChampTransaction;
34 import org.onap.aai.cl.api.Logger;
35 import org.onap.aai.cl.eelf.LoggerFactory;
36 import org.onap.champ.ChampRESTAPI;
37 import org.onap.champ.event.GraphEvent;
38 import org.onap.champ.event.GraphEvent.GraphEventResult;
39 import org.onap.champ.event.GraphEventEdge;
40 import org.onap.champ.event.GraphEventVertex;
41 import org.onap.champ.exception.ChampServiceException;
42 import org.onap.champ.service.ChampDataService;
43 import org.onap.champ.service.ChampThreadFactory;
44 import org.onap.champ.service.logging.ChampMsgs;
45
46 import org.onap.aai.event.api.EventConsumer;
47
48 /**
49  * This Class polls the Graph events from request topic perform the necessary
50  * CRUD operation by calling champDAO and queues up the response to be consumed
51  * by response handler.
52  */
53 public class ChampAsyncRequestProcessor extends TimerTask {
54
55   private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
56
57   private ChampDataService champDataService;
58
59   /**
60    * Number of events that can be queued up.
61    */
62   private Integer requestProcesserQueueSize;
63
64   /**
65    * Number of event publisher worker threads.
66    */
67   private Integer requestProcesserPoolSize;
68   
69   /**
70    * Number of event publisher worker threads.
71    */
72   private Integer requestPollingTimeSeconds;
73
74   /**
75    * Internal queue where outgoing events will be buffered until they can be
76    * serviced by.
77    **/
78   private BlockingQueue<GraphEvent> requestProcesserEventQueue;
79
80   /**
81    * Pool of worker threads that do the work of publishing the events to the
82    * event bus.
83    */
84   private ThreadPoolExecutor requestProcesserPool;
85
86   private ChampAsyncResponsePublisher champAsyncResponsePublisher;
87
88   private EventConsumer asyncRequestConsumer;
89
90   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
91
92   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
93   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
94   private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
95   Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
96
97   public ChampAsyncRequestProcessor(ChampDataService champDataService,
98       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
99
100     this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
101
102     this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
103
104     this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
105     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
106     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
107         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
108
109     for (int i = 0; i < requestProcesserPoolSize; i++) {
110       requestProcesserPool.submit(new ChampProcessorWorker());
111     }
112
113     this.champDataService = champDataService;
114     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
115     this.asyncRequestConsumer = asyncRequestConsumer;
116     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
117         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
118             + asyncRequestConsumer.getClass().getName());
119   }
120   
121   
122
123   public ChampAsyncRequestProcessor(ChampDataService champDataService,
124       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
125       Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
126
127     this.requestProcesserQueueSize = requestProcesserQueueSize;
128
129     this.requestProcesserPoolSize = requestProcesserPoolSize;
130     
131     this.requestPollingTimeSeconds = requestPollingTimeSeconds;
132
133     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
134     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
135         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
136
137     for (int i = 0; i < requestProcesserPoolSize; i++) {
138       requestProcesserPool.submit(new ChampProcessorWorker());
139     }
140
141     this.champDataService = champDataService;
142     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
143     this.asyncRequestConsumer = asyncRequestConsumer;
144     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
145         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
146             + asyncRequestConsumer.getClass().getName());
147   }
148
149   private class ChampProcessorWorker implements Runnable {
150
151     @Override
152     public void run() {
153
154       while (true) {
155
156         GraphEvent event = null;
157         try {
158           // Get the next event to be published from the queue.
159           event = requestProcesserEventQueue.take();
160         } catch (InterruptedException e) {
161           // Restore the interrupted status.
162           Thread.currentThread().interrupt();
163         }
164
165         // Parse the event and call champ Dao to process , Create the
166         // response event and put it on response queue
167         event.setResult(GraphEventResult.SUCCESS);
168         
169         // Check if this request is part of an ongoing DB transaction
170         ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
171         if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
172           event.setResult(GraphEventResult.FAILURE);
173           event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
174           event.setHttpErrorStatus(Status.BAD_REQUEST);
175         }
176         
177         if (event.getResult() != GraphEventResult.FAILURE) {
178           try {
179             if (event.getVertex() != null) {
180
181               switch (event.getOperation()) {
182               case CREATE:
183                 event.setVertex(GraphEventVertex.fromChampObject(
184                     champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
185                     event.getVertex().getModelVersion()));
186                 break;
187
188               case UPDATE:
189                 event.setVertex(GraphEventVertex.fromChampObject(
190                     champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
191                     event.getVertex().getModelVersion()));
192                 break;
193               case DELETE:
194                 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
195                 break;
196               default:
197                 // log error
198               }
199             } else if (event.getEdge() != null) {
200               switch (event.getOperation()) {
201               case CREATE:
202                 event.setEdge(GraphEventEdge.fromChampRelationship(
203                     champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
204                     event.getEdge().getModelVersion()));
205                 break;
206
207               case UPDATE:
208                 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
209                     .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
210                     event.getEdge().getModelVersion()));
211
212                 break;
213               case DELETE:
214                 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
215                 break;
216               default:
217                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
218                     "Invalid operation for event transactionId: " + event.getTransactionId());
219               }
220
221             } else {
222               logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
223                   "Invalid payload for event transactionId: " + event.getTransactionId());
224             }
225           } catch (ChampServiceException champException) {
226             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
227             event.setResult(GraphEventResult.FAILURE);
228             event.setErrorMessage(champException.getMessage());
229             event.setHttpErrorStatus(champException.getHttpStatus());
230
231           } catch (Exception ex) {
232             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
233             event.setResult(GraphEventResult.FAILURE);
234             event.setErrorMessage(ex.getMessage());
235             event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
236           }
237         }
238
239         if (event.getResult().equals(GraphEventResult.SUCCESS)) {
240           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
241               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
242                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
243                   + event.getOperation().toString() + " , result: " + event.getResult());
244         } else {
245           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
246               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
247                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
248                   + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
249                   + event.getErrorMessage());
250         }
251
252         champAsyncResponsePublisher.publishResponseEvent(event);
253
254       }
255     }
256   }
257
258   @Override
259   public void run() {
260
261     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
262
263     if (asyncRequestConsumer == null) {
264       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
265     }
266
267     Iterable<String> events = null;
268     try {
269       events = asyncRequestConsumer.consume();
270     } catch (Exception e) {
271       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
272       return;
273     }
274
275     if (events == null || !events.iterator().hasNext()) {
276       logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
277
278     }
279
280     for (String event : events) {
281       try {
282         GraphEvent requestEvent = GraphEvent.fromJson(event);
283         auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
284             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
285                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
286                 + requestEvent.getOperation().toString());
287         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
288             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
289                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
290                 + requestEvent.getOperation().toString());
291         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
292
293         // Try to submit the event to be published to the event bus.
294         if (!requestProcesserEventQueue.offer(requestEvent)) {
295           logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
296               "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
297         }
298
299       } catch (Exception e) {
300         logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
301       }
302     }
303
304     try {
305       asyncRequestConsumer.commitOffsets();
306     } catch(OperationNotSupportedException e) {
307         //Dmaap doesnt support commit with offset       
308         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
309     } 
310     catch (Exception e) {
311       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
312     }
313
314   }
315
316
317
318   public Integer getRequestPollingTimeSeconds() {
319     return requestPollingTimeSeconds;
320   }
321
322   
323 }