82626d80232f11f1767b6105b4d421919d9b1ed7
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / protocol / AsyncProtocolImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  * ============LICENSE_END=========================================================
23  */
24
25 package org.onap.appc.client.impl.protocol;
26
27 import org.onap.appc.client.impl.core.MessageContext;
28 import com.att.eelf.configuration.EELFLogger;
29 import com.att.eelf.configuration.EELFManager;
30
31 import java.io.IOException;
32 import java.security.GeneralSecurityException;
33 import java.util.ArrayList;
34 import java.util.List;
35 import java.util.Properties;
36 import java.util.concurrent.ExecutorService;
37 import java.util.concurrent.Executors;
38 import java.util.concurrent.Future;
39
40 class AsyncProtocolImpl implements AsyncProtocol {
41
42     /**
43      * message bus listener thread handler
44      */
45     private Future listenerHandler;
46     /**
47      * called when messages are fetched - called for a single message
48      */
49     private RetrieveMessageCallback callback;
50     /**
51      * message bus client used to send/fetch
52      */
53     private MessagingService messageService;
54     /**
55      * Message reader used to extract body and context from reponse message
56      */
57     private MessageReader messageReader;
58     /**
59      * Message writer used to construct meesage from body and context
60      */
61     private MessageWriter messageWriter;
62
63     /**
64      * shutdown indicator
65      */
66     private boolean isShutdown = false;
67
68     /**
69      * executor service for listener usage
70      */
71     private ExecutorService executorService = Executors.newSingleThreadExecutor();
72
73     private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
74
75
76     AsyncProtocolImpl() {
77
78         messageService = new UEBMessagingService();
79         messageReader = new APPCMessageReaderWriter();
80         messageWriter = (MessageWriter) messageReader;
81     }
82
83     public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
84
85         if (callback == null) {
86             throw new ProtocolException("Callback param should not be null!");
87         }
88         this.callback = callback;
89
90         try {
91             messageService.init(props);
92             //get message bus listener thread
93             //start the thread after initializing services
94             listenerHandler = executorService.submit(new Listener());
95         } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
96             throw new ProtocolException(e);
97         }
98     }
99
100     public void sendRequest(String payload, MessageContext context) throws ProtocolException {
101
102         //get message to be sent to appc from payload and context
103         String message = messageWriter.write(payload, context);
104         try {
105             messageService.send(context.getPartiton(), message);
106             LOG.debug("Successfully send message: " + message);
107         } catch (IOException e) {
108             throw new ProtocolException(e);
109         }
110     }
111
112     @Override
113     public void shutdown() {
114         isShutdown = true;
115         messageService.close();
116         LOG.warn("The protocol layer in shutdown stage.");
117         executorService.shutdownNow();
118     }
119
120     public class Listener implements Runnable {
121
122
123         public void run() {
124
125             while (!isShutdown) {
126                 List<String> messages = new ArrayList<>();
127                 try {
128                     messages = messageService.fetch();
129                     LOG.debug("Successfully fetched " + messages.size() + " messages");
130                 } catch (IOException e) {
131                     LOG.error("Fetching " + messages.size() + " messages failed");
132                 }
133                 for (String message : messages) {
134
135                     MessageContext context = new MessageContext();
136                     String payload = null;
137
138                     try {
139                         //get payload and context from message to be sent to core layer
140                         payload = messageReader.read(message, context);
141                         LOG.debug("Got body: " + payload);
142                         //call core layer response handler
143                         if(!isShutdown) {
144                             callback.onResponse(payload, context);
145                         }else{
146                             LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
147                                     context.getCorrelationID() + "> response ", message);
148                         }
149                     } catch (ProtocolException e) {
150                         LOG.error("Failed to read message from UEB. message is: " + message);
151                     }
152                 }
153             }
154         }
155     }
156
157 }