Client to use controller type for DMaaP partition
[appc.git] / appc-client / client-lib / src / main / java / org / onap / appc / client / impl / protocol / AsyncProtocolImpl.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * ONAP : APPC
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * ================================================================================
7  * Copyright (C) 2017 Amdocs
8  * =============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * 
21  * ============LICENSE_END=========================================================
22  */
23
24 package org.onap.appc.client.impl.protocol;
25
26 import org.onap.appc.client.impl.core.MessageContext;
27 import com.att.eelf.configuration.EELFLogger;
28 import com.att.eelf.configuration.EELFManager;
29
30 import java.io.IOException;
31 import java.security.GeneralSecurityException;
32 import java.util.ArrayList;
33 import java.util.List;
34 import java.util.Properties;
35 import java.util.concurrent.ExecutorService;
36 import java.util.concurrent.Executors;
37 import java.util.concurrent.Future;
38
39 class AsyncProtocolImpl implements AsyncProtocol {
40
41     /**
42      * message bus listener thread handler
43      */
44     private Future listenerHandler;
45     /**
46      * called when messages are fetched - called for a single message
47      */
48     private RetrieveMessageCallback callback;
49     /**
50      * message bus client used to send/fetch
51      */
52     private MessagingService messageService;
53     /**
54      * Message reader used to extract body and context from reponse message
55      */
56     private MessageReader messageReader;
57     /**
58      * Message writer used to construct meesage from body and context
59      */
60     private MessageWriter messageWriter;
61
62     /**
63      * shutdown indicator
64      */
65     private boolean isShutdown = false;
66
67     /**
68      * executor service for listener usage
69      */
70     private ExecutorService executorService = Executors.newSingleThreadExecutor();
71
72     private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
73
74     private String controllerType = null;
75
76     AsyncProtocolImpl() {
77
78         messageService = new UEBMessagingService();
79         messageReader = new APPCMessageReaderWriter();
80         messageWriter = (MessageWriter) messageReader;
81     }
82
83     @Override
84     public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
85
86         if (callback == null) {
87             throw new ProtocolException("Callback param should not be null!");
88         }
89         this.callback = callback;
90         
91         controllerType = props.getProperty(UEBPropertiesKeys.CONTROLLER_TYPE);
92         
93         try {            
94             messageService.init(props);
95             //get message bus listener thread
96             //start the thread after initializing services
97             listenerHandler = executorService.submit(new Listener());
98         } catch (GeneralSecurityException | IllegalAccessException | NoSuchFieldException | IOException e) {
99             throw new ProtocolException(e);
100         }
101     }
102
103     @Override
104     public void sendRequest(String payload, MessageContext context) throws ProtocolException {
105         if (controllerType != null && controllerType.length()!= 0 && (!controllerType.equals("APPC")))
106         {
107             context.setPartiton(controllerType);
108         }
109         
110         //get message to be sent to appc from payload and context
111         String message = messageWriter.write(payload, context);
112         try {
113             messageService.send(context.getPartiton(), message);
114             LOG.debug("Successfully send message: " + message);
115         } catch (IOException e) {
116             throw new ProtocolException(e);
117         }
118     }
119
120     @Override
121     public void shutdown() {
122         isShutdown = true;
123         messageService.close();
124         LOG.warn("The protocol layer in shutdown stage.");
125         executorService.shutdownNow();
126     }
127
128     public class Listener implements Runnable {
129
130         @Override
131         public void run() {
132
133             while (!isShutdown) {
134                 List<String> messages = new ArrayList<>();
135                 try {
136                     messages = messageService.fetch();
137                     LOG.debug("Successfully fetched " + messages.size() + " messages");
138                 } catch (IOException e) {
139                     LOG.error("Fetching " + messages.size() + " messages failed", e);
140                 }
141                 for (String message : messages) {
142                     handleMessage(message);
143                 }
144             }
145         }
146
147         private void handleMessage(String message) {
148             MessageContext context = new MessageContext();
149             String payload;
150
151             try {
152                 //get payload and context from message to be sent to core layer
153                 payload = messageReader.read(message, context);
154                 LOG.debug("Got body: " + payload);
155                 //call core layer response handler
156                 if (!isShutdown) {
157                     callback.onResponse(payload, context);
158                 } else {
159                     LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
160                         context.getCorrelationID() + "> response ", message);
161                 }
162             } catch (ProtocolException e) {
163                 LOG.error("Failed to read message from UEB. message is: " + message, e);
164             }
165         }
166     }
167
168 }