2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ================================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=========================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.champ.async;
27 import java.util.Optional;
28 import java.util.TimerTask;
29 import java.util.concurrent.ArrayBlockingQueue;
30 import java.util.concurrent.BlockingQueue;
31 import java.util.concurrent.Executors;
32 import java.util.concurrent.ThreadPoolExecutor;
34 import javax.naming.OperationNotSupportedException;
35 import javax.ws.rs.core.Response.Status;
37 import org.onap.aai.champcore.ChampTransaction;
38 import org.onap.aai.cl.api.Logger;
39 import org.onap.aai.cl.eelf.LoggerFactory;
40 import org.onap.champ.ChampRESTAPI;
41 import org.onap.champ.event.GraphEvent;
42 import org.onap.champ.event.GraphEvent.GraphEventResult;
43 import org.onap.champ.event.GraphEventEdge;
44 import org.onap.champ.event.GraphEventVertex;
45 import org.onap.champ.exception.ChampServiceException;
46 import org.onap.champ.service.ChampDataService;
47 import org.onap.champ.service.ChampThreadFactory;
48 import org.onap.champ.service.logging.ChampMsgs;
50 import org.onap.aai.event.api.EventConsumer;
53 * This Class polls the Graph events from request topic perform the necessary
54 * CRUD operation by calling champDAO and queues up the response to be consumed
55 * by response handler.
57 public class ChampAsyncRequestProcessor extends TimerTask {
59 private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
61 private ChampDataService champDataService;
64 * Number of events that can be queued up.
66 private Integer requestProcesserQueueSize;
69 * Number of event publisher worker threads.
71 private Integer requestProcesserPoolSize;
74 * Number of event publisher worker threads.
76 private Integer requestPollingTimeSeconds;
79 * Internal queue where outgoing events will be buffered until they can be
82 private BlockingQueue<GraphEvent> requestProcesserEventQueue;
85 * Pool of worker threads that do the work of publishing the events to the
88 private ThreadPoolExecutor requestProcesserPool;
90 private ChampAsyncResponsePublisher champAsyncResponsePublisher;
92 private EventConsumer asyncRequestConsumer;
94 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
96 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
97 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
98 private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
99 Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
101 public ChampAsyncRequestProcessor(ChampDataService champDataService,
102 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
104 this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
106 this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
108 this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
109 requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
110 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
111 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
113 for (int i = 0; i < requestProcesserPoolSize; i++) {
114 requestProcesserPool.submit(new ChampProcessorWorker());
117 this.champDataService = champDataService;
118 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
119 this.asyncRequestConsumer = asyncRequestConsumer;
120 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
121 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
122 + asyncRequestConsumer.getClass().getName());
127 public ChampAsyncRequestProcessor(ChampDataService champDataService,
128 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
129 Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
131 this.requestProcesserQueueSize = requestProcesserQueueSize;
133 this.requestProcesserPoolSize = requestProcesserPoolSize;
135 this.requestPollingTimeSeconds = requestPollingTimeSeconds;
137 requestProcesserEventQueue = new ArrayBlockingQueue<GraphEvent>(requestProcesserQueueSize);
138 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
139 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
141 for (int i = 0; i < requestProcesserPoolSize; i++) {
142 requestProcesserPool.submit(new ChampProcessorWorker());
145 this.champDataService = champDataService;
146 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
147 this.asyncRequestConsumer = asyncRequestConsumer;
148 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
149 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
150 + asyncRequestConsumer.getClass().getName());
153 private class ChampProcessorWorker implements Runnable {
160 GraphEvent event = null;
162 // Get the next event to be published from the queue.
163 event = requestProcesserEventQueue.take();
164 } catch (InterruptedException e) {
165 // Restore the interrupted status.
166 Thread.currentThread().interrupt();
169 // Parse the event and call champ Dao to process , Create the
170 // response event and put it on response queue
171 event.setResult(GraphEventResult.SUCCESS);
173 // Check if this request is part of an ongoing DB transaction
174 ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
175 if ( (event.getDbTransactionId() != null) && (transaction == null) ) {
176 event.setResult(GraphEventResult.FAILURE);
177 event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
178 event.setHttpErrorStatus(Status.BAD_REQUEST);
181 if (event.getResult() != GraphEventResult.FAILURE) {
183 if (event.getVertex() != null) {
185 switch (event.getOperation()) {
187 event.setVertex(GraphEventVertex.fromChampObject(
188 champDataService.storeObject(event.getVertex().toChampObject(), Optional.ofNullable(transaction)),
189 event.getVertex().getModelVersion()));
193 event.setVertex(GraphEventVertex.fromChampObject(
194 champDataService.replaceObject(event.getVertex().toChampObject(), event.getVertex().getId(), Optional.ofNullable(transaction)),
195 event.getVertex().getModelVersion()));
198 champDataService.deleteObject(event.getVertex().getId(), Optional.ofNullable(transaction));
203 } else if (event.getEdge() != null) {
204 switch (event.getOperation()) {
206 event.setEdge(GraphEventEdge.fromChampRelationship(
207 champDataService.storeRelationship(event.getEdge().toChampRelationship(), Optional.ofNullable(transaction)),
208 event.getEdge().getModelVersion()));
212 event.setEdge(GraphEventEdge.fromChampRelationship(champDataService
213 .updateRelationship(event.getEdge().toChampRelationship(), event.getEdge().getId(), Optional.ofNullable(transaction)),
214 event.getEdge().getModelVersion()));
218 champDataService.deleteRelationship(event.getEdge().getId(), Optional.ofNullable(transaction));
221 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
222 "Invalid operation for event transactionId: " + event.getTransactionId());
226 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
227 "Invalid payload for event transactionId: " + event.getTransactionId());
229 } catch (ChampServiceException champException) {
230 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
231 event.setResult(GraphEventResult.FAILURE);
232 event.setErrorMessage(champException.getMessage());
233 event.setHttpErrorStatus(champException.getHttpStatus());
235 } catch (Exception ex) {
236 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
237 event.setResult(GraphEventResult.FAILURE);
238 event.setErrorMessage(ex.getMessage());
239 event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
243 if (event.getResult().equals(GraphEventResult.SUCCESS)) {
244 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
245 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
246 + " , transaction-id: " + event.getTransactionId() + " , operation: "
247 + event.getOperation().toString() + " , result: " + event.getResult());
249 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
250 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
251 + " , transaction-id: " + event.getTransactionId() + " , operation: "
252 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
253 + event.getErrorMessage());
256 champAsyncResponsePublisher.publishResponseEvent(event);
265 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
267 if (asyncRequestConsumer == null) {
268 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, "Unable to initialize ChampAsyncRequestProcessor");
271 Iterable<String> events = null;
273 events = asyncRequestConsumer.consume();
274 } catch (Exception e) {
275 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
279 if (events == null || !events.iterator().hasNext()) {
280 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
284 for (String event : events) {
286 GraphEvent requestEvent = GraphEvent.fromJson(event);
287 auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
288 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
289 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
290 + requestEvent.getOperation().toString());
291 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
292 "Event received of type: " + requestEvent.getObjectType() + " with key: " + requestEvent.getObjectKey()
293 + " , transaction-id: " + requestEvent.getTransactionId() + " , operation: "
294 + requestEvent.getOperation().toString());
295 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
297 // Try to submit the event to be published to the event bus.
298 if (!requestProcesserEventQueue.offer(requestEvent)) {
299 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
300 "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
303 } catch (Exception e) {
304 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
309 asyncRequestConsumer.commitOffsets();
310 } catch(OperationNotSupportedException e) {
311 //Dmaap doesnt support commit with offset
312 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
314 catch (Exception e) {
315 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
322 public Integer getRequestPollingTimeSeconds() {
323 return requestPollingTimeSeconds;