Update license date and text
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncResponsePublisher.java
1 /**
2  * ============LICENSE_START==========================================
3  * org.onap.aai
4  * ===================================================================
5  * Copyright © 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017-2018 Amdocs
7  * ===================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END============================================
20  */
21 package org.onap.champ.async;
22
23 import java.util.concurrent.ArrayBlockingQueue;
24 import java.util.concurrent.BlockingQueue;
25 import java.util.concurrent.Executors;
26 import java.util.concurrent.ThreadPoolExecutor;
27
28 import org.onap.aai.cl.api.Logger;
29 import org.onap.aai.cl.eelf.LoggerFactory;
30 import org.onap.champ.event.GraphEvent;
31 import org.onap.champ.event.GraphEvent.GraphEventResult;
32 import org.onap.champ.service.ChampThreadFactory;
33 import org.onap.champ.service.logging.ChampMsgs;
34
35 import org.onap.aai.event.api.EventPublisher;
36
37 public class ChampAsyncResponsePublisher {
38
39   private EventPublisher asyncResponsePublisher;
40
41   /**
42    * Number of events that can be queued up.
43    */
44   private Integer responsePublisherQueueSize;
45
46   /**
47    * Number of event publisher worker threads.
48    */
49   private Integer responsePublisherPoolSize;
50
51   /**
52    * Internal queue where outgoing events will be buffered.
53    **/
54   private BlockingQueue<GraphEvent> responsePublisherEventQueue;
55
56   /**
57    * Pool of worker threads that do the work of publishing the events to the
58    * event bus.
59    */
60   private ThreadPoolExecutor responsePublisherPool;
61
62   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY = 10000;
63
64   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE = 10;
65   private static final String CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME = "ChampAsyncGraphResponseEventPublisher";
66
67   private static Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class.getName());
68
69   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher, Integer responsePublisherQueueSize,
70       Integer responsePublisherPoolSize) {
71     this.responsePublisherQueueSize = responsePublisherQueueSize;
72
73     this.responsePublisherPoolSize = responsePublisherPoolSize;
74
75     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
76     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
77         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
78
79     for (int i = 0; i < responsePublisherPoolSize; i++) {
80       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
81     }
82     this.asyncResponsePublisher = asyncResponsePublisher;
83
84     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
85         "ChampAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
86             + asyncResponsePublisher.getClass().getName());
87   }
88
89   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher) {
90     responsePublisherQueueSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY;
91
92     responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
93
94     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
95     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
96         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
97
98     for (int i = 0; i < responsePublisherPoolSize; i++) {
99       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
100     }
101     this.asyncResponsePublisher = asyncResponsePublisher;
102
103     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
104         "CrudAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
105             + asyncResponsePublisher.getClass().getName());
106   }
107
108   public void publishResponseEvent(GraphEvent event) {
109     responsePublisherEventQueue.offer(event);
110
111   }
112
113   private class GizmoResponsePublisherWorker implements Runnable {
114
115     @Override
116     public void run() {
117
118       while (true) {
119
120         GraphEvent event = null;
121         try {
122
123           // Get the next event to be published from the queue.
124           event = responsePublisherEventQueue.take();
125
126         } catch (InterruptedException e) {
127
128           // Restore the interrupted status.
129           Thread.currentThread().interrupt();
130         }
131         // Publish the response
132
133         try {
134           event.setTimestamp(System.currentTimeMillis());
135           asyncResponsePublisher.sendSync(event.toJson());
136           if (event.getResult().equals(GraphEventResult.SUCCESS)) {
137             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
138                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
139                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
140                     + event.getOperation().toString() + " , result: " + event.getResult());
141           } else {
142             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
143                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
144                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
145                     + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
146                     + event.getErrorMessage());
147           }
148         } catch (Exception ex) {
149           logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
150         }
151
152       }
153     }
154   }
155
156 }