removed code smells
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / core / CoreManager.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Modifications Copyright (C) 2019 IBM
10  * =============================================================================
11  * Licensed under the Apache License, Version 2.0 (the "License");
12  * you may not use this file except in compliance with the License.
13  * You may obtain a copy of the License at
14  * 
15  *      http://www.apache.org/licenses/LICENSE-2.0
16  * 
17  * Unless required by applicable law or agreed to in writing, software
18  * distributed under the License is distributed on an "AS IS" BASIS,
19  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20  * See the License for the specific language governing permissions and
21  * limitations under the License.
22  * 
23  * ============LICENSE_END=========================================================
24  */
25
26 package org.onap.appc.client.impl.core;
27
28 import org.onap.appc.client.impl.protocol.*;
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
31
32 import java.util.Properties;
33 import java.util.concurrent.atomic.AtomicBoolean;
34
35 /**
36  * Consolidates all services(Registry, Timeout and Task Queue) for handling of requests/responses events.
37  */
38 class CoreManager{
39
40     private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
41     private final ProtocolFactory protocolFactory;
42     protected  AsyncProtocol protocol;
43     private final RetrieveMessageCallback protocolCallback = null;
44     private final CoreRegistry registry;
45     private final ITimerService timerService;
46     private final TaskQueueManager queueManager;
47     private String DEFAULT_TIMEOUT = "300000";
48     private static final String RESPONSE_TIMEOUT = "client.response.timeout";
49     private static final String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
50     private boolean isForceShutdown = false;
51     private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
52     private long shutdownTimeout;
53
54     CoreManager(Properties prop) throws CoreException {
55         protocolFactory = ProtocolFactory.getInstance();
56         try {
57             initProtocol(prop);
58         }catch (ProtocolException e){
59             throw new CoreException(e);
60         }
61         registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
62         String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
63         long responseTimeout = Long.parseLong(timeoutProp);
64         String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
65         shutdownTimeout = Long.parseLong(gracefulTimeout);
66         timerService = new TimerServiceImpl(responseTimeout);
67         queueManager = new TaskQueueManager(prop);
68         listenShutdown();
69     }
70
71     /**
72      * initiates protocol layer services.
73      * @param prop - Properties
74      */
75     private void initProtocol(Properties prop) throws ProtocolException {
76         protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
77         protocol.init(prop, getProtocolCallback());
78     }
79
80     /**
81      * Creates protocol response callback
82      * @return - @{@link ProtocolResponseCallbackImpl}
83      */
84     RetrieveMessageCallback getProtocolCallback(){
85         return new ProtocolResponseCallbackImpl();
86     }
87
88     /**
89      * Registers a new handler in registry
90      * @param corrID - Correlation ID
91      * @param requestResponseHandler handler to be called when response arrives
92      */
93     void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
94         registry.register(corrID, requestResponseHandler);
95     }
96
97     /**
98      * Remove a handler from registry service by correlation ID.
99      * @param corrID - Correlation ID
100      * @return - @{@link RequestResponseHandler}
101      */
102     RequestResponseHandler unregisterHandler(String corrID){
103         return (RequestResponseHandler) registry.unregister(corrID);
104     }
105
106     /**
107      * Checks in registry service if a handler is existing.
108      * @param corrID - Correlation ID
109      * @return - boolean
110      */
111     boolean isExistHandler(String corrID) {
112         return registry.isExist(corrID);
113     }
114
115     /**
116      * Starts timer for timeout event when a request was send successfully.
117      * @param corrID - Correlation ID
118      */
119     void startTimer(String corrID){
120         timerService.add(corrID, new TimeoutHandlerImpl(corrID));
121     }
122
123     /**
124      * Cancels timer for fimeout event, in case when complete response was received
125      * @param corrID
126      */
127     void cancelTimer(String corrID){
128         timerService.cancel(corrID);
129     }
130
131     /**
132      * Submits a new task to Queue manager. it is using for both response and timeout tasks
133      * @param corrID - Correlation ID
134      * @param task - @{@link Runnable} task.
135      * @throws InterruptedException
136      */
137     void submitTask(String corrID, Runnable task) throws InterruptedException {
138         queueManager.submit(corrID, task);
139     }
140
141     /**
142      * Sends request to protocol.
143      * @param request - Request
144      * @param corrId - Correlation ID
145      * @param rpcName - RPC name
146      * @throws CoreException - @{@link CoreException}
147      */
148     void sendRequest(String request, String corrId, String rpcName) throws CoreException {
149         MessageContext ctx = getMessageContext(corrId, rpcName);
150         try {
151             protocol.sendRequest(request, ctx);
152         } catch (ProtocolException e) {
153             unregisterHandler(corrId);
154             throw new CoreException(e);
155         }
156     }
157
158     /**
159      * Creates @{@link MessageContext}
160      * @param correlationId - Correlation ID
161      * @param rpcName - RPC Name
162      * @return - @{@link MessageContext}
163      */
164     private MessageContext getMessageContext(String correlationId, String rpcName){
165         MessageContext msgCtx = new MessageContext();
166         msgCtx.setCorrelationID(correlationId);
167         msgCtx.setRpc(rpcName);
168         return msgCtx;
169     }
170
171     /**
172      * Implements response callback from protocol and filters responses by correlation ID.
173      * Only registered events(by correlation ID) will be handled.
174      */
175     private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
176         @Override
177         public void onResponse(String response, MessageContext context) {
178             String corrID = context.getCorrelationID();
179             if (corrID != null) {
180                 RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
181                 if (messageHandler != null) {
182                     LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
183                     messageHandler.handleResponse(context, response);
184                 }
185             }
186         }
187     }
188
189
190     /**
191      * listens to @{@link Runtime} shutdown event
192      */
193     private void listenShutdown() {
194         Runtime.getRuntime().addShutdownHook(new Thread(){
195             public void run(){
196                 gracefulShutdown();
197             }
198         });
199     }
200
201     /**
202      * Implements shutdown for client library.
203      * @param isForceShutdown - true force shutdown, false graceful shutdown
204      */
205     void shutdown(boolean isForceShutdown){
206         if(isForceShutdown){
207             forceShutdown();
208         }else{
209             gracefulShutdown();
210         }
211     }
212
213     /**
214      * Graceful shutdown. in case of all requests already were handled, calls to force shutdown. another goes to force
215      * shutdown only when either all request will be handled or graceful shutdown will be time out.
216      */
217     synchronized void gracefulShutdown(){
218         isGracefulShutdown.set(true);
219         if(registry.isEmpty()){
220             forceShutdown();
221         }
222         else{
223             try {
224                 LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
225                 wait(shutdownTimeout);
226                 LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
227                 forceShutdown();
228             } catch (InterruptedException e) {
229                 LOG.error("Interrupted Exception during gracefulShutdown ::", e);
230                 Thread.currentThread().interrupt();
231             }
232
233         }
234     }
235
236     /**
237      * Closes Protocol, stops Queue Manager and shutdowns Time Service.
238      */
239     private void forceShutdown(){
240         isForceShutdown = true;
241         try {
242             LOG.info("Starting shutdown process.");
243             protocol.shutdown();
244             queueManager.stopQueueManager();
245             timerService.shutdown();
246         } catch (InterruptedException e) {
247             LOG.info("Client library shutdown in progress ", e);
248             Thread.currentThread().interrupt();
249         }
250     }
251
252     /**
253      *
254      * @return - true when shutdown is in process
255      */
256     boolean isShutdownInProgress(){
257         return isForceShutdown || isGracefulShutdown.get();
258     }
259
260     /**
261      * Timeout handler implementation.
262      * This handler is responsible to assign a task for handling of timeout events.
263      *
264      */
265     private class TimeoutHandlerImpl implements ITimeoutHandler {
266
267         private final String corrID;
268
269         TimeoutHandlerImpl(String corrID) {
270             this.corrID = corrID;
271         }
272
273         /**
274          * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
275          * this queue is shared between both timeout and handlers which belong to same correlation ID.
276          */
277         @Override
278         public void onTimeout() {
279             try {
280                 submitTask(corrID, new Runnable() {
281                     @Override
282                     public void run() {
283                         RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
284                         if (requestResponseHandler != null) {
285                             requestResponseHandler.onTimeOut();
286                         }
287                     }
288                 });
289             } catch (InterruptedException e) {
290                 LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
291                 Thread.currentThread().interrupt();
292             }
293         }
294     }
295
296
297     /**
298      * Wakes Up graceful shutdown.
299      */
300     class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
301         @Override
302         public synchronized void emptyCallback() {
303             LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
304             if(isGracefulShutdown.get()){
305                 wakeUpShutdown();
306             }
307         }
308     }
309
310     /**
311      * wakes up waiting shutdown.
312      */
313     private synchronized void wakeUpShutdown(){
314         notifyAll();
315     }
316
317 }
318