8567d993a575eb00cadfe56b27bb9bdd617984b7
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / protocol / AsyncProtocolImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.client.impl.protocol;
26
27 import org.onap.appc.client.impl.core.MessageContext;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30
31 import java.io.IOException;
32 import java.security.GeneralSecurityException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39
40 class AsyncProtocolImpl implements AsyncProtocol {
41
42     /**
43      * message bus listener thread handler
44      */
45     private Future listenerHandler;
46     /**
47      * called when messages are fetched - called for a single message
48      */
49     private RetrieveMessageCallback callback;
50     /**
51      * message bus client used to send/fetch
52      */
53     private MessagingService messageService;
54     /**
55      * Message reader used to extract body and context from reponse message
56      */
57     private MessageReader messageReader;
58     /**
59      * Message writer used to construct meesage from body and context
60      */
61     private MessageWriter messageWriter;
62
63     /**
64      * shutdown indicator
65      */
66     private boolean isShutdown = false;
67
68     /**
69      * executor service for listener usage
70      */
71     private ExecutorService executorService = Executors.newSingleThreadExecutor();
72
73     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
74
75
76     AsyncProtocolImpl() {
77
78         messageService = new UEBMessagingService();
79         messageReader = new APPCMessageReaderWriter();
80         messageWriter = (MessageWriter) messageReader;
81     }
82
83     @Override
84     public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
85
86         if (callback == null) {
87             throw new ProtocolException("Callback param should not be null!");
88         }
89         this.callback = callback;
90
91         try {
92             messageService.init(props);
93             //get message bus listener thread
94             //start the thread after initializing services
95             listenerHandler = executorService.submit(new Listener());
96         } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
97             throw new ProtocolException(e);
98         }
99     }
100
101     @Override
102     public void sendRequest(String payload, MessageContext context) throws ProtocolException {
103
104         //get message to be sent to appc from payload and context
105         String message = messageWriter.write(payload, context);
106         try {
107             messageService.send(context.getPartiton(), message);
108             LOG.debug("Successfully send message: " + message);
109         } catch (IOException e) {
110             throw new ProtocolException(e);
111         }
112     }
113
114     @Override
115     public void shutdown() {
116         isShutdown = true;
117         messageService.close();
118         LOG.warn("The protocol layer in shutdown stage.");
119         executorService.shutdownNow();
120     }
121
122     public class Listener implements Runnable {
123
124         @Override
125         public void run() {
126
127             while (!isShutdown) {
128                 List<String> messages = new ArrayList<>();
129                 try {
130                     messages = messageService.fetch();
131                     LOG.debug("Successfully fetched " + messages.size() + " messages");
132                 } catch (IOException e) {
133                     LOG.error("Fetching " + messages.size() + " messages failed", e);
134                 }
135                 for (String message : messages) {
136                     handleMessage(message);
137                 }
138             }
139         }
140
141         private void handleMessage(String message) {
142             MessageContext context = new MessageContext();
143             String payload;
144
145             try {
146                 //get payload and context from message to be sent to core layer
147                 payload = messageReader.read(message, context);
148                 LOG.debug("Got body: " + payload);
149                 //call core layer response handler
150                 if (!isShutdown) {
151                     callback.onResponse(payload, context);
152                 } else {
153                     LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
154                         context.getCorrelationID() + "> response ", message);
155                 }
156             } catch (ProtocolException e) {
157                 LOG.error("Failed to read message from UEB. message is: " + message, e);
158             }
159         }
160     }
161
162 }