5bb7f56a3011ac2963da4899f8adf46b652613fb
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / core / CoreManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.client.impl.core;
25
26 import org.onap.appc.client.impl.protocol.*;
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29
30 import java.util.Properties;
31 import java.util.concurrent.atomic.AtomicBoolean;
32
33 /**
34  * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
35  */
36 class CoreManager{
37
38     private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
39     private final ProtocolFactory protocolFactory;
40     protected  AsyncProtocol protocol;
41     private final RetrieveMessageCallback protocolCallback = null;
42     private final CoreRegistry registry;
43     private final ITimerService timerService;
44     private final TaskQueueManager queueManager;
45     private String DEFAULT_TIMEOUT = "300000";
46     private final static String RESPONSE_TIMEOUT = "client.response.timeout";
47     private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
48     private boolean isForceShutdown = false;
49     private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
50     private long shutdownTimeout;
51
52     CoreManager(Properties prop) throws CoreException {
53         protocolFactory = ProtocolFactory.getInstance();
54         try {
55             initProtocol(prop);
56         }catch (ProtocolException e){
57             throw new CoreException(e);
58         }
59         registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
60         String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
61         long responseTimeout = Long.parseLong(timeoutProp);
62         String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
63         shutdownTimeout = Long.parseLong(gracefulTimeout);
64         timerService = new TimerServiceImpl(responseTimeout);
65         queueManager = new TaskQueueManager(prop);
66         listenShutdown();
67     }
68
69     /**
70      * initiates protocol layer services.
71      * @param prop - Properties
72      */
73     private void initProtocol(Properties prop) throws ProtocolException {
74         protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
75         protocol.init(prop, getProtocolCallback());
76     }
77
78     /**
79      * Creates protocol response callback
80      * @return - @{@link ProtocolResponseCallbackImpl}
81      */
82     RetrieveMessageCallback getProtocolCallback(){
83         return new ProtocolResponseCallbackImpl();
84     }
85
86     /**
87      * Registers a new handler in registry
88      * @param corrID - Correlation ID
89      * @param requestResponseHandler handler to be called when response arrives
90      */
91     void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
92         registry.register(corrID, requestResponseHandler);
93     }
94
95     /**
96      * Remove a handler from registry service by correlation ID.
97      * @param corrID - Correlation ID
98      * @return - @{@link RequestResponseHandler}
99      */
100     RequestResponseHandler unregisterHandler(String corrID){
101         return (RequestResponseHandler) registry.unregister(corrID);
102     }
103
104     /**
105      * Checks in registry service if a handler is existing.
106      * @param corrID - Correlation ID
107      * @return - boolean
108      */
109     boolean isExistHandler(String corrID) {
110         return registry.isExist(corrID);
111     }
112
113     /**
114      * Starts timer for timeout event when a request was send successfully.
115      * @param corrID - Correlation ID
116      */
117     void startTimer(String corrID){
118         timerService.add(corrID, new TimeoutHandlerImpl(corrID));
119     }
120
121     /**
122      * Cancels timer for fimeout event, in case when complete response was received
123      * @param corrID
124      */
125     void cancelTimer(String corrID){
126         timerService.cancel(corrID);
127     }
128
129     /**
130      * Submits a new task to Queue manager. it is using for both response and timeout tasks
131      * @param corrID - Correlation ID
132      * @param task - @{@link Runnable} task.
133      * @throws InterruptedException
134      */
135     void submitTask(String corrID, Runnable task) throws InterruptedException {
136         queueManager.submit(corrID, task);
137     }
138
139     /**
140      * Sends request to protocol.
141      * @param request - Request
142      * @param corrId - Correlation ID
143      * @param rpcName - RPC name
144      * @throws CoreException - @{@link CoreException}
145      */
146     void sendRequest(String request, String corrId, String rpcName) throws CoreException {
147         MessageContext ctx = getMessageContext(corrId, rpcName);
148         try {
149             protocol.sendRequest(request, ctx);
150         } catch (ProtocolException e) {
151             unregisterHandler(corrId);
152             throw new CoreException(e);
153         }
154     }
155
156     /**
157      * Creates @{@link MessageContext}
158      * @param correlationId - Correlation ID
159      * @param rpcName - RPC Name
160      * @return - @{@link MessageContext}
161      */
162     private MessageContext getMessageContext(String correlationId, String rpcName){
163         MessageContext msgCtx = new MessageContext();
164         msgCtx.setCorrelationID(correlationId);
165         msgCtx.setRpc(rpcName);
166         return msgCtx;
167     }
168
169     /**
170      * Implements response callback from protocol and filters responses by correlation ID.
171      * Only registered events(by correlation ID) will be handled.
172      */
173     private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
174         @Override
175         public void onResponse(String response, MessageContext context) {
176             String corrID = context.getCorrelationID();
177             if (corrID != null) {
178                 RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
179                 if (messageHandler != null) {
180                     LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
181                     messageHandler.handleResponse(context, response);
182                 }
183             }
184         }
185     }
186
187
188     /**
189      * listens to @{@link Runtime} shutdown event
190      */
191     private void listenShutdown() {
192         Runtime.getRuntime().addShutdownHook(new Thread(){
193             public void run(){
194                 gracefulShutdown();
195             }
196         });
197     }
198
199     /**
200      * Implements shutdown for client library.
201      * @param isForceShutdown - true force shutdown, false graceful shutdown
202      */
203     void shutdown(boolean isForceShutdown){
204         if(isForceShutdown){
205             forceShutdown();
206         }else{
207             gracefulShutdown();
208         }
209     }
210
211     /**
212      * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
213      * shutdown only when either all request will be handled or graceful shutdown will be time out.
214      */
215     synchronized void gracefulShutdown(){
216         isGracefulShutdown.set(true);
217         if(registry.isEmpty()){
218             forceShutdown();
219         }
220         else{
221             try {
222                 LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
223                 wait(shutdownTimeout);
224                 LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
225                 forceShutdown();
226             } catch (InterruptedException e) {
227                 LOG.error("Interrupted Exception during gracefulShutdown ::", e);
228             }
229
230         }
231     }
232
233     /**
234      * Closes Protocol, stops Queue Manager and shutdowns Time Service.
235      */
236     private void forceShutdown(){
237         isForceShutdown = true;
238         try {
239             LOG.info("Starting shutdown process.");
240             protocol.shutdown();
241             queueManager.stopQueueManager();
242             timerService.shutdown();
243         } catch (InterruptedException e) {
244             LOG.info("Client library shutdown in progress ", e);
245         }
246     }
247
248     /**
249      *
250      * @return - true when shutdown is in process
251      */
252     boolean isShutdownInProgress(){
253         return isForceShutdown || isGracefulShutdown.get();
254     }
255
256     /**
257      * Timeout handler implementation.
258      * This handler is responsible to assign a task for handling of timeout events.
259      *
260      */
261     private class TimeoutHandlerImpl implements ITimeoutHandler {
262
263         private final String corrID;
264
265         TimeoutHandlerImpl(String corrID) {
266             this.corrID = corrID;
267         }
268
269         /**
270          * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
271          * this queue is shared between both timeout and handlers which belong to same correlation ID.
272          */
273         @Override
274         public void onTimeout() {
275             try {
276                 submitTask(corrID, new Runnable() {
277                     @Override
278                     public void run() {
279                         RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
280                         if (requestResponseHandler != null) {
281                             requestResponseHandler.onTimeOut();
282                         }
283                     }
284                 });
285             } catch (InterruptedException e) {
286                 LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
287             }
288         }
289     }
290
291
292     /**
293      * Wakes Up graceful shutdown.
294      */
295     class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
296         @Override
297         public synchronized void emptyCallback() {
298             LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
299             if(isGracefulShutdown.get()){
300                 wakeUpShutdown();
301             }
302         }
303     }
304
305     /**
306      * wakes up waiting shutdown.
307      */
308     private synchronized void wakeUpShutdown(){
309         notifyAll();
310     }
311
312 }
313