2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Modifications Copyright (C) 2019 IBM
10 * =============================================================================
11 * Licensed under the Apache License, Version 2.0 (the "License");
12 * you may not use this file except in compliance with the License.
13 * You may obtain a copy of the License at
15 * http://www.apache.org/licenses/LICENSE-2.0
17 * Unless required by applicable law or agreed to in writing, software
18 * distributed under the License is distributed on an "AS IS" BASIS,
19 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
20 * See the License for the specific language governing permissions and
21 * limitations under the License.
23 * ============LICENSE_END=========================================================
26 package org.onap.appc.client.impl.core;
28 import org.onap.appc.client.impl.protocol.*;
29 import com.att.eelf.configuration.EELFLogger;
30 import com.att.eelf.configuration.EELFManager;
32 import java.util.Properties;
33 import java.util.concurrent.atomic.AtomicBoolean;
36 * Consolidates all services (Registry, Timeout and Task Queue) used for handling request/response events.
40 private final EELFLogger LOG = EELFManager.getInstance().getLogger(CoreManager.class);
41 private final ProtocolFactory protocolFactory;
42 protected AsyncProtocol protocol;
43 private final RetrieveMessageCallback protocolCallback = null;
44 private final CoreRegistry registry;
45 private final ITimerService timerService;
46 private final TaskQueueManager queueManager;
47 private String DEFAULT_TIMEOUT = "300000";
48 private static final String RESPONSE_TIMEOUT = "client.response.timeout";
49 private static final String GRACEFUL_SHUTDOWN_TIMEOUT = "client.graceful.shutdown.timeout";
50 private boolean isForceShutdown = false;
51 private AtomicBoolean isGracefulShutdown = new AtomicBoolean(false);
52 private long shutdownTimeout;
// Constructor: creates the services this manager consolidates from the given properties.
// Throws CoreException when protocol set-up fails with a ProtocolException.
// NOTE(review): several lines of this constructor are missing from this extract (the statement
// guarded by the try block, presumably the initProtocol(prop) call, and the end of the body) - TODO confirm.
54 CoreManager(Properties prop) throws CoreException {
55 protocolFactory = ProtocolFactory.getInstance();
// A ProtocolException is rethrown wrapped in a CoreException, keeping the original as cause.
58 }catch (ProtocolException e){
59 throw new CoreException(e);
// The registry is given a callback object of type EmptyRegistryCallbackImpl.
61 registry = new CoreRegistry<RequestResponseHandler>(new EmptyRegistryCallbackImpl());
// Both timeouts fall back to DEFAULT_TIMEOUT when the property is absent. Long.parseLong throws
// NumberFormatException for a non-numeric value; that exception is not handled in the visible code.
62 String timeoutProp = prop.getProperty(RESPONSE_TIMEOUT, DEFAULT_TIMEOUT);
63 long responseTimeout = Long.parseLong(timeoutProp);
64 String gracefulTimeout = prop.getProperty(GRACEFUL_SHUTDOWN_TIMEOUT, DEFAULT_TIMEOUT);
65 shutdownTimeout = Long.parseLong(gracefulTimeout);
// The timer service receives the response timeout; the queue manager receives the raw properties.
66 timerService = new TimerServiceImpl(responseTimeout);
67 queueManager = new TaskQueueManager(prop);
72 * initiates protocol layer services.
73 * @param prop - Properties
75 private void initProtocol(Properties prop) throws ProtocolException {
76 protocol = (AsyncProtocol) protocolFactory.getProtocolObject(ProtocolType.ASYNC);
77 protocol.init(prop, getProtocolCallback());
81 * Creates protocol response callback
82 * @return - @{@link ProtocolResponseCallbackImpl}
84 RetrieveMessageCallback getProtocolCallback(){
85 return new ProtocolResponseCallbackImpl();
89 * Registers a new handler in registry
90 * @param corrID - Correlation ID
91 * @param requestResponseHandler handler to be called when response arrives
93 void registerHandler(String corrID, RequestResponseHandler requestResponseHandler){
94 registry.register(corrID, requestResponseHandler);
98 * Remove a handler from registry service by correlation ID.
99 * @param corrID - Correlation ID
100 * @return - @{@link RequestResponseHandler}
102 RequestResponseHandler unregisterHandler(String corrID){
103 return (RequestResponseHandler) registry.unregister(corrID);
107 * Checks in registry service if a handler is existing.
108 * @param corrID - Correlation ID
111 boolean isExistHandler(String corrID) {
112 return registry.isExist(corrID);
116 * Starts timer for timeout event when a request was send successfully.
117 * @param corrID - Correlation ID
119 void startTimer(String corrID){
120 timerService.add(corrID, new TimeoutHandlerImpl(corrID));
124 * Cancels timer for fimeout event, in case when complete response was received
127 void cancelTimer(String corrID){
128 timerService.cancel(corrID);
132 * Submits a new task to Queue manager. it is using for both response and timeout tasks
133 * @param corrID - Correlation ID
134 * @param task - @{@link Runnable} task.
135 * @throws InterruptedException
137 void submitTask(String corrID, Runnable task) throws InterruptedException {
138 queueManager.submit(corrID, task);
142 * Sends request to protocol.
143 * @param request - Request
144 * @param corrId - Correlation ID
145 * @param rpcName - RPC name
146 * @throws CoreException - @{@link CoreException}
148 void sendRequest(String request, String corrId, String rpcName) throws CoreException {
149 MessageContext ctx = getMessageContext(corrId, rpcName);
151 protocol.sendRequest(request, ctx);
152 } catch (ProtocolException e) {
153 unregisterHandler(corrId);
154 throw new CoreException(e);
159 * Creates @{@link MessageContext}
160 * @param correlationId - Correlation ID
161 * @param rpcName - RPC Name
162 * @return - @{@link MessageContext}
164 private MessageContext getMessageContext(String correlationId, String rpcName){
165 MessageContext msgCtx = new MessageContext();
166 msgCtx.setCorrelationID(correlationId);
167 msgCtx.setRpc(rpcName);
172 * Implements response callback from protocol and filters responses by correlation ID.
173 * Only registered events(by correlation ID) will be handled.
175 private class ProtocolResponseCallbackImpl implements RetrieveMessageCallback {
177 public void onResponse(String response, MessageContext context) {
178 String corrID = context.getCorrelationID();
179 if (corrID != null) {
180 RequestResponseHandler messageHandler = (RequestResponseHandler) registry.get(corrID);
181 if (messageHandler != null) {
182 LOG.info("On response callback corrID <" + corrID + "> handler " + messageHandler + " response " + response);
183 messageHandler.handleResponse(context, response);
191 * Listens for the {@link Runtime} shutdown event.
// Registers an anonymous Thread subclass as a JVM shutdown hook through Runtime.addShutdownHook.
// NOTE(review): the body of that Thread subclass is not present in this extract.
193 private void listenShutdown() {
194 Runtime.getRuntime().addShutdownHook(new Thread(){
202 * Implements shutdown for client library.
203 * @param isForceShutdown - true force shutdown, false graceful shutdown
205 void shutdown(boolean isForceShutdown){
214 * Graceful shutdown. If all requests have already been handled, force shutdown is called right away; otherwise
215 * force shutdown is called only once either all requests have been handled or the graceful shutdown has timed out.
// synchronized: the method holds this object's monitor, which the wait(...) call below requires.
// NOTE(review): lines are missing from this extract (the body of the empty-registry branch, the
// start of the try block and the statement after the second log call) - do not infer them from here.
217 synchronized void gracefulShutdown(){
// Raise the flag first; isShutdownInProgress() returns true from this point on.
218 isGracefulShutdown.set(true);
219 if(registry.isEmpty()){
224 LOG.info("Core manager::graceful shutdown is starting... this <" + this + ">");
// Releases the monitor and blocks until notified or until shutdownTimeout milliseconds elapse.
// Per Object.wait(long), a value of 0 waits without a time limit, and spurious wake-ups are possible.
225 wait(shutdownTimeout);
226 LOG.info("Core manager::graceful shutdown is continue... this <" + this + ">");
228 } catch (InterruptedException e) {
229 LOG.error("Interrupted Exception during gracefulShutdown ::", e);
// Restore the interrupt status so code further up the stack can still observe it.
230 Thread.currentThread().interrupt();
237 * Closes the Protocol, stops the Queue Manager and shuts down the Timer Service.
// Marks forced shutdown as started, then stops the queue manager and shuts down the timer service.
// NOTE(review): two lines are missing from this extract (the start of the try block and one
// statement between the log call and stopQueueManager()).
239 private void forceShutdown(){
// Set before any service is stopped, so isShutdownInProgress() already reports true.
240 isForceShutdown = true;
242 LOG.info("Starting shutdown process.");
244 queueManager.stopQueueManager();
245 timerService.shutdown();
246 } catch (InterruptedException e) {
// The interruption is logged at info level only; the interrupt status is restored right after.
247 LOG.info("Client library shutdown in progress ", e);
248 Thread.currentThread().interrupt();
254 * @return - true when shutdown is in process
256 boolean isShutdownInProgress(){
257 return isForceShutdown || isGracefulShutdown.get();
261 * Timeout handler implementation.
262 * This handler is responsible to assign a task for handling of timeout events.
265 private class TimeoutHandlerImpl implements ITimeoutHandler {
267 private final String corrID;
269 TimeoutHandlerImpl(String corrID) {
270 this.corrID = corrID;
274 * When a timeout event is occurring, the new Timeout task will be assigned into a queue,
275 * this queue is shared between both timeout and handlers which belong to same correlation ID.
278 public void onTimeout() {
280 submitTask(corrID, new Runnable() {
283 RequestResponseHandler requestResponseHandler = unregisterHandler(corrID);
284 if (requestResponseHandler != null) {
285 requestResponseHandler.onTimeOut();
289 } catch (InterruptedException e) {
290 LOG.warn("could not submit timeout task for correlation ID <" + corrID + "> ", e);
291 Thread.currentThread().interrupt();
298 * Wakes Up graceful shutdown.
// Callback type whose instance the constructor passes to CoreRegistry.
// NOTE(review): the body of the if statement below is missing from this extract (presumably the
// wakeUpShutdown() call - TODO confirm).
300 class EmptyRegistryCallbackImpl implements CoreRegistry.EmptyRegistryCallback {
// synchronized here locks this callback instance, not the enclosing CoreManager.
302 public synchronized void emptyCallback() {
303 LOG.info("Registry is empty, wake up the shutdown!, isGraceful flag <" + isGracefulShutdown + ">");
// Only acts when a graceful shutdown has been requested.
304 if(isGracefulShutdown.get()){
311 * wakes up waiting shutdown.
313 private synchronized void wakeUpShutdown(){