Port champ-microservice project restructure
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
1 /**
2  * ============LICENSE_START=======================================================
3  * Gizmo
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.champ.async;
26
27 import java.util.Optional;
28 import java.util.TimerTask;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadPoolExecutor;
33
34 import javax.naming.OperationNotSupportedException;
35 import javax.ws.rs.core.Response.Status;
36
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.champ.ChampRESTAPI;
41 import org.onap.champ.event.GraphEvent;
42 import org.onap.champ.event.GraphEvent.GraphEventResult;
43 import org.onap.champ.event.GraphEventEdge;
44 import org.onap.champ.event.GraphEventVertex;
45 import org.onap.champ.exception.ChampServiceException;
46 import org.onap.champ.service.ChampDataService;
47 import org.onap.champ.service.ChampThreadFactory;
48 import org.onap.champ.service.logging.ChampMsgs;
49
50 import org.onap.aai.event.api.EventConsumer;
51
52 /**
53  * This Class polls the Graph events from request topic perform the necessary
54  * CRUD operation by calling champDAO and queues up the response to be consumed
55  * by response handler.
56  */
57 public class ChampAsyncRequestProcessor extends TimerTask {
58
59   private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
60
61   private ChampDataService champDataService;
62
63   /**
64    * Number of events that can be queued up.
65    */
66   private Integer requestProcesserQueueSize;
67
68   /**
69    * Number of event publisher worker threads.
70    */
71   private Integer requestProcesserPoolSize;
72   
73   /**
74    * Number of event publisher worker threads.
75    */
76   private Integer requestPollingTimeSeconds;
77
78   /**
79    * Internal queue where outgoing events will be buffered until they can be
80    * serviced by.
81    **/
82   private BlockingQueue<GraphEvent> requestProcesserEventQueue;
83
84   /**
85    * Pool of worker threads that do the work of publishing the events to the
86    * event bus.
87    */
88   private ThreadPoolExecutor requestProcesserPool;
89
90   private ChampAsyncResponsePublisher champAsyncResponsePublisher;
91
92   private EventConsumer asyncRequestConsumer;
93
94   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
95
96   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
97   private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
98   private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
99   Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
100
101   public ChampAsyncRequestProcessor(ChampDataService champDataService,
102       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
103
104     this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
105
106     this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
107
108     this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
109     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
110     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
111         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
112
113     for (int i = 0; i < requestProcesserPoolSize; i++) {
114       requestProcesserPool.submit(new ChampProcessorWorker());
115     }
116
117     this.champDataService = champDataService;
118     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
119     this.asyncRequestConsumer = asyncRequestConsumer;
120     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
121         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
122             + asyncRequestConsumer.getClass().getName());
123   }
124   
125   
126
127   public ChampAsyncRequestProcessor(ChampDataService champDataService,
128       ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
129       Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
130
131     this.requestProcesserQueueSize = requestProcesserQueueSize;
132
133     this.requestProcesserPoolSize = requestProcesserPoolSize;
134     
135     this.requestPollingTimeSeconds = requestPollingTimeSeconds;
136
137     requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
138     requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
139         new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
140
141     for (int i = 0; i < requestProcesserPoolSize; i++) {
142       requestProcesserPool.submit(new ChampProcessorWorker());
143     }
144
145     this.champDataService = champDataService;
146     this.champAsyncResponsePublisher = champAsyncResponsePublisher;
147     this.asyncRequestConsumer = asyncRequestConsumer;
148     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
149         "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
150             + asyncRequestConsumer.getClass().getName());
151   }
152
153   private class ChampProcessorWorker implements Runnable {
154
155     @Override
156     public void run() {
157
158       while (true) {
159
160         GraphEvent event = null;
161         try {
162           // Get the next event to be published from the queue.
163           event = requestProcesserEventQueue.take();
164         } catch (InterruptedException e) {
165           // Restore the interrupted status.
166           Thread.currentThread().interrupt();
167         }
168
169         // Parse the event and call champ Dao to process , Create the
170         // response event and put it on response queue
171         event.setResult(GraphEventResult.SUCCESS);
172         
173         // Check if this request is part of an ongoing DB transaction
174         ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
175         if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
176           event.setResult(GraphEventResult.FAILURE);
177           event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
178           event.setHttpErrorStatus(Status.BAD_REQUEST);
179         }
180         
181         if (event.getResult() != GraphEventResult.FAILURE) {
182           try {
183             if (event.getVertex() != null) {
184
185               switch (event.getOperation()) {
186               case CREATE:
187                 event.setVertex(GraphEventVertex.fromChampObject(
188                     champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
189                     event.getVertex().getModelVersion()));
190                 break;
191
192               case UPDATE:
193                 event.setVertex(GraphEventVertex.fromChampObject(
194                     champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
195                     event.getVertex().getModelVersion()));
196                 break;
197               case DELETE:
198                 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
199                 break;
200               default:
201                 // log error
202               }
203             } else if (event.getEdge() != null) {
204               switch (event.getOperation()) {
205               case CREATE:
206                 event.setEdge(GraphEventEdge.fromChampRelationship(
207                     champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
208                     event.getEdge().getModelVersion()));
209                 break;
210
211               case UPDATE:
212                 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
213                     .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
214                     event.getEdge().getModelVersion()));
215
216                 break;
217               case DELETE:
218                 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
219                 break;
220               default:
221                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
222                     "Invalid operation for event transactionId: " + event.getTransactionId());
223               }
224
225             } else {
226               logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
227                   "Invalid payload for event transactionId: " + event.getTransactionId());
228             }
229           } catch (ChampServiceException champException) {
230             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
231             event.setResult(GraphEventResult.FAILURE);
232             event.setErrorMessage(champException.getMessage());
233             event.setHttpErrorStatus(champException.getHttpStatus());
234
235           } catch (Exception ex) {
236             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
237             event.setResult(GraphEventResult.FAILURE);
238             event.setErrorMessage(ex.getMessage());
239             event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
240           }
241         }
242
243         if (event.getResult().equals(GraphEventResult.SUCCESS)) {
244           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
245               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
246                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
247                   + event.getOperation().toString() + " , result: " + event.getResult());
248         } else {
249           logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
250               "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
251                   + " , transaction-id: " + event.getTransactionId() + " , operation: "
252                   + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
253                   + event.getErrorMessage());
254         }
255
256         champAsyncResponsePublisher.publishResponseEvent(event);
257
258       }
259     }
260   }
261
262   @Override
263   public void run() {
264
265     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
266
267     if (asyncRequestConsumer == null) {
268       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
269     }
270
271     Iterable<String> events = null;
272     try {
273       events = asyncRequestConsumer.consume();
274     } catch (Exception e) {
275       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
276       return;
277     }
278
279     if (events == null || !events.iterator().hasNext()) {
280       logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
281
282     }
283
284     for (String event : events) {
285       try {
286         GraphEvent requestEvent = GraphEvent.fromJson(event);
287         auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
288             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
289                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
290                 + requestEvent.getOperation().toString());
291         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
292             "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
293                 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
294                 + requestEvent.getOperation().toString());
295         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
296
297         // Try to submit the event to be published to the event bus.
298         if (!requestProcesserEventQueue.offer(requestEvent)) {
299           logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
300               "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
301         }
302
303       } catch (Exception e) {
304         logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
305       }
306     }
307
308     try {
309       asyncRequestConsumer.commitOffsets();
310     } catch(OperationNotSupportedException e) {
311         //Dmaap doesnt support commit with offset       
312         logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
313     } 
314     catch (Exception e) {
315       logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
316     }
317
318   }
319
320
321
322   public Integer getRequestPollingTimeSeconds() {
323     return requestPollingTimeSeconds;
324   }
325
326   
327 }