2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ============LICENSE_END=========================================================
24 package org.onap.appc.client.impl.protocol;
26 import org.onap.appc.client.impl.core.MessageContext;
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
30 import java.io.IOException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Properties;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
39 class AsyncProtocolImpl implements AsyncProtocol {
42 * message bus listener thread handler
44 private Future listenerHandler;
46 * called when messages are fetched - called for a single message
48 private RetrieveMessageCallback callback;
50 * message bus client used to send/fetch
52 private MessagingService messageService;
54 * Message reader used to extract body and context from reponse message
56 private MessageReader messageReader;
58 * Message writer used to construct meesage from body and context
60 private MessageWriter messageWriter;
65 private boolean isShutdown = false;
68 * executor service for listener usage
70 private ExecutorService executorService = Executors.newSingleThreadExecutor();
72 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
74 private String controllerType = null;
78 messageService = new UEBMessagingService();
79 messageReader = new APPCMessageReaderWriter();
80 messageWriter = (MessageWriter) messageReader;
84 public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
86 if (callback == null) {
87 throw new ProtocolException("Callback param should not be null!");
89 this.callback = callback;
91 controllerType = props.getProperty(UEBPropertiesKeys.CONTROLLER_TYPE);
94 messageService.init(props);
95 //get message bus listener thread
96 //start the thread after initializing services
97 listenerHandler = executorService.submit(new Listener());
98 } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
99 throw new ProtocolException(e);
104 public void sendRequest(String payload, MessageContext context) throws ProtocolException {
105 if (controllerType != null && controllerType.length()!= 0 && (!controllerType.equals("APPC")))
107 context.setPartiton(controllerType);
110 //get message to be sent to appc from payload and context
111 String message = messageWriter.write(payload, context);
113 messageService.send(context.getPartiton(), message);
114 LOG.debug("Successfully send message: " + message);
115 } catch (IOException e) {
116 throw new ProtocolException(e);
121 public void shutdown() {
123 messageService.close();
124 LOG.warn("The protocol layer in shutdown stage.");
125 executorService.shutdownNow();
128 public class Listener implements Runnable {
133 while (!isShutdown) {
134 List<String> messages = new ArrayList<>();
136 messages = messageService.fetch();
137 LOG.debug("Successfully fetched " + messages.size() + " messages");
138 } catch (IOException e) {
139 LOG.error("Fetching " + messages.size() + " messages failed", e);
141 for (String message : messages) {
142 handleMessage(message);
147 private void handleMessage(String message) {
148 MessageContext context = new MessageContext();
152 //get payload and context from message to be sent to core layer
153 payload = messageReader.read(message, context);
154 LOG.debug("Got body: " + payload);
155 //call core layer response handler
157 callback.onResponse(payload, context);
159 LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
160 context.getCorrelationID() + "> response ", message);
162 } catch (ProtocolException e) {
163 LOG.error("Failed to read message from UEB. message is: " + message, e);