2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.champ.async;
23 import java.util.Optional;
24 import java.util.TimerTask;
25 import java.util.concurrent.ArrayBlockingQueue;
26 import java.util.concurrent.BlockingQueue;
27 import java.util.concurrent.Executors;
28 import java.util.concurrent.ThreadPoolExecutor;
29 import javax.naming.OperationNotSupportedException;
30 import javax.ws.rs.core.Response.Status;
31 import org.onap.aai.champcore.ChampTransaction;
32 import org.onap.aai.cl.api.Logger;
33 import org.onap.aai.cl.eelf.LoggerFactory;
34 import org.onap.aai.event.api.EventConsumer;
35 import org.onap.champ.ChampRESTAPI;
36 import org.onap.champ.event.GraphEvent;
37 import org.onap.champ.event.GraphEvent.GraphEventResult;
38 import org.onap.champ.event.GraphEventEdge;
39 import org.onap.champ.event.GraphEventVertex;
40 import org.onap.champ.event.envelope.GraphEventEnvelope;
41 import org.onap.champ.event.envelope.GraphEventHeader;
42 import org.onap.champ.exception.ChampServiceException;
43 import org.onap.champ.service.ChampDataService;
44 import org.onap.champ.service.ChampThreadFactory;
45 import org.onap.champ.service.logging.ChampMsgs;
48 * This Class polls the Graph events from request topic perform the necessary CRUD operation by calling champDAO and
49 * queues up the response to be consumed by response handler.
51 public class ChampAsyncRequestProcessor extends TimerTask {
53 private Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class);
55 private ChampDataService champDataService;
58 * Number of events that can be queued up.
60 private Integer requestProcesserQueueSize;
63 * Number of event publisher worker threads.
65 private Integer requestProcesserPoolSize;
68 * Number of event publisher worker threads.
70 private Integer requestPollingTimeSeconds;
73 * Internal queue where outgoing events will be buffered until they can be serviced by.
75 private BlockingQueue<GraphEventEnvelope> requestProcesserEventQueue;
78 * Pool of worker threads that do the work of publishing the events to the event bus.
80 private ThreadPoolExecutor requestProcesserPool;
82 private ChampAsyncResponsePublisher champAsyncResponsePublisher;
84 private EventConsumer asyncRequestConsumer;
86 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY = 10000;
88 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE = 10;
89 private static final Integer DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND = 30000;
90 private static final String CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME = "ChampAsyncGraphRequestEventProcessor";
91 Logger auditLogger = LoggerFactory.getInstance().getAuditLogger(ChampRESTAPI.class.getName());
93 public ChampAsyncRequestProcessor(ChampDataService champDataService,
94 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer) {
96 this.requestProcesserQueueSize = DEFAULT_ASYNC_REQUEST_PROCESS_QUEUE_CAPACITY;
98 this.requestProcesserPoolSize = DEFAULT_ASYNC_REQUEST_PROCESS_THREAD_POOL_SIZE;
100 this.requestPollingTimeSeconds = DEFAULT_ASYNC_REQUEST_PROCESS_POLLING_SECOND;
101 requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
102 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
103 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
105 for (int i = 0; i < requestProcesserPoolSize; i++) {
106 requestProcesserPool.submit(new ChampProcessorWorker());
109 this.champDataService = champDataService;
110 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
111 this.asyncRequestConsumer = asyncRequestConsumer;
112 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
113 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
114 + asyncRequestConsumer.getClass().getName());
117 public ChampAsyncRequestProcessor(ChampDataService champDataService,
118 ChampAsyncResponsePublisher champAsyncResponsePublisher, EventConsumer asyncRequestConsumer,
119 Integer requestProcesserQueueSize, Integer requestProcesserPoolSize, Integer requestPollingTimeSeconds) {
121 this.requestProcesserQueueSize = requestProcesserQueueSize;
123 this.requestProcesserPoolSize = requestProcesserPoolSize;
125 this.requestPollingTimeSeconds = requestPollingTimeSeconds;
127 requestProcesserEventQueue = new ArrayBlockingQueue<>(requestProcesserQueueSize);
128 requestProcesserPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(requestProcesserPoolSize,
129 new ChampThreadFactory(CHAMP_GRAPH_REQUEST_PROCESS_THREAD_NAME));
131 for (int i = 0; i < requestProcesserPoolSize; i++) {
132 requestProcesserPool.submit(new ChampProcessorWorker());
135 this.champDataService = champDataService;
136 this.champAsyncResponsePublisher = champAsyncResponsePublisher;
137 this.asyncRequestConsumer = asyncRequestConsumer;
138 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
139 "ChampAsyncRequestProcessor initialized SUCCESSFULLY! with event consumer "
140 + asyncRequestConsumer.getClass().getName());
143 private class ChampProcessorWorker implements Runnable {
150 GraphEventEnvelope eventEnvelope = null;
151 GraphEvent event = null;
153 // Get the next event to be published from the queue.
154 eventEnvelope = requestProcesserEventQueue.take();
155 event = eventEnvelope.getBody();
157 // Apply Champ Event header
158 eventEnvelope.setHeader(new GraphEventHeader.Builder().requestId(event.getTransactionId()).build());
160 // Parse the event and call champ Dao to process , Create the
161 // response event and put it on response queue
162 event.setResult(GraphEventResult.SUCCESS);
164 // Check if this request is part of an ongoing DB transaction
165 ChampTransaction transaction = champDataService.getTransaction(event.getDbTransactionId());
166 if ((event.getDbTransactionId() != null) && (transaction == null)) {
167 event.setResult(GraphEventResult.FAILURE);
168 event.setErrorMessage("Database transactionId " + event.getDbTransactionId() + " not found");
169 event.setHttpErrorStatus(Status.BAD_REQUEST);
172 if (event.getResult() != GraphEventResult.FAILURE) {
174 if (event.getVertex() != null) {
176 switch (event.getOperation()) {
178 event.setVertex(GraphEventVertex.fromChampObject(
179 champDataService.storeObject(event.getVertex().toChampObject(),
180 Optional.ofNullable(transaction)),
181 event.getVertex().getModelVersion()));
185 event.setVertex(GraphEventVertex.fromChampObject(
186 champDataService.replaceObject(
187 event.getVertex().toChampObject(event.getVertex().toJson()),
188 event.getVertex().getId(), Optional.ofNullable(transaction)),
189 event.getVertex().getModelVersion()));
192 champDataService.deleteObject(event.getVertex().getId(),
193 Optional.ofNullable(transaction));
198 } else if (event.getEdge() != null) {
199 switch (event.getOperation()) {
201 event.setEdge(GraphEventEdge.fromChampRelationship(
202 champDataService.storeRelationship(event.getEdge().toChampRelationship(),
203 Optional.ofNullable(transaction)),
204 event.getEdge().getModelVersion()));
208 event.setEdge(GraphEventEdge.fromChampRelationship(
209 champDataService.updateRelationship(event.getEdge().toChampRelationship(),
210 event.getEdge().getId(), Optional.ofNullable(transaction)),
211 event.getEdge().getModelVersion()));
215 champDataService.deleteRelationship(event.getEdge().getId(),
216 Optional.ofNullable(transaction));
219 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
220 "Invalid operation for event transactionId: " + event.getTransactionId());
224 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
225 "Invalid payload for event transactionId: " + event.getTransactionId());
227 } catch (ChampServiceException champException) {
228 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, champException.getMessage());
229 event.setResult(GraphEventResult.FAILURE);
230 event.setErrorMessage(champException.getMessage());
231 event.setHttpErrorStatus(champException.getHttpStatus());
233 } catch (Exception ex) {
234 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, ex.getMessage());
235 event.setResult(GraphEventResult.FAILURE);
236 event.setErrorMessage(ex.getMessage());
237 event.setHttpErrorStatus(Status.INTERNAL_SERVER_ERROR);
241 if (event.getResult().equals(GraphEventResult.SUCCESS)) {
242 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
243 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
244 + " , transaction-id: " + event.getTransactionId() + " , operation: "
245 + event.getOperation().toString() + " , result: " + event.getResult());
247 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
248 "Event processed of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
249 + " , transaction-id: " + event.getTransactionId() + " , operation: "
250 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
251 + event.getErrorMessage());
254 champAsyncResponsePublisher.publishResponseEvent(eventEnvelope);
255 } catch (InterruptedException e) {
256 // Restore the interrupted status.
257 Thread.currentThread().interrupt();
267 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Listening for graph events");
269 if (asyncRequestConsumer == null) {
270 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
271 "Unable to initialize ChampAsyncRequestProcessor");
274 Iterable<String> events = null;
276 events = asyncRequestConsumer.consume();
277 } catch (Exception e) {
278 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
282 if (events == null || !events.iterator().hasNext()) {
283 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "No events recieved");
286 for (String event : events) {
288 GraphEventEnvelope requestEnvelope = GraphEventEnvelope.fromJson(event);
289 GraphEvent requestEvent = requestEnvelope.getBody();
290 auditLogger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
291 "Event received of type: " + requestEvent.getObjectType() + " with key: "
292 + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
293 + " , operation: " + requestEvent.getOperation().toString());
294 logger.info(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO,
295 "Event received of type: " + requestEvent.getObjectType() + " with key: "
296 + requestEvent.getObjectKey() + " , transaction-id: " + requestEvent.getTransactionId()
297 + " , operation: " + requestEvent.getOperation().toString());
298 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_INFO, "Event received with payload:" + event);
300 // Try to submit the event to be published to the event bus.
301 if (!requestProcesserEventQueue.offer(requestEnvelope)) {
302 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR,
303 "Event could not be published to the event bus due to: Internal buffer capacity exceeded.");
306 } catch (Exception e) {
307 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_ERROR, e.getMessage());
313 asyncRequestConsumer.commitOffsets();
314 } catch (OperationNotSupportedException e) {
315 // Dmaap doesnt support commit with offset
316 logger.debug(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
317 } catch (Exception e) {
318 logger.error(ChampMsgs.CHAMP_ASYNC_REQUEST_PROCESSOR_WARN, e.getMessage());
323 public Integer getRequestPollingTimeSeconds() {
324 return requestPollingTimeSeconds;