ed25f8cf24d96f378fd0c1698df70a8ef1a0bdbc
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.champ.async;
23
24 import java.util.Optional;
25 import java.util.TimerTask;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadPoolExecutor;
30
31 import javax.naming.OperationNotSupportedException;
32 import javax.ws.rs.core.Response.Status;
33
34 import org.onap.aai.champcore.ChampTransaction;
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.champ.ChampRESTAPI;
38 import org.onap.champ.event.GraphEvent;
39 import org.onap.champ.event.GraphEvent.GraphEventResult;
40 import org.onap.champ.event.GraphEventEdge;
41 import org.onap.champ.event.GraphEventVertex;
42 import org.onap.champ.exception.ChampServiceException;
43 import org.onap.champ.service.ChampDataService;
44 import org.onap.champ.service.ChampThreadFactory;
45 import org.onap.champ.service.logging.ChampMsgs;
46
47 import org.onap.aai.event.api.EventConsumer;
48
49 /**
50  * This Class polls the Graph events from request topic perform the necessary
51  * CRUD operation by calling champDAO and queues up the response to be consumed
52  * by response handler.
53  */
54 public class ChampAsyncRequestProcessor extends TimerTask {
55
56   private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
57
58   private ChampDataService champDataService;
59
60   /**
61    * Number of events that can be queued up.
62    */
63   private Integer requestProcesserQueueSize;
64
65   /**
66    * Number of event publisher worker threads.
67    */
68   private Integer requestProcesserPoolSize;
69   
70   /**
71    * Number of event publisher worker threads.
72    */
73   private Integer requestPollingTimeSeconds;
74
75   /**
76    * Internal queue where outgoing events will be buffered until they can be
77    * serviced by.
78    **/
79   private BlockingQueue<GraphEvent> requestProcesserEventQueue;
80
81   /**
82    * Pool of worker threads that do the work of publishing the events to the
83    * event bus.
84    */
85   private ThreadPoolExecutor requestProcesserPool;
86
87   private ChampAsyncResponsePublisher champAsyncResponsePublisher;
88
89   private EventConsumer asyncRequestConsumer;
90
91   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
92
93   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
94   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
95   private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
96   Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
97
98   public ChampAsyncRequestProcessor(ChampDataService champDataService,
99       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
100
101     this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
102
103     this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
104
105     this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
106     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
107     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
108         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
109
110     for (int i = 0; i < requestProcesserPoolSize; i++) {
111       requestProcesserPool.submit(new ChampProcessorWorker());
112     }
113
114     this.champDataService = champDataService;
115     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
116     this.asyncRequestConsumer = asyncRequestConsumer;
117     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
118         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
119             + asyncRequestConsumer.getClass().getName());
120   }
121   
122   
123
124   public ChampAsyncRequestProcessor(ChampDataService champDataService,
125       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
126       Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
127
128     this.requestProcesserQueueSize = requestProcesserQueueSize;
129
130     this.requestProcesserPoolSize = requestProcesserPoolSize;
131     
132     this.requestPollingTimeSeconds = requestPollingTimeSeconds;
133
134     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
135     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
136         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
137
138     for (int i = 0; i < requestProcesserPoolSize; i++) {
139       requestProcesserPool.submit(new ChampProcessorWorker());
140     }
141
142     this.champDataService = champDataService;
143     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
144     this.asyncRequestConsumer = asyncRequestConsumer;
145     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
146         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
147             + asyncRequestConsumer.getClass().getName());
148   }
149
150   private class ChampProcessorWorker implements Runnable {
151
152     @Override
153     public void run() {
154
155       while (true) {
156
157         GraphEvent event = null;
158         try {
159           // Get the next event to be published from the queue.
160           event = requestProcesserEventQueue.take();
161         } catch (InterruptedException e) {
162           // Restore the interrupted status.
163           Thread.currentThread().interrupt();
164         }
165
166         // Parse the event and call champ Dao to process , Create the
167         // response event and put it on response queue
168         event.setResult(GraphEventResult.SUCCESS);
169         
170         // Check if this request is part of an ongoing DB transaction
171         ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
172         if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
173           event.setResult(GraphEventResult.FAILURE);
174           event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
175           event.setHttpErrorStatus(Status.BAD_REQUEST);
176         }
177         
178         if (event.getResult() != GraphEventResult.FAILURE) {
179           try {
180             if (event.getVertex() != null) {
181
182               switch (event.getOperation()) {
183               case CREATE:
184                 event.setVertex(GraphEventVertex.fromChampObject(
185                     champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
186                     event.getVertex().getModelVersion()));
187                 break;
188
189               case UPDATE:
190                 event.setVertex(GraphEventVertex.fromChampObject(
191                     champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
192                     event.getVertex().getModelVersion()));
193                 break;
194               case DELETE:
195                 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
196                 break;
197               default:
198                 // log error
199               }
200             } else if (event.getEdge() != null) {
201               switch (event.getOperation()) {
202               case CREATE:
203                 event.setEdge(GraphEventEdge.fromChampRelationship(
204                     champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
205                     event.getEdge().getModelVersion()));
206                 break;
207
208               case UPDATE:
209                 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
210                     .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
211                     event.getEdge().getModelVersion()));
212
213                 break;
214               case DELETE:
215                 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
216                 break;
217               default:
218                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
219                     "Invalid operation for event transactionId: " + event.getTransactionId());
220               }
221
222             } else {
223               logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
224                   "Invalid payload for event transactionId: " + event.getTransactionId());
225             }
226           } catch (ChampServiceException champException) {
227             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
228             event.setResult(GraphEventResult.FAILURE);
229             event.setErrorMessage(champException.getMessage());
230             event.setHttpErrorStatus(champException.getHttpStatus());
231
232           } catch (Exception ex) {
233             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
234             event.setResult(GraphEventResult.FAILURE);
235             event.setErrorMessage(ex.getMessage());
236             event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
237           }
238         }
239
240         if (event.getResult().equals(GraphEventResult.SUCCESS)) {
241           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
242               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
243                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
244                   + event.getOperation().toString() + " , result: " + event.getResult());
245         } else {
246           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
247               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
248                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
249                   + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
250                   + event.getErrorMessage());
251         }
252
253         champAsyncResponsePublisher.publishResponseEvent(event);
254
255       }
256     }
257   }
258
259   @Override
260   public void run() {
261
262     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
263
264     if (asyncRequestConsumer == null) {
265       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
266     }
267
268     Iterable<String> events = null;
269     try {
270       events = asyncRequestConsumer.consume();
271     } catch (Exception e) {
272       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
273       return;
274     }
275
276     if (events == null || !events.iterator().hasNext()) {
277       logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
278
279     }
280
281     for (String event : events) {
282       try {
283         GraphEvent requestEvent = GraphEvent.fromJson(event);
284         auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
285             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
286                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
287                 + requestEvent.getOperation().toString());
288         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
289             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
290                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
291                 + requestEvent.getOperation().toString());
292         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
293
294         // Try to submit the event to be published to the event bus.
295         if (!requestProcesserEventQueue.offer(requestEvent)) {
296           logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
297               "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
298         }
299
300       } catch (Exception e) {
301         logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
302       }
303     }
304
305     try {
306       asyncRequestConsumer.commitOffsets();
307     } catch(OperationNotSupportedException e) {
308         //Dmaap doesnt support commit with offset       
309         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
310     } 
311     catch (Exception e) {
312       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
313     }
314
315   }
316
317
318
319   public Integer getRequestPollingTimeSeconds() {
320     return requestPollingTimeSeconds;
321   }
322
323   
324 }