2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Copyright (C) 2017 Amdocs
8 * =============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
21 * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22 * ============LICENSE_END=========================================================
25 package org.onap.appc.client.impl.protocol;
27 import org.onap.appc.client.impl.core.MessageContext;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
31 import java.io.IOException;
32 import java.security.GeneralSecurityException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
40 class AsyncProtocolImpl implements AsyncProtocol {
43 * message bus listener thread handler
45 private Future listenerHandler;
47 * called when messages are fetched - called for a single message
49 private RetrieveMessageCallback callback;
51 * message bus client used to send/fetch
53 private MessagingService messageService;
55 * Message reader used to extract body and context from reponse message
57 private MessageReader messageReader;
59 * Message writer used to construct meesage from body and context
61 private MessageWriter messageWriter;
66 private boolean isShutdown = false;
69 * executor service for listener usage
71 private ExecutorService executorService = Executors.newSingleThreadExecutor();
73 private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
78 messageService = new UEBMessagingService();
79 messageReader = new APPCMessageReaderWriter();
80 messageWriter = (MessageWriter) messageReader;
83 public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
85 if (callback == null) {
86 throw new ProtocolException("Callback param should not be null!");
88 this.callback = callback;
91 messageService.init(props);
92 //get message bus listener thread
93 //start the thread after initializing services
94 listenerHandler = executorService.submit(new Listener());
95 } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
96 throw new ProtocolException(e);
100 public void sendRequest(String payload, MessageContext context) throws ProtocolException {
102 //get message to be sent to appc from payload and context
103 String message = messageWriter.write(payload, context);
105 messageService.send(context.getPartiton(), message);
106 LOG.debug("Successfully send message: " + message);
107 } catch (IOException e) {
108 throw new ProtocolException(e);
113 public void shutdown() {
115 messageService.close();
116 LOG.warn("The protocol layer in shutdown stage.");
117 executorService.shutdownNow();
120 public class Listener implements Runnable {
125 while (!isShutdown) {
126 List<String> messages = new ArrayList<>();
128 messages = messageService.fetch();
129 LOG.debug("Successfully fetched " + messages.size() + " messages");
130 } catch (IOException e) {
131 LOG.error("Fetching " + messages.size() + " messages failed");
133 for (String message : messages) {
135 MessageContext context = new MessageContext();
136 String payload = null;
139 //get payload and context from message to be sent to core layer
140 payload = messageReader.read(message, context);
141 LOG.debug("Got body: " + payload);
142 //call core layer response handler
144 callback.onResponse(payload, context);
146 LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
147 context.getCorrelationID() + "> response ", message);
149 } catch (ProtocolException e) {
150 LOG.error("Failed to read message from UEB. message is: " + message);