2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.client.impl.protocol;
27 import org.onap.appc.client.impl.core.MessageContext;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
31 import java.io.IOException;
32 import java.security.GeneralSecurityException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
40 class AsyncProtocolImpl implements AsyncProtocol {
43 * message bus listener thread handler
45 private Future listenerHandler;
47 * called when messages are fetched - called for a single message
49 private RetrieveMessageCallback callback;
51 * message bus client used to send/fetch
53 private MessagingService messageService;
55 * Message reader used to extract body and context from reponse message
57 private MessageReader messageReader;
59 * Message writer used to construct meesage from body and context
61 private MessageWriter messageWriter;
66 private boolean isShutdown = false;
69 * executor service for listener usage
71 private ExecutorService executorService = Executors.newSingleThreadExecutor();
73 private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
78 messageService = new UEBMessagingService();
79 messageReader = new APPCMessageReaderWriter();
80 messageWriter = (MessageWriter) messageReader;
84 public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
86 if (callback == null) {
87 throw new ProtocolException("Callback param should not be null!");
89 this.callback = callback;
92 messageService.init(props);
93 //get message bus listener thread
94 //start the thread after initializing services
95 listenerHandler = executorService.submit(new Listener());
96 } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
97 throw new ProtocolException(e);
102 public void sendRequest(String payload, MessageContext context) throws ProtocolException {
104 //get message to be sent to appc from payload and context
105 String message = messageWriter.write(payload, context);
107 messageService.send(context.getPartiton(), message);
108 LOG.debug("Successfully send message: " + message);
109 } catch (IOException e) {
110 throw new ProtocolException(e);
115 public void shutdown() {
117 messageService.close();
118 LOG.warn("The protocol layer in shutdown stage.");
119 executorService.shutdownNow();
122 public class Listener implements Runnable {
127 while (!isShutdown) {
128 List<String> messages = new ArrayList<>();
130 messages = messageService.fetch();
131 LOG.debug("Successfully fetched " + messages.size() + " messages");
132 } catch (IOException e) {
133 LOG.error("Fetching " + messages.size() + " messages failed", e);
135 for (String message : messages) {
136 handleMessage(message);
141 private void handleMessage(String message) {
142 MessageContext context = new MessageContext();
146 //get payload and context from message to be sent to core layer
147 payload = messageReader.read(message, context);
148 LOG.debug("Got body: " + payload);
149 //call core layer response handler
151 callback.onResponse(payload, context);
153 LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
154 context.getCorrelationID() + "> response ", message);
156 } catch (ProtocolException e) {
157 LOG.error("Failed to read message from UEB. message is: " + message, e);