2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.appc.client.impl.core;
26 import org.onap.appc.client.impl.protocol.*;
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
30 import java.util.Properties;
31 import java.util.concurrent.atomic.AtomicBoolean;
34 * Consolidates all services (Registry, Timeout and Task Queue) for handling request/response events.
// Shared logger; static so a single instance serves every CoreManager and its inner classes.
38 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
39 private final ProtocolFactory protocolFactory;
// Protocol object obtained from the factory in initProtocol().
40 protected AsyncProtocol protocol;
// NOTE(review): always null and not referenced in the code visible here - candidate for removal.
41 private final RetrieveMessageCallback protocolCallback = null;
// Handlers keyed by correlation ID. Declared as a raw type, so lookups are cast to RequestResponseHandler.
42 private final CoreRegistry registry;
43 private final ITimerService timerService;
44 private final TaskQueueManager queueManager;
// Fallback used for both timeout properties when they are absent; now a true constant.
45 private static final String DEFAULT_TIMEOUT = "300000";
// Property keys read by the constructor.
46 private static final String RESPONSE_TIMEOUT = "client.response.timeout";
47 private static final String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
// Set by forceShutdown(); read by isShutdownInProgress().
48 private boolean isForceShutdown = false;
// Set by gracefulShutdown(); the reference itself is never reassigned, hence final.
49 private final AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
// Passed to Object.wait(long) in gracefulShutdown(), i.e. interpreted as milliseconds.
50 private long shutdownTimeout;
52 CoreManager(Properties prop) throws CoreException {
// Wires up the protocol factory, handler registry, timer service and task queue from the given properties.
53 protocolFactory = ProtocolFactory.getInstance();
// A protocol-layer failure is rethrown as CoreException with the original exception as its cause.
56 }catch (ProtocolException e){
57 throw new CoreException(e);
// The registry receives an EmptyRegistryCallbackImpl as its callback.
59 registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
// Both timeouts fall back to DEFAULT_TIMEOUT when the property is absent.
// Long.parseLong throws NumberFormatException if the configured value is not a valid long.
60 String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
61 long responseTimeout = Long.parseLong(timeoutProp);
62 String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
63 shutdownTimeout = Long.parseLong(gracefulTimeout);
// NOTE(review): responseTimeout is presumably in milliseconds like shutdownTimeout - confirm against TimerServiceImpl.
64 timerService = new TimerServiceImpl(responseTimeout);
65 queueManager = new TaskQueueManager(prop);
70 * initiates protocol layer services.
71 * @param prop - Properties
73 private void initProtocol(Properties prop) throws ProtocolException {
// Ask the factory for the ASYNC protocol object and keep it in the 'protocol' field.
74 protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
// Hand the properties and the callback from getProtocolCallback() to the protocol.
75 protocol.init(prop, getProtocolCallback());
79 * Creates protocol response callback
80 * @return - {@link ProtocolResponseCallbackImpl}
82 RetrieveMessageCallback getProtocolCallback(){
// A new ProtocolResponseCallbackImpl is created on every call; nothing is cached.
83 return new ProtocolResponseCallbackImpl();
87 * Registers a new handler in registry
88 * @param corrID - Correlation ID
89 * @param requestResponseHandler handler to be called when response arrives
91 void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
// Delegates straight to the registry; neither argument is validated here.
92 registry.register(corrID, requestResponseHandler);
96 * Remove a handler from registry service by correlation ID.
97 * @param corrID - Correlation ID
98 * @return - {@link RequestResponseHandler}
100 RequestResponseHandler unregisterHandler(String corrID){
// The registry field is declared as a raw CoreRegistry, hence the cast on the returned value.
// Callers in this class treat a null result as "no handler registered".
101 return (RequestResponseHandler) registry.unregister(corrID);
105 * Checks in the registry service whether a handler exists.
106 * @param corrID - Correlation ID
109 boolean isExistHandler(String corrID) {
// Pure delegation to the registry's isExist check.
110 return registry.isExist(corrID);
114 * Starts the timer for a timeout event once a request has been sent successfully.
115 * @param corrID - Correlation ID
117 void startTimer(String corrID){
// Each timer gets its own TimeoutHandlerImpl bound to this correlation ID.
118 timerService.add(corrID, new TimeoutHandlerImpl(corrID));
122 * Cancels the timer for a timeout event when a complete response has been received.
125 void cancelTimer(String corrID){
// Delegates to the timer service, keyed by correlation ID.
126 timerService.cancel(corrID);
130 * Submits a new task to the Queue manager. It is used for both response and timeout tasks.
131 * @param corrID - Correlation ID
132 * @param task - {@link Runnable} task.
133 * @throws InterruptedException
135 void submitTask(String corrID, Runnable task) throws InterruptedException {
// Delegates to the queue manager; an InterruptedException is not handled here and propagates to the caller.
136 queueManager.submit(corrID, task);
140 * Sends request to protocol.
141 * @param request - Request
142 * @param corrId - Correlation ID
143 * @param rpcName - RPC name
144 * @throws CoreException - {@link CoreException}
146 void sendRequest(String request, String corrId, String rpcName) throws CoreException {
// The context carries the correlation ID and RPC name alongside the request payload.
147 MessageContext ctx = getMessageContext(corrId, rpcName);
149 protocol.sendRequest(request, ctx);
150 } catch (ProtocolException e) {
// Send failed: remove the handler for this correlation ID (return value discarded),
// then rethrow as CoreException with the protocol exception as its cause.
151 unregisterHandler(corrId);
152 throw new CoreException(e);
157 * Creates a {@link MessageContext}
158 * @param correlationId - Correlation ID
159 * @param rpcName - RPC Name
160 * @return - {@link MessageContext}
162 private MessageContext getMessageContext(String correlationId, String rpcName){
// Populate a fresh MessageContext with the two identifying fields.
163 MessageContext msgCtx = new MessageContext();
164 msgCtx.setCorrelationID(correlationId);
165 msgCtx.setRpc(rpcName);
170 * Implements response callback from protocol and filters responses by correlation ID.
171 * Only registered events(by correlation ID) will be handled.
173 private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
175 public void onResponse(String response, MessageContext context) {
176 String corrID = context.getCorrelationID();
// A response without a correlation ID cannot be matched to a handler.
177 if (corrID != null) {
// Look up the handler via get(); presumably a non-removing lookup - verify against CoreRegistry.
178 RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
// Only responses with a registered handler are dispatched.
179 if (messageHandler != null) {
// NOTE(review): the full response payload is logged at INFO level - confirm that is acceptable.
180 LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
181 messageHandler.handleResponse(context, response);
189 * Listens to the {@link Runtime} shutdown event.
191 private void listenShutdown() {
// Registers an anonymous Thread subclass as a JVM shutdown hook.
192 Runtime.getRuntime().addShutdownHook(new Thread(){
200 * Implements shutdown for client library.
201 * @param isForceShutdown - true force shutdown, false graceful shutdown
203 void shutdown(boolean isForceShutdown){
212 * Graceful shutdown. If all requests have already been handled, force shutdown is called immediately; otherwise
213 * force shutdown is called only once either all requests have been handled or the graceful shutdown times out.
215 synchronized void gracefulShutdown(){
// Raise the flag first so isShutdownInProgress() reports true from this point on.
216 isGracefulShutdown.set(true);
// An empty registry means no handlers are currently registered.
217 if(registry.isEmpty()){
222 LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
// Timed wait on this object's monitor (held because the method is synchronized).
// Per Object.wait(long) the argument is in milliseconds, and 0 means wait without a timeout.
// NOTE(review): the wait is not wrapped in a condition loop, so a spurious wakeup ends it early - confirm acceptable.
223 wait(shutdownTimeout);
224 LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
226 } catch (InterruptedException e) {
// NOTE(review): the interrupt status is not restored here - consider Thread.currentThread().interrupt().
227 LOG.error("Interrupted Exception during gracefulShutdown ::", e);
234 * Closes the Protocol, stops the Queue Manager and shuts down the Timer Service.
236 private void forceShutdown(){
// From here on isShutdownInProgress() returns true.
237 isForceShutdown = true;
239 LOG.info("Starting shutdown process.");
// Stop the task queue, then the timer service.
241 queueManager.stopQueueManager();
242 timerService.shutdown();
243 } catch (InterruptedException e) {
// NOTE(review): the interruption is only logged at INFO and the interrupt status is not restored - confirm intended.
244 LOG.info("Client library shutdown in progress ", e);
250 * @return - true when shutdown is in progress
252 boolean isShutdownInProgress(){
// True once either the forced or the graceful shutdown path has started.
// NOTE(review): isForceShutdown is a plain non-volatile field read here without synchronization - confirm cross-thread visibility is not required.
253 return isForceShutdown || isGracefulShutdown.get();
257 * Timeout handler implementation.
258 * This handler is responsible to assign a task for handling of timeout events.
261 private class TimeoutHandlerImpl implements ITimeoutHandler {
// Correlation ID of the request this timeout handler was created for.
263 private final String corrID;
265 TimeoutHandlerImpl(String corrID) {
266 this.corrID = corrID;
270 * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
271 * this queue is shared between both timeout and handlers which belong to same correlation ID.
274 public void onTimeout() {
// The timeout work is not run inline; it is submitted as a task under the same correlation ID.
276 submitTask(corrID, new Runnable() {
// Remove the handler from the registry first; a null result means there is nothing to notify.
279 RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
280 if (requestResponseHandler != null) {
281 requestResponseHandler.onTimeOut();
285 } catch (InterruptedException e) {
// Submission was interrupted: the timeout task was not queued; only a warning is logged.
286 LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
293 * Wakes Up graceful shutdown.
295 class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
// 'synchronized' on this method locks the callback instance, not the enclosing CoreManager.
297 public synchronized void emptyCallback() {
298 LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
// Only act when a graceful shutdown has been requested.
299 if(isGracefulShutdown.get()){
306 * wakes up waiting shutdown.
308 private synchronized void wakeUpShutdown(){