2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
20 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 package org.onap.champ.async;
24 import java.util.Optional;
25 import java.util.TimerTask;
26 import java.util.concurrent.ArrayBlockingQueue;
27 import java.util.concurrent.BlockingQueue;
28 import java.util.concurrent.Executors;
29 import java.util.concurrent.ThreadPoolExecutor;
31 import javax.naming.OperationNotSupportedException;
32 import javax.ws.rs.core.Response.Status;
34 import org.onap.aai.champcore.ChampTransaction;
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.champ.ChampRESTAPI;
38 import org.onap.champ.event.GraphEvent;
39 import org.onap.champ.event.GraphEvent.GraphEventResult;
40 import org.onap.champ.event.GraphEventEdge;
41 import org.onap.champ.event.GraphEventVertex;
42 import org.onap.champ.exception.ChampServiceException;
43 import org.onap.champ.service.ChampDataService;
44 import org.onap.champ.service.ChampThreadFactory;
45 import org.onap.champ.service.logging.ChampMsgs;
47 import org.onap.aai.event.api.EventConsumer;
/**
 * Polls graph events from the request topic, performs the requested CRUD
 * operation through the Champ data service, and hands each processed event to
 * the response publisher.
 */
54 public class ChampAsyncRequestProcessor extends TimerTask {
56 private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
58 private ChampDataService champDataService;
61 * Number of events that can be queued up.
63 private Integer requestProcesserQueueSize;
/**
 * Number of worker threads that process queued request events.
 */
68 private Integer requestProcesserPoolSize;
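/**
 * Polling time for request events, as returned by getRequestPollingTimeSeconds().
 * NOTE(review): the name says seconds but the default constant is 30000 -
 * confirm the unit against the code that schedules this task.
 */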
73 private Integer requestPollingTimeSeconds;
/**
 * Internal queue where incoming request events are buffered until a worker
 * thread takes them for processing.
 */
79 private BlockingQueue<GraphEvent> requestProcesserEventQueue;
/**
 * Pool of worker threads that take events off the internal queue, apply them
 * through the Champ data service and pass the result to the response publisher.
 */
85 private ThreadPoolExecutor requestProcesserPool;
87 private ChampAsyncResponsePublisher champAsyncResponsePublisher;
89 private EventConsumer asyncRequestConsumer;
91 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
93 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
94 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
95 private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
96 Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
/**
 * Creates a request processor that uses the default queue capacity, worker
 * pool size and polling time.
 *
 * <p>Delegates to the fully parameterised constructor so that the queue, the
 * worker pool and the start-up log message are set up in exactly one place.
 *
 * @param champDataService data service the workers use to apply graph operations
 * @param champAsyncResponsePublisher publisher that receives every processed event
 * @param asyncRequestConsumer consumer the request events are read from
 */
public ChampAsyncRequestProcessor(ChampDataService champDataService,
    ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
  this(champDataService, champAsyncResponsePublisher, asyncRequestConsumer,
      DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY, DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE,
      DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND);
}
/**
 * Creates a request processor with explicit sizing.
 *
 * <p>All collaborators are stored before the worker threads are submitted, so a
 * worker can never observe a half-initialised processor.
 *
 * @param champDataService data service the workers use to apply graph operations
 * @param champAsyncResponsePublisher publisher that receives every processed event
 * @param asyncRequestConsumer consumer the request events are read from
 * @param requestProcesserQueueSize capacity of the internal request queue
 * @param requestProcesserPoolSize number of worker threads to start
 * @param requestPollingTimeSeconds polling time reported by
 *        {@link #getRequestPollingTimeSeconds()}
 * @throws NullPointerException if {@code asyncRequestConsumer} or one of the
 *         sizes is {@code null}
 */
public ChampAsyncRequestProcessor(ChampDataService champDataService,
    ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
    Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {

  // Dereference the consumer first: a null consumer then fails here, before
  // any worker thread has been created, instead of after the pool is running.
  String consumerName = asyncRequestConsumer.getClass().getName();

  this.champDataService = champDataService;
  this.champAsyncResponsePublisher = champAsyncResponsePublisher;
  this.asyncRequestConsumer = asyncRequestConsumer;

  this.requestProcesserQueueSize = requestProcesserQueueSize;
  this.requestProcesserPoolSize = requestProcesserPoolSize;
  this.requestPollingTimeSeconds = requestPollingTimeSeconds;

  this.requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
  this.requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
      new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));

  // One long-running worker per pool thread; each blocks on the queue until
  // an event is offered.
  for (int i = 0; i < requestProcesserPoolSize; i++) {
    requestProcesserPool.submit(new ChampProcessorWorker());
  }

  logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
      "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer " + consumerName);
}
150 private class ChampProcessorWorker implements Runnable {
157 GraphEvent event = null;
159 // Get the next event to be published from the queue.
160 event = requestProcesserEventQueue.take();
161 } catch (InterruptedException e) {
162 // Restore the interrupted status.
163 Thread.currentThread().interrupt();
166 // Parse the event and call champ Dao to process , Create the
167 // response event and put it on response queue
168 event.setResult(GraphEventResult.SUCCESS);
170 // Check if this request is part of an ongoing DB transaction
171 ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
172 if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
173 event.setResult(GraphEventResult.FAILURE);
174 event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
175 event.setHttpErrorStatus(Status.BAD_REQUEST);
178 if (event.getResult() != GraphEventResult.FAILURE) {
180 if (event.getVertex() != null) {
182 switch (event.getOperation()) {
184 event.setVertex(GraphEventVertex.fromChampObject(
185 champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
186 event.getVertex().getModelVersion()));
190 event.setVertex(GraphEventVertex.fromChampObject(
191 champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
192 event.getVertex().getModelVersion()));
195 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
200 } else if (event.getEdge() != null) {
201 switch (event.getOperation()) {
203 event.setEdge(GraphEventEdge.fromChampRelationship(
204 champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
205 event.getEdge().getModelVersion()));
209 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
210 .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
211 event.getEdge().getModelVersion()));
215 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
218 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
219 "Invalid operation for event transactionId: " + event.getTransactionId());
223 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
224 "Invalid payload for event transactionId: " + event.getTransactionId());
226 } catch (ChampServiceException champException) {
227 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
228 event.setResult(GraphEventResult.FAILURE);
229 event.setErrorMessage(champException.getMessage());
230 event.setHttpErrorStatus(champException.getHttpStatus());
232 } catch (Exception ex) {
233 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
234 event.setResult(GraphEventResult.FAILURE);
235 event.setErrorMessage(ex.getMessage());
236 event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
240 if (event.getResult().equals(GraphEventResult.SUCCESS)) {
241 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
242 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
243 + " , transaction-id: " + event.getTransactionId() + " , operation: "
244 + event.getOperation().toString() + " , result: " + event.getResult());
246 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
247 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
248 + " , transaction-id: " + event.getTransactionId() + " , operation: "
249 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
250 + event.getErrorMessage());
253 champAsyncResponsePublisher.publishResponseEvent(event);
262 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
264 if (asyncRequestConsumer == null) {
265 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
268 Iterable<String> events = null;
270 events = asyncRequestConsumer.consume();
271 } catch (Exception e) {
272 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
276 if (events == null || !events.iterator().hasNext()) {
277 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
281 for (String event : events) {
283 GraphEvent requestEvent = GraphEvent.fromJson(event);
284 auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
285 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
286 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
287 + requestEvent.getOperation().toString());
288 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
289 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
290 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
291 + requestEvent.getOperation().toString());
292 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
294 // Try to submit the event to be published to the event bus.
295 if (!requestProcesserEventQueue.offer(requestEvent)) {
296 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
297 "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
300 } catch (Exception e) {
301 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
306 asyncRequestConsumer.commitOffsets();
307 } catch(OperationNotSupportedException e) {
308 //Dmaap doesnt support commit with offset
309 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
311 catch (Exception e) {
312 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
319 public Integer getRequestPollingTimeSeconds() {
320 return requestPollingTimeSeconds;