X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=appc-dispatcher%2Fappc-request-handler%2Fappc-request-handler-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fappc%2Fmessageadapter%2Fimpl%2FMessageAdapterImpl.java;fp=appc-dispatcher%2Fappc-request-handler%2Fappc-request-handler-core%2Fsrc%2Fmain%2Fjava%2Forg%2Fonap%2Fappc%2Fmessageadapter%2Fimpl%2FMessageAdapterImpl.java;h=63a3c00a46df6a37b1f56098980ca50ca3b7b40e;hb=781b1a6df324419c846c84ea983c18fc8362bfd3;hp=0000000000000000000000000000000000000000;hpb=161df8a94bb3b0c34ed16fd4fdba078bd1eeef9a;p=appc.git diff --git a/appc-dispatcher/appc-request-handler/appc-request-handler-core/src/main/java/org/onap/appc/messageadapter/impl/MessageAdapterImpl.java b/appc-dispatcher/appc-request-handler/appc-request-handler-core/src/main/java/org/onap/appc/messageadapter/impl/MessageAdapterImpl.java new file mode 100644 index 000000000..63a3c00a4 --- /dev/null +++ b/appc-dispatcher/appc-request-handler/appc-request-handler-core/src/main/java/org/onap/appc/messageadapter/impl/MessageAdapterImpl.java @@ -0,0 +1,139 @@ +/*- + * ============LICENSE_START======================================================= + * ONAP : APPC + * ================================================================================ + * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * ================================================================================ + * Copyright (C) 2017 Amdocs + * ============================================================================= + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * ECOMP is a trademark and service mark of AT&T Intellectual Property. + * ============LICENSE_END========================================================= + */ + +package org.onap.appc.messageadapter.impl; + + +import org.onap.appc.adapter.factory.DmaapMessageAdapterFactoryImpl; +import org.onap.appc.adapter.factory.MessageService; +import org.onap.appc.adapter.message.MessageAdapterFactory; +import org.onap.appc.adapter.message.Producer; +import org.onap.appc.configuration.Configuration; +import org.onap.appc.configuration.ConfigurationFactory; +import com.att.eelf.configuration.EELFLogger; +import com.att.eelf.configuration.EELFManager; + +import com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.commons.lang.ObjectUtils; +import org.onap.appc.domainmodel.lcm.ResponseContext; +import org.onap.appc.domainmodel.lcm.VNFOperation; +import org.onap.appc.messageadapter.MessageAdapter; +import org.onap.appc.requesthandler.conv.Converter; +import org.osgi.framework.BundleContext; +import org.osgi.framework.FrameworkUtil; +import org.osgi.framework.ServiceReference; + +import java.util.HashSet; +import java.util.Properties; + +public class MessageAdapterImpl implements MessageAdapter{ + + private MessageService messageService; + private Producer producer; + private String partition ; + private Configuration configuration; + private HashSet pool; + private String writeTopic; + private String apiKey; + private String apiSecret; + + private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapterImpl.class); + + /** + * Initialize producer client to post messages using configuration properties + */ + @Override + public void init(){ + this.producer = getProducer(); + } + + private Producer getProducer() { + configuration = ConfigurationFactory.getConfiguration(); + Properties properties=configuration.getProperties(); + updateProperties(properties); + + BundleContext ctx = FrameworkUtil.getBundle(MessageAdapterImpl.class).getBundleContext(); + if (ctx != null) { + ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName()); + if (svcRef != null) { + producer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic,apiKey, apiSecret); + } + } + return producer; + } + + + private void updateProperties(Properties props) { + if (logger.isTraceEnabled()) { + logger.trace("Entering to updateProperties with Properties = "+ ObjectUtils.toString(props)); + } + pool = new HashSet<>(); + if (props != null) { + // readTopic = props.getProperty("dmaap.topic.read"); + writeTopic = props.getProperty("appc.LCM.topic.write"); + apiKey = props.getProperty("appc.LCM.client.key"); + apiSecret = props.getProperty("appc.LCM.client.secret"); + messageService = MessageService.parse(props.getProperty("message.service.type")); + // READ_TIMEOUT = Integer.valueOf(props.getProperty("dmaap.topic.read.timeout", String.valueOf(READ_TIMEOUT))); + String hostnames = props.getProperty("appc.LCM.poolMembers"); + if (hostnames != null && !hostnames.isEmpty()) { + for (String name : hostnames.split(",")) { + pool.add(name); + } + } + } + } + + /** + * Posts message to DMaaP. As DMaaP accepts only json messages this method first convert dmaapMessage to json format and post it to DMaaP. + * @param asyncResponse response data that based on it a message will be send to DMaaP (the format of the message that will be sent to DMaaP based on the action and its YANG domainmodel). + * @return True if message is postes successfully else False + */ + @Override + public boolean post(VNFOperation operation, String rpcName, ResponseContext asyncResponse){ + boolean success; + if (logger.isTraceEnabled()) { + logger.trace("Entering to post with AsyncResponse = " + ObjectUtils.toString(asyncResponse)); + } + + String jsonMessage; + try { + jsonMessage = Converter.convAsyncResponseToDmaapOutgoingMessageJsonString(operation, rpcName, asyncResponse); + if (logger.isDebugEnabled()) { + logger.debug("DMaaP Response = " + jsonMessage); + } + success = producer.post(this.partition, jsonMessage); + } catch (JsonProcessingException e1) { + logger.error("Error generating Json from DMaaP message "+ e1.getMessage()); + success= false; + }catch (Exception e){ + logger.error("Error sending message to DMaaP "+e.getMessage()); + success= false; + } + if (logger.isTraceEnabled()) { + logger.trace("Exiting from post with (success = "+ ObjectUtils.toString(success)+")"); + } + return success; + } +}