515ed9e4ba30c89675b418e5e5dae2e140b57c1c
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncResponsePublisher.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *        http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
21  */
22 package org.onap.champ.async;
23
24 import java.util.concurrent.ArrayBlockingQueue;
25 import java.util.concurrent.BlockingQueue;
26 import java.util.concurrent.Executors;
27 import java.util.concurrent.ThreadPoolExecutor;
28
29 import org.onap.aai.cl.api.Logger;
30 import org.onap.aai.cl.eelf.LoggerFactory;
31 import org.onap.champ.event.GraphEvent;
32 import org.onap.champ.event.GraphEvent.GraphEventResult;
33 import org.onap.champ.service.ChampThreadFactory;
34 import org.onap.champ.service.logging.ChampMsgs;
35
36 import org.onap.aai.event.api.EventPublisher;
37
38 public class ChampAsyncResponsePublisher {
39
40   private EventPublisher asyncResponsePublisher;
41
42   /**
43    * Number of events that can be queued up.
44    */
45   private Integer responsePublisherQueueSize;
46
47   /**
48    * Number of event publisher worker threads.
49    */
50   private Integer responsePublisherPoolSize;
51
52   /**
53    * Internal queue where outgoing events will be buffered.
54    **/
55   private BlockingQueue<GraphEvent> responsePublisherEventQueue;
56
57   /**
58    * Pool of worker threads that do the work of publishing the events to the
59    * event bus.
60    */
61   private ThreadPoolExecutor responsePublisherPool;
62
63   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY = 10000;
64
65   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE = 10;
66   private static final String CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME = "ChampAsyncGraphResponseEventPublisher";
67
68   private static Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class.getName());
69
70   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher, Integer responsePublisherQueueSize,
71       Integer responsePublisherPoolSize) {
72     this.responsePublisherQueueSize = responsePublisherQueueSize;
73
74     this.responsePublisherPoolSize = responsePublisherPoolSize;
75
76     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
77     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
78         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
79
80     for (int i = 0; i < responsePublisherPoolSize; i++) {
81       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
82     }
83     this.asyncResponsePublisher = asyncResponsePublisher;
84
85     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
86         "ChampAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
87             + asyncResponsePublisher.getClass().getName());
88   }
89
90   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher) {
91     responsePublisherQueueSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY;
92
93     responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
94
95     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
96     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
97         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
98
99     for (int i = 0; i < responsePublisherPoolSize; i++) {
100       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
101     }
102     this.asyncResponsePublisher = asyncResponsePublisher;
103
104     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
105         "CrudAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
106             + asyncResponsePublisher.getClass().getName());
107   }
108
109   public void publishResponseEvent(GraphEvent event) {
110     responsePublisherEventQueue.offer(event);
111
112   }
113
114   private class GizmoResponsePublisherWorker implements Runnable {
115
116     @Override
117     public void run() {
118
119       while (true) {
120
121         GraphEvent event = null;
122         try {
123
124           // Get the next event to be published from the queue.
125           event = responsePublisherEventQueue.take();
126
127         } catch (InterruptedException e) {
128
129           // Restore the interrupted status.
130           Thread.currentThread().interrupt();
131         }
132         // Publish the response
133
134         try {
135           event.setTimestamp(System.currentTimeMillis());
136           asyncResponsePublisher.sendSync(event.toJson());
137           if (event.getResult().equals(GraphEventResult.SUCCESS)) {
138             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
139                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
140                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
141                     + event.getOperation().toString() + " , result: " + event.getResult());
142           } else {
143             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
144                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
145                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
146                     + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
147                     + event.getErrorMessage());
148           }
149         } catch (Exception ex) {
150           logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
151         }
152
153       }
154     }
155   }
156
157 }