2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.champ.async;
23 import java.util.Optional;
24 import java.util.TimerTask;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadPoolExecutor;
30 import javax.naming.OperationNotSupportedException;
31 import javax.ws.rs.core.Response.Status;
33 import org.onap.aai.champcore.ChampTransaction;
34 import org.onap.aai.cl.api.Logger;
35 import org.onap.aai.cl.eelf.LoggerFactory;
36 import org.onap.champ.ChampRESTAPI;
37 import org.onap.champ.event.GraphEvent;
38 import org.onap.champ.event.GraphEvent.GraphEventResult;
39 import org.onap.champ.event.GraphEventEdge;
40 import org.onap.champ.event.GraphEventVertex;
41 import org.onap.champ.exception.ChampServiceException;
42 import org.onap.champ.service.ChampDataService;
43 import org.onap.champ.service.ChampThreadFactory;
44 import org.onap.champ.service.logging.ChampMsgs;
46 import org.onap.aai.event.api.EventConsumer;
49 * This Class polls the Graph events from request topic perform the necessary
50 * CRUD operation by calling champDAO and queues up the response to be consumed
51 * by response handler.
53 public class ChampAsyncRequestProcessor extends TimerTask {
55 private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
57 private ChampDataService champDataService;
60 * Number of events that can be queued up.
62 private Integer requestProcesserQueueSize;
65 * Number of event publisher worker threads.
67 private Integer requestProcesserPoolSize;
70 * Number of event publisher worker threads.
72 private Integer requestPollingTimeSeconds;
75 * Internal queue where outgoing events will be buffered until they can be
78 private BlockingQueue<GraphEvent> requestProcesserEventQueue;
81 * Pool of worker threads that do the work of publishing the events to the
84 private ThreadPoolExecutor requestProcesserPool;
86 private ChampAsyncResponsePublisher champAsyncResponsePublisher;
88 private EventConsumer asyncRequestConsumer;
90 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
92 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
93 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
94 private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
95 Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
97 public ChampAsyncRequestProcessor(ChampDataService champDataService,
98 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
100 this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
102 this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
104 this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
105 requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
106 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
107 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
109 for (int i = 0; i < requestProcesserPoolSize; i++) {
110 requestProcesserPool.submit(new ChampProcessorWorker());
113 this.champDataService = champDataService;
114 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
115 this.asyncRequestConsumer = asyncRequestConsumer;
116 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
117 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
118 + asyncRequestConsumer.getClass().getName());
123 public ChampAsyncRequestProcessor(ChampDataService champDataService,
124 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
125 Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
127 this.requestProcesserQueueSize = requestProcesserQueueSize;
129 this.requestProcesserPoolSize = requestProcesserPoolSize;
131 this.requestPollingTimeSeconds = requestPollingTimeSeconds;
133 requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
134 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
135 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
137 for (int i = 0; i < requestProcesserPoolSize; i++) {
138 requestProcesserPool.submit(new ChampProcessorWorker());
141 this.champDataService = champDataService;
142 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
143 this.asyncRequestConsumer = asyncRequestConsumer;
144 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
145 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
146 + asyncRequestConsumer.getClass().getName());
149 private class ChampProcessorWorker implements Runnable {
156 GraphEvent event = null;
158 // Get the next event to be published from the queue.
159 event = requestProcesserEventQueue.take();
160 } catch (InterruptedException e) {
161 // Restore the interrupted status.
162 Thread.currentThread().interrupt();
165 // Parse the event and call champ Dao to process , Create the
166 // response event and put it on response queue
167 event.setResult(GraphEventResult.SUCCESS);
169 // Check if this request is part of an ongoing DB transaction
170 ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
171 if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
172 event.setResult(GraphEventResult.FAILURE);
173 event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
174 event.setHttpErrorStatus(Status.BAD_REQUEST);
177 if (event.getResult() != GraphEventResult.FAILURE) {
179 if (event.getVertex() != null) {
181 switch (event.getOperation()) {
183 event.setVertex(GraphEventVertex.fromChampObject(
184 champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
185 event.getVertex().getModelVersion()));
189 event.setVertex(GraphEventVertex.fromChampObject(
190 champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
191 event.getVertex().getModelVersion()));
194 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
199 } else if (event.getEdge() != null) {
200 switch (event.getOperation()) {
202 event.setEdge(GraphEventEdge.fromChampRelationship(
203 champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
204 event.getEdge().getModelVersion()));
208 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
209 .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
210 event.getEdge().getModelVersion()));
214 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
217 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
218 "Invalid operation for event transactionId: " + event.getTransactionId());
222 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
223 "Invalid payload for event transactionId: " + event.getTransactionId());
225 } catch (ChampServiceException champException) {
226 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
227 event.setResult(GraphEventResult.FAILURE);
228 event.setErrorMessage(champException.getMessage());
229 event.setHttpErrorStatus(champException.getHttpStatus());
231 } catch (Exception ex) {
232 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
233 event.setResult(GraphEventResult.FAILURE);
234 event.setErrorMessage(ex.getMessage());
235 event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
239 if (event.getResult().equals(GraphEventResult.SUCCESS)) {
240 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
241 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
242 + " , transaction-id: " + event.getTransactionId() + " , operation: "
243 + event.getOperation().toString() + " , result: " + event.getResult());
245 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
246 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
247 + " , transaction-id: " + event.getTransactionId() + " , operation: "
248 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
249 + event.getErrorMessage());
252 champAsyncResponsePublisher.publishResponseEvent(event);
261 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
263 if (asyncRequestConsumer == null) {
264 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
267 Iterable<String> events = null;
269 events = asyncRequestConsumer.consume();
270 } catch (Exception e) {
271 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
275 if (events == null || !events.iterator().hasNext()) {
276 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
280 for (String event : events) {
282 GraphEvent requestEvent = GraphEvent.fromJson(event);
283 auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
284 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
285 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
286 + requestEvent.getOperation().toString());
287 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
288 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
289 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
290 + requestEvent.getOperation().toString());
291 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
293 // Try to submit the event to be published to the event bus.
294 if (!requestProcesserEventQueue.offer(requestEvent)) {
295 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
296 "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
299 } catch (Exception e) {
300 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
305 asyncRequestConsumer.commitOffsets();
306 } catch(OperationNotSupportedException e) {
307 //Dmaap doesnt support commit with offset
308 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
310 catch (Exception e) {
311 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
318 public Integer getRequestPollingTimeSeconds() {
319 return requestPollingTimeSeconds;