* ============LICENSE_START=======================================================
* ONAP : APPC
* ================================================================================
- * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
* ================================================================================
* Copyright (C) 2017 Amdocs
* =============================================================================
* See the License for the specific language governing permissions and
* limitations under the License.
*
- * ECOMP is a trademark and service mark of AT&T Intellectual Property.
* ============LICENSE_END=========================================================
*/
*/
private ExecutorService executorService = Executors.newSingleThreadExecutor();
- private final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+ private static final EELFLogger LOG = EELFManager.getInstance().getLogger(AsyncProtocolImpl.class);
+ private String controllerType = null;
AsyncProtocolImpl() {
messageWriter = (MessageWriter) messageReader;
}
+ @Override
public void init(Properties props, RetrieveMessageCallback callback) throws ProtocolException {
if (callback == null) {
throw new ProtocolException("Callback param should not be null!");
}
this.callback = callback;
-
- try {
+
+ controllerType = props.getProperty(UEBPropertiesKeys.CONTROLLER_TYPE);
+
+ try {
messageService.init(props);
//get message bus listener thread
//start the thread after initializing services
}
}
+ @Override
public void sendRequest(String payload, MessageContext context) throws ProtocolException {
-
+ if (controllerType != null && controllerType.length()!= 0 && (!controllerType.equals("APPC")))
+ {
+ context.setPartiton(controllerType);
+ }
+
//get message to be sent to appc from payload and context
String message = messageWriter.write(payload, context);
try {
public class Listener implements Runnable {
-
+ @Override
public void run() {
while (!isShutdown) {
messages = messageService.fetch();
LOG.debug("Successfully fetched " + messages.size() + " messages");
} catch (IOException e) {
- LOG.error("Fetching " + messages.size() + " messages failed");
+ LOG.error("Fetching " + messages.size() + " messages failed", e);
}
for (String message : messages) {
+ handleMessage(message);
+ }
+ }
+ }
- MessageContext context = new MessageContext();
- String payload = null;
-
- try {
- //get payload and context from message to be sent to core layer
- payload = messageReader.read(message, context);
- LOG.debug("Got body: " + payload);
- //call core layer response handler
- if(!isShutdown) {
- callback.onResponse(payload, context);
- }else{
- LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
- context.getCorrelationID() + "> response ", message);
- }
- } catch (ProtocolException e) {
- LOG.error("Failed to read message from UEB. message is: " + message);
- }
+ private void handleMessage(String message) {
+ MessageContext context = new MessageContext();
+ String payload;
+
+ try {
+ //get payload and context from message to be sent to core layer
+ payload = messageReader.read(message, context);
+ LOG.debug("Got body: " + payload);
+ //call core layer response handler
+ if (!isShutdown) {
+ callback.onResponse(payload, context);
+ } else {
+ LOG.warn("Shutdown in progress, response will not receive. Correlation ID <" +
+ context.getCorrelationID() + "> response ", message);
}
+ } catch (ProtocolException e) {
+ LOG.error("Failed to read message from UEB. message is: " + message, e);
}
}
}