package org.openecomp.appc.messageadapter.impl;
-import java.util.HashSet;
-import java.util.Properties;
-import org.apache.commons.lang.ObjectUtils;
-import org.openecomp.appc.adapter.dmaap.Producer;
-import org.openecomp.appc.adapter.dmaap.DmaapProducer;
+import org.openecomp.appc.adapter.factory.DmaapMessageAdapterFactoryImpl;
+import org.openecomp.appc.adapter.factory.MessageService;
+import org.openecomp.appc.adapter.message.MessageAdapterFactory;
+import org.openecomp.appc.adapter.message.Producer;
import org.openecomp.appc.configuration.Configuration;
import org.openecomp.appc.configuration.ConfigurationFactory;
+import com.att.eelf.configuration.EELFLogger;
+import com.att.eelf.configuration.EELFManager;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang.ObjectUtils;
import org.openecomp.appc.domainmodel.lcm.ResponseContext;
import org.openecomp.appc.domainmodel.lcm.VNFOperation;
import org.openecomp.appc.messageadapter.MessageAdapter;
import org.openecomp.appc.requesthandler.conv.Converter;
-import com.att.eelf.configuration.EELFLogger;
-import com.att.eelf.configuration.EELFManager;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.FrameworkUtil;
+import org.osgi.framework.ServiceReference;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashSet;
+import java.util.Properties;
-public class MessageAdapterDmaapImpl implements MessageAdapter{
+public class MessageAdapterImpl implements MessageAdapter{
- private Producer dmaapProducer;
+ private MessageService messageService;
+ private Producer producer;
private String partition ;
private Configuration configuration;
private HashSet<String> pool;
private String apiKey;
private String apiSecret;
- private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapterDmaapImpl.class);
+ private static final EELFLogger logger = EELFManager.getInstance().getLogger(MessageAdapterImpl.class);
/**
- * Initialize dmaapProducer client to post messages using configuration properties
+ * Initialize producer client to post messages using configuration properties
*/
@Override
public void init(){
- this.dmaapProducer = getDmaapProducer();
+ this.producer = getProducer();
}
- private Producer getDmaapProducer() {
+
+ private Producer getProducer() {
configuration = ConfigurationFactory.getConfiguration();
Properties properties=configuration.getProperties();
updateProperties(properties);
- Producer producer=new DmaapProducer(pool,writeTopic);
- producer.updateCredentials(apiKey, apiSecret);
+
+ BundleContext ctx = FrameworkUtil.getBundle(MessageAdapterImpl.class).getBundleContext();
+ if (ctx != null) {
+ ServiceReference svcRef = ctx.getServiceReference(MessageAdapterFactory.class.getName());
+ if (svcRef != null) {
+ producer = ((MessageAdapterFactory) ctx.getService(svcRef)).createProducer(pool, writeTopic,apiKey, apiSecret);
+ }
+ }
return producer;
}
}
pool = new HashSet<>();
if (props != null) {
- writeTopic = props.getProperty("dmaap.topic.write");
- apiKey = props.getProperty("dmaap.client.key");
- apiSecret = props.getProperty("dmaap.client.secret");
- String hostnames = props.getProperty("dmaap.poolMembers");
+ // readTopic = props.getProperty("dmaap.topic.read");
+ writeTopic = props.getProperty("appc.LCM.topic.write");
+ apiKey = props.getProperty("appc.LCM.client.key");
+ apiSecret = props.getProperty("appc.LCM.client.secret");
+ messageService = MessageService.parse(props.getProperty("message.service.type"));
+ // READ_TIMEOUT = Integer.valueOf(props.getProperty("dmaap.topic.read.timeout", String.valueOf(READ_TIMEOUT)));
+ String hostnames = props.getProperty("appc.LCM.poolMembers");
if (hostnames != null && !hostnames.isEmpty()) {
for (String name : hostnames.split(",")) {
pool.add(name);
if (logger.isDebugEnabled()) {
logger.debug("DMaaP Response = " + jsonMessage);
}
- success = dmaapProducer.post(this.partition, jsonMessage);
+ success = producer.post(this.partition, jsonMessage);
} catch (JsonProcessingException e1) {
- logger.error("Error generating Jason from DMaaP message "+ e1.getMessage());
+ logger.error("Error generating Json from DMaaP message "+ e1.getMessage());
success= false;
}catch (Exception e){
logger.error("Error sending message to DMaaP "+e.getMessage());