f299dc74895274b70d18f51ce25939503642350c
[appc.git] / appc-client / client-lib / src / main / java / org / openecomp / appc / client / impl / core / CoreManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.openecomp.appc.client.impl.core;
26
27 import org.openecomp.appc.client.impl.protocol.*;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30
31 import java.util.Properties;
32 import java.util.concurrent.atomic.AtomicBoolean;
33
34 /**
35  * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
36  */
37 class CoreManager{
38
39     private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
40     private final ProtocolFactory protocolFactory;
41     protected  AsyncProtocol protocol;
42     private final RetrieveMessageCallback protocolCallback = null;
43     private final CoreRegistry registry;
44     private final ITimerService timerService;
45     private final TaskQueueManager queueManager;
46     private String DEFAULT_TIMEOUT = "300000";
47     private final static String RESPONSE_TIMEOUT = "client.response.timeout";
48     private final static String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
49     private boolean isForceShutdown = false;
50     private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
51     private long shutdownTimeout;
52
53     CoreManager(Properties prop) throws CoreException {
54         protocolFactory = ProtocolFactory.getInstance();
55         try {
56             initProtocol(prop);
57         }catch (ProtocolException e){
58             throw new CoreException(e);
59         }
60         registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
61         String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
62         long responseTimeout = Long.parseLong(timeoutProp);
63         String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
64         shutdownTimeout = Long.parseLong(gracefulTimeout);
65         timerService = new TimerServiceImpl(responseTimeout);
66         queueManager = new TaskQueueManager(prop);
67         listenShutdown();
68     }
69
70     /**
71      * initiates protocol layer services.
72      * @param prop - Properties
73      */
74     private void initProtocol(Properties prop) throws ProtocolException {
75         protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
76         protocol.init(prop, getProtocolCallback());
77     }
78
79     /**
80      * Creates protocol response callback
81      * @return - @{@link ProtocolResponseCallbackImpl}
82      */
83     RetrieveMessageCallback getProtocolCallback(){
84         return new ProtocolResponseCallbackImpl();
85     }
86
87     /**
88      * Registers a new handler in registry
89      * @param corrID - Correlation ID
90      * @param requestResponseHandler handler to be called when response arrives
91      */
92     void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
93         registry.register(corrID, requestResponseHandler);
94     }
95
96     /**
97      * Remove a handler from registry service by correlation ID.
98      * @param corrID - Correlation ID
99      * @return - @{@link RequestResponseHandler}
100      */
101     RequestResponseHandler unregisterHandler(String corrID){
102         return (RequestResponseHandler) registry.unregister(corrID);
103     }
104
105     /**
106      * Checks in registry service if a handler is existing.
107      * @param corrID - Correlation ID
108      * @return - boolean
109      */
110     boolean isExistHandler(String corrID) {
111         return registry.isExist(corrID);
112     }
113
114     /**
115      * Starts timer for timeout event when a request was send successfully.
116      * @param corrID - Correlation ID
117      */
118     void startTimer(String corrID){
119         timerService.add(corrID, new TimeoutHandlerImpl(corrID));
120     }
121
122     /**
123      * Cancels timer for fimeout event, in case when complete response was received
124      * @param corrID
125      */
126     void cancelTimer(String corrID){
127         timerService.cancel(corrID);
128     }
129
130     /**
131      * Submits a new task to Queue manager. it is using for both response and timeout tasks
132      * @param corrID - Correlation ID
133      * @param task - @{@link Runnable} task.
134      * @throws InterruptedException
135      */
136     void submitTask(String corrID, Runnable task) throws InterruptedException {
137         queueManager.submit(corrID, task);
138     }
139
140     /**
141      * Sends request to protocol.
142      * @param request - Request
143      * @param corrId - Correlation ID
144      * @param rpcName - RPC name
145      * @throws CoreException - @{@link CoreException}
146      */
147     void sendRequest(String request, String corrId, String rpcName) throws CoreException {
148         MessageContext ctx = getMessageContext(corrId, rpcName);
149         try {
150             protocol.sendRequest(request, ctx);
151         } catch (ProtocolException e) {
152             unregisterHandler(corrId);
153             throw new CoreException(e);
154         }
155     }
156
157     /**
158      * Creates @{@link MessageContext}
159      * @param correlationId - Correlation ID
160      * @param rpcName - RPC Name
161      * @return - @{@link MessageContext}
162      */
163     private MessageContext getMessageContext(String correlationId, String rpcName){
164         MessageContext msgCtx = new MessageContext();
165         msgCtx.setCorrelationID(correlationId);
166         msgCtx.setRpc(rpcName);
167         return msgCtx;
168     }
169
170     /**
171      * Implements response callback from protocol and filters responses by correlation ID.
172      * Only registered events(by correlation ID) will be handled.
173      */
174     private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
175         @Override
176         public void onResponse(String response, MessageContext context) {
177             String corrID = context.getCorrelationID();
178             if (corrID != null) {
179                 RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
180                 if (messageHandler != null) {
181                     LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
182                     messageHandler.handleResponse(context, response);
183                 }
184             }
185         }
186     }
187
188
189     /**
190      * listens to @{@link Runtime} shutdown event
191      */
192     private void listenShutdown() {
193         Runtime.getRuntime().addShutdownHook(new Thread(){
194             public void run(){
195                 gracefulShutdown();
196             }
197         });
198     }
199
200     /**
201      * Implements shutdown for client library.
202      * @param isForceShutdown - true force shutdown, false graceful shutdown
203      */
204     void shutdown(boolean isForceShutdown){
205         if(isForceShutdown){
206             forceShutdown();
207         }else{
208             gracefulShutdown();
209         }
210     }
211
212     /**
213      * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
214      * shutdown only when either all request will be handled or graceful shutdown will be time out.
215      */
216     synchronized void gracefulShutdown(){
217         isGracefulShutdown.set(true);
218         if(registry.isEmpty()){
219             forceShutdown();
220         }
221         else{
222             try {
223                 LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
224                 wait(shutdownTimeout);
225                 LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
226                 forceShutdown();
227             } catch (InterruptedException e) {
228                 e.printStackTrace();
229             }
230
231         }
232     }
233
234     /**
235      * Closes Protocol, stops Queue Manager and shutdowns Time Service.
236      */
237     private void forceShutdown(){
238         isForceShutdown = true;
239         try {
240             LOG.info("Starting shutdown process.");
241             protocol.shutdown();
242             queueManager.stopQueueManager();
243             timerService.shutdown();
244         } catch (InterruptedException e) {
245             LOG.info("Client library shutdown in progress ", e);
246         }
247     }
248
249     /**
250      *
251      * @return - true when shutdown is in process
252      */
253     boolean isShutdownInProgress(){
254         return isForceShutdown || isGracefulShutdown.get();
255     }
256
257     /**
258      * Timeout handler implementation.
259      * This handler is responsible to assign a task for handling of timeout events.
260      *
261      */
262     private class TimeoutHandlerImpl implements ITimeoutHandler {
263
264         private final String corrID;
265
266         TimeoutHandlerImpl(String corrID) {
267             this.corrID = corrID;
268         }
269
270         /**
271          * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
272          * this queue is shared between both timeout and handlers which belong to same correlation ID.
273          */
274         @Override
275         public void onTimeout() {
276             try {
277                 submitTask(corrID, new Runnable() {
278                     @Override
279                     public void run() {
280                         RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
281                         if (requestResponseHandler != null) {
282                             requestResponseHandler.onTimeOut();
283                         }
284                     }
285                 });
286             } catch (InterruptedException e) {
287                 LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
288             }
289         }
290     }
291
292
293     /**
294      * Wakes Up graceful shutdown.
295      */
296     class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
297         @Override
298         public synchronized void emptyCallback() {
299             LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
300             if(isGracefulShutdown.get()){
301                 wakeUpShutdown();
302             }
303         }
304     }
305
306     /**
307      * wakes up waiting shutdown.
308      */
309     private synchronized void wakeUpShutdown(){
310         notifyAll();
311     }
312
313 }
314