Port champ-microservice project restructure
[aai/champ.git] / champ-service / src / main / java / org / onap / champ / async / ChampAsyncResponsePublisher.java
1 /**
2  * ============LICENSE_START=======================================================
3  * 
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *    http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.champ.async;
26
27 import java.util.concurrent.ArrayBlockingQueue;
28 import java.util.concurrent.BlockingQueue;
29 import java.util.concurrent.Executors;
30 import java.util.concurrent.ThreadPoolExecutor;
31
32 import org.onap.aai.cl.api.Logger;
33 import org.onap.aai.cl.eelf.LoggerFactory;
34 import org.onap.champ.event.GraphEvent;
35 import org.onap.champ.event.GraphEvent.GraphEventResult;
36 import org.onap.champ.service.ChampThreadFactory;
37 import org.onap.champ.service.logging.ChampMsgs;
38
39 import org.onap.aai.event.api.EventPublisher;
40
41 public class ChampAsyncResponsePublisher {
42
43   private EventPublisher asyncResponsePublisher;
44
45   /**
46    * Number of events that can be queued up.
47    */
48   private Integer responsePublisherQueueSize;
49
50   /**
51    * Number of event publisher worker threads.
52    */
53   private Integer responsePublisherPoolSize;
54
55   /**
56    * Internal queue where outgoing events will be buffered.
57    **/
58   private BlockingQueue<GraphEvent> responsePublisherEventQueue;
59
60   /**
61    * Pool of worker threads that do the work of publishing the events to the
62    * event bus.
63    */
64   private ThreadPoolExecutor responsePublisherPool;
65
66   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY = 10000;
67
68   private static final Integer DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE = 10;
69   private static final String CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME = "ChampAsyncGraphResponseEventPublisher";
70
71   private static Logger logger = LoggerFactory.getInstance().getLogger(ChampAsyncRequestProcessor.class.getName());
72
73   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher, Integer responsePublisherQueueSize,
74       Integer responsePublisherPoolSize) {
75     this.responsePublisherQueueSize = responsePublisherQueueSize;
76
77     this.responsePublisherPoolSize = responsePublisherPoolSize;
78
79     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
80     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
81         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
82
83     for (int i = 0; i < responsePublisherPoolSize; i++) {
84       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
85     }
86     this.asyncResponsePublisher = asyncResponsePublisher;
87
88     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
89         "ChampAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
90             + asyncResponsePublisher.getClass().getName());
91   }
92
93   public ChampAsyncResponsePublisher(EventPublisher asyncResponsePublisher) {
94     responsePublisherQueueSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_QUEUE_CAPACITY;
95
96     responsePublisherPoolSize = DEFAULT_ASYNC_RESPONSE_PUBLISH_THREAD_POOL_SIZE;
97
98     responsePublisherEventQueue = new ArrayBlockingQueue<GraphEvent>(responsePublisherQueueSize);
99     responsePublisherPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(responsePublisherPoolSize,
100         new ChampThreadFactory(CHAMP_GRAPH_RESPONSE_PUBLISH_THREAD_NAME));
101
102     for (int i = 0; i < responsePublisherPoolSize; i++) {
103       responsePublisherPool.submit(new GizmoResponsePublisherWorker());
104     }
105     this.asyncResponsePublisher = asyncResponsePublisher;
106
107     logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
108         "CrudAsyncResponsePublisher initialized SUCCESSFULLY! with event publisher "
109             + asyncResponsePublisher.getClass().getName());
110   }
111
112   public void publishResponseEvent(GraphEvent event) {
113     responsePublisherEventQueue.offer(event);
114
115   }
116
117   private class GizmoResponsePublisherWorker implements Runnable {
118
119     @Override
120     public void run() {
121
122       while (true) {
123
124         GraphEvent event = null;
125         try {
126
127           // Get the next event to be published from the queue.
128           event = responsePublisherEventQueue.take();
129
130         } catch (InterruptedException e) {
131
132           // Restore the interrupted status.
133           Thread.currentThread().interrupt();
134         }
135         // Publish the response
136
137         try {
138           event.setTimestamp(System.currentTimeMillis());
139           asyncResponsePublisher.sendSync(event.toJson());
140           if (event.getResult().equals(GraphEventResult.SUCCESS)) {
141             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
142                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
143                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
144                     + event.getOperation().toString() + " , result: " + event.getResult());
145           } else {
146             logger.info(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_INFO,
147                 "Response published for Event of type: " + event.getObjectType() + " with key: " + event.getObjectKey()
148                     + " , transaction-id: " + event.getTransactionId() + " , operation: "
149                     + event.getOperation().toString() + " , result: " + event.getResult() + " , error: "
150                     + event.getErrorMessage());
151           }
152         } catch (Exception ex) {
153           logger.error(ChampMsgs.CHAMP_ASYNC_RESPONSE_PUBLISHER_ERROR, ex.getMessage());
154         }
155
156       }
157     }
158   }
159
160 }