e091ee9056a94875632bb6e717ad60a89008580e
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncRequestProcessor.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.champ.async;
22
23 import java.util.Optional;
24 import java.util.TimerTask;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import javax.naming.OperationNotSupportedException;
30 import javax.ws.rs.core.Response.Status;
31 import org.onap.aai.champcore.ChampTransaction;
32 import org.onap.aai.cl.api.Logger;
33 import org.onap.aai.cl.eelf.LoggerFactory;
34 import org.onap.aai.event.api.EventConsumer;
35 import org.onap.champ.ChampRESTAPI;
36 import org.onap.champ.event.GraphEvent;
37 import org.onap.champ.event.GraphEvent.GraphEventResult;
38 import org.onap.champ.event.GraphEventEdge;
39 import org.onap.champ.event.GraphEventVertex;
40 import org.onap.champ.event.envelope.GraphEventEnvelope;
41 import org.onap.champ.event.envelope.GraphEventHeader;
42 import org.onap.champ.exception.ChampServiceException;
43 import org.onap.champ.service.ChampDataService;
44 import org.onap.champ.service.ChampThreadFactory;
45 import org.onap.champ.service.logging.ChampMsgs;
46
47 /**
48  * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and
49  * queues up the response to be consumed by response handler.
50  */
51 public class ChampAsyncRequestProcessor extends TimerTask {
52
53     private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
54
55     private ChampDataService champDataService;
56
57     /**
58      * Number of events that can be queued up.
59      */
60     private Integer requestProcesserQueueSize;
61
62     /**
63      * Number of event publisher worker threads.
64      */
65     private Integer requestProcesserPoolSize;
66
67     /**
68      * Number of event publisher worker threads.
69      */
70     private Integer requestPollingTimeSeconds;
71
72     /**
73      * Internal queue where outgoing events will be buffered until they can be serviced by.
74      **/
75     private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue;
76
77     /**
78      * Pool of worker threads that do the work of publishing the events to the event bus.
79      */
80     private ThreadPoolExecutor requestProcesserPool;
81
82     private ChampAsyncResponsePublisher champAsyncResponsePublisher;
83
84     private EventConsumer asyncRequestConsumer;
85
86     private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
87
88     private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
89     private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
90     private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
91     Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
92
93     public ChampAsyncRequestProcessor(ChampDataService champDataService,
94             ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
95
96         this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
97
98         this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
99
100         this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
101         requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
102         requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
103                 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
104
105         for (int i = 0; i < requestProcesserPoolSize; i++) {
106             requestProcesserPool.submit(new ChampProcessorWorker());
107         }
108
109         this.champDataService = champDataService;
110         this.champAsyncResponsePublisher = champAsyncResponsePublisher;
111         this.asyncRequestConsumer = asyncRequestConsumer;
112         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
113                 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
114                         + asyncRequestConsumer.getClass().getName());
115     }
116
117     public ChampAsyncRequestProcessor(ChampDataService champDataService,
118             ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
119             Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
120
121         this.requestProcesserQueueSize = requestProcesserQueueSize;
122
123         this.requestProcesserPoolSize = requestProcesserPoolSize;
124
125         this.requestPollingTimeSeconds = requestPollingTimeSeconds;
126
127         requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
128         requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
129                 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
130
131         for (int i = 0; i < requestProcesserPoolSize; i++) {
132             requestProcesserPool.submit(new ChampProcessorWorker());
133         }
134
135         this.champDataService = champDataService;
136         this.champAsyncResponsePublisher = champAsyncResponsePublisher;
137         this.asyncRequestConsumer = asyncRequestConsumer;
138         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
139                 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
140                         + asyncRequestConsumer.getClass().getName());
141     }
142
143     private class ChampProcessorWorker implements Runnable {
144
145         @Override
146         public void run() {
147
148             while (true) {
149
150                 GraphEventEnvelope eventEnvelope = null;
151                 GraphEvent event = null;
152                 try {
153                     // Get the next event to be published from the queue.
154                     eventEnvelope = requestProcesserEventQueue.take();
155                     event = eventEnvelope.getBody();
156
157                     // Apply Champ Event header
158                     eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build());
159
160                     // Parse the event and call champ Dao to process , Create the
161                     // response event and put it on response queue
162                     event.setResult(GraphEventResult.SUCCESS);
163
164                     // Check if this request is part of an ongoing DB transaction
165                     ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
166                     if ((event.getDbTransactionId() != null) && (transaction == null)) {
167                         event.setResult(GraphEventResult.FAILURE);
168                         event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
169                         event.setHttpErrorStatus(Status.BAD_REQUEST);
170                     }
171
172                     if (event.getResult() != GraphEventResult.FAILURE) {
173                         try {
174                             if (event.getVertex() != null) {
175
176                                 switch (event.getOperation()) {
177                                     case CREATE:
178                                         event.setVertex(GraphEventVertex.fromChampObject(
179                                             champDataService.storeObject(event.getVertex().toChampObject(),
180                                                 Optional.ofNullable(transaction)),
181                                             event.getVertex().getModelVersion()));
182                                         break;
183
184                                     case UPDATE:
185                                         event.setVertex(GraphEventVertex.fromChampObject(
186                                             champDataService.replaceObject(
187                                                 event.getVertex().toChampObject(event.getVertex().toJson()),
188                                                 event.getVertex().getId(), Optional.ofNullable(transaction)),
189                                             event.getVertex().getModelVersion()));
190                                         break;
191                                     case DELETE:
192                                         champDataService.deleteObject(event.getVertex().getId(),
193                                             Optional.ofNullable(transaction));
194                                         break;
195                                     default:
196                                         // log error
197                                 }
198                             } else if (event.getEdge() != null) {
199                                 switch (event.getOperation()) {
200                                     case CREATE:
201                                         event.setEdge(GraphEventEdge.fromChampRelationship(
202                                             champDataService.storeRelationship(event.getEdge().toChampRelationship(),
203                                                 Optional.ofNullable(transaction)),
204                                             event.getEdge().getModelVersion()));
205                                         break;
206
207                                     case UPDATE:
208                                         event.setEdge(GraphEventEdge.fromChampRelationship(
209                                             champDataService.updateRelationship(event.getEdge().toChampRelationship(),
210                                                 event.getEdge().getId(), Optional.ofNullable(transaction)),
211                                             event.getEdge().getModelVersion()));
212
213                                         break;
214                                     case DELETE:
215                                         champDataService.deleteRelationship(event.getEdge().getId(),
216                                             Optional.ofNullable(transaction));
217                                         break;
218                                     default:
219                                         logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
220                                             "Invalid operation for event transactionId: " + event.getTransactionId());
221                                 }
222
223                             } else {
224                                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
225                                     "Invalid payload for event transactionId: " + event.getTransactionId());
226                             }
227                         } catch (ChampServiceException champException) {
228                             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
229                             event.setResult(GraphEventResult.FAILURE);
230                             event.setErrorMessage(champException.getMessage());
231                             event.setHttpErrorStatus(champException.getHttpStatus());
232
233                         } catch (Exception ex) {
234                             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
235                             event.setResult(GraphEventResult.FAILURE);
236                             event.setErrorMessage(ex.getMessage());
237                             event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
238                         }
239                     }
240
241                     if (event.getResult().equals(GraphEventResult.SUCCESS)) {
242                         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
243                             "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
244                                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
245                                 + event.getOperation().toString() + " , result: " + event.getResult());
246                     } else {
247                         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
248                             "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
249                                 + " , transaction-id: " + event.getTransactionId() + " , operation: "
250                                 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
251                                 + event.getErrorMessage());
252                     }
253
254                     champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
255                 } catch (InterruptedException e) {
256                     // Restore the interrupted status.
257                     Thread.currentThread().interrupt();
258                 }
259
260             }
261         }
262     }
263
264     @Override
265     public void run() {
266
267         logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
268
269         if (asyncRequestConsumer == null) {
270             logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
271                     "Unable to initialize ChampAsyncRequestProcessor");
272         } else {
273             try {
274                 Iterable<String> events = asyncRequestConsumer.consume();
275
276                 if (events == null || !events.iterator().hasNext()) {
277                     logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
278                 } else {
279                     processEvents(events);
280                 }
281
282                 asyncRequestConsumer.commitOffsets();
283             } catch (OperationNotSupportedException e) {
284                 // Dmaap doesnt support commit with offset
285                 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
286             } catch (Exception e) {
287                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
288             }
289         }
290
291     }
292
293     private void processEvents(Iterable<String> events) {
294         for (String event : events) {
295             try {
296                 GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
297                 GraphEvent requestEvent = requestEnvelope.getBody();
298                 auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
299                         "Event received of type: " + requestEvent.getObjectType() + " with key: "
300                                 + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
301                                 + " , operation: " + requestEvent.getOperation().toString());
302                 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
303                         "Event received of type: " + requestEvent.getObjectType() + " with key: "
304                                 + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
305                                 + " , operation: " + requestEvent.getOperation().toString());
306                 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
307
308                 // Try to submit the event to be published to the event bus.
309                 if (!requestProcesserEventQueue.offer(requestEnvelope)) {
310                     logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
311                             "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
312                 }
313
314             } catch (Exception e) {
315                 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
316             }
317         }
318     }
319
320     public Integer getRequestPollingTimeSeconds() {
321         return requestPollingTimeSeconds;
322     }
323
324 }