2 * ============LICENSE_START==========================================
4 * ===================================================================
5 * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * Copyright © 2017-2018 Amdocs
7 * ===================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END============================================
21 package org.onap.champ.async;
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadPoolExecutor;
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.aai.event.api.EventPublisher;
31 import org.onap.champ.event.GraphEvent;
32 import org.onap.champ.event.GraphEvent.GraphEventResult;
33 import org.onap.champ.event.envelope.GraphEventEnvelope;
34 import org.onap.champ.service.ChampThreadFactory;
35 import org.onap.champ.service.logging.ChampMsgs;
37 public class ChampAsyncResponsePublisher {
39 private EventPublisher asyncResponsePublisher;
42 * Number of events that can be queued up.
44 private Integer responsePublisherQueueSize;
47 * Number of event publisher worker threads.
49 private Integer responsePublisherPoolSize;
52 * Internal queue where outgoing events will be buffered.
54 private BlockingQueue<GraphEventEnvelope> responsePublisherEventQueue;
57 * Pool of worker threads that do the work of publishing the events to the
60 private ThreadPoolExecutor responsePublisherPool;
62 private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY = 10000;
64 private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE = 10;
65 private static final String CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME = "ChampAsyncGraphResponseEventPublisher";
67 private static Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class.getName());
69 public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher, Integer responsePublisherQueueSize,
70 Integer responsePublisherPoolSize) {
71 this.responsePublisherQueueSize = responsePublisherQueueSize;
73 this.responsePublisherPoolSize = responsePublisherPoolSize;
75 responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
76 responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
77 new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
79 for (int i = 0; i < responsePublisherPoolSize; i++) {
80 responsePublisherPool.submit(new GizmoResponsePublisherWorker());
82 this.asyncResponsePublisher = asyncResponsePublisher;
84 logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
85 "ChampAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
86 + asyncResponsePublisher.getClass().getName());
89 public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher) {
90 responsePublisherQueueSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY;
92 responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
94 responsePublisherEventQueue = new ArrayBlockingQueue<GraphEventEnvelope>(responsePublisherQueueSize);
95 responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
96 new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
98 for (int i = 0; i < responsePublisherPoolSize; i++) {
99 responsePublisherPool.submit(new GizmoResponsePublisherWorker());
101 this.asyncResponsePublisher = asyncResponsePublisher;
103 logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
104 "CrudAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
105 + asyncResponsePublisher.getClass().getName());
108 public void publishResponseEvent(GraphEventEnvelope eventEnvelope) {
109 responsePublisherEventQueue.offer(eventEnvelope);
113 private class GizmoResponsePublisherWorker implements Runnable {
119 GraphEventEnvelope eventEnvelope = null;
120 GraphEvent event = null;
122 // Get the next event to be published from the queue.
123 eventEnvelope = responsePublisherEventQueue.take();
124 event = eventEnvelope.getBody();
125 } catch (InterruptedException e) {
126 // Restore the interrupted status.
127 Thread.currentThread().interrupt();
129 // Publish the response
131 event.setTimestamp(System.currentTimeMillis());
132 asyncResponsePublisher.sendSync(eventEnvelope.toJson());
133 if (event.getResult().equals(GraphEventResult.SUCCESS)) {
134 logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
135 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
136 + " , transaction-id: " + event.getTransactionId() + " , operation: "
137 + event.getOperation().toString() + " , result: " + event.getResult());
139 logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
140 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
141 + " , transaction-id: " + event.getTransactionId() + " , operation: "
142 + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
143 + event.getErrorMessage());
145 } catch (Exception ex) {
146 logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());