Support kafka attributes
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / DmaapService.java
index de87b00..5c2c8a3 100644 (file)
@@ -43,6 +43,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
 import org.onap.dmaap.dbcapi.model.Dmaap;
 import org.onap.dmaap.dbcapi.model.MR_Client;
+import org.onap.dmaap.dbcapi.model.ReplicationType;
 import org.onap.dmaap.dbcapi.model.Topic;
 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
@@ -52,6 +53,7 @@ public class DmaapService  extends BaseLoggingClass  {
 
        
        private Singleton<Dmaap> dmaapholder = DatabaseClass.getDmaap();
+       private static String noEnvironmentPrefix;
        
        
        String topicFactory; // = "org.openecomp.dcae.dmaap.topicFactory";
@@ -66,6 +68,15 @@ public class DmaapService  extends BaseLoggingClass  {
                topicMgrRole = p.getProperty("MR.TopicMgrRole", "MR.TopicMgrRole.not.set" );
                dcaeTopicNs = dmaapholder.get().getTopicNsRoot();
                multiSite = "true".equalsIgnoreCase(p.getProperty("MR.multisite", "true"));
+               noEnvironmentPrefix = p.getProperty( "AAF.NoEnvironmentPrefix", "org.onap");
+               
+               logger.info( "DmaapService settings: " + 
+                               " topicFactory=" + topicFactory +
+                               " topicMgrRole=" + topicMgrRole +
+                               " dcaeTopicNs=" + dcaeTopicNs +
+                               " multisite=" + multiSite +
+                               " noEnvironmentPrefix=" + noEnvironmentPrefix
+                               );
                
        }
        
@@ -126,6 +137,7 @@ public class DmaapService  extends BaseLoggingClass  {
                if ( ! dmaap.isStatusValid()  || ! nd.getDmaapName().equals(dmaap.getDmaapName()) || dmaap.getVersion().equals( "0") ) {
                        nd.setLastMod();
                        dmaapholder.update(nd);  //need to set this so the following perms will pick up any new vals.
+                       dcaeTopicNs = dmaapholder.get().getTopicNsRoot();
                        ApiPolicy apiPolicy = new ApiPolicy();
                        if ( apiPolicy.getUseAuthClass()) {
                                ApiPerms p = new ApiPerms();
@@ -155,7 +167,15 @@ public class DmaapService  extends BaseLoggingClass  {
        }
        public String getTopicPerm( String val ) {
                Dmaap dmaap = dmaapholder.get();
-               return dmaap.getTopicNsRoot() + "." + val + ".mr.topic";
+               String nsRoot = dmaap.getTopicNsRoot();
+               String t;
+               // in ONAP Casablanca, we assume no distinction of environments reflected in topic namespace
+               if ( nsRoot.startsWith(noEnvironmentPrefix) ) {
+                       t = nsRoot +  ".mr.topic";
+               } else {
+                       t = nsRoot + "." + val + ".mr.topic";
+               }
+               return t;
        }
        
        public String getBridgeAdminFqtn(){
@@ -261,18 +281,20 @@ public class DmaapService  extends BaseLoggingClass  {
                clients.add( nClient );
        
                // initialize Topic
-               Topic mmaTopic = new Topic();
+               Topic mmaTopic = new Topic().init();
                mmaTopic.setTopicName(dmaap.getBridgeAdminTopic());
                mmaTopic.setClients(clients);
                mmaTopic.setOwner("BusController");
                mmaTopic.setTopicDescription("topic reserved for MirrorMaker Administration");
                mmaTopic.setTnxEnabled("false");
+               mmaTopic.setPartitionCount("1");  // a single partition should guarantee message order
+               
                
                ApiError err = new ApiError();
                TopicService svc = new TopicService();
                try {
                        @SuppressWarnings("unused")
-                       Topic nTopic = svc.addTopic(mmaTopic, err);
+                       Topic nTopic = svc.addTopic(mmaTopic, err, true);
                        if ( err.is2xx() || err.getCode() == 409 ) {
                                return false;
                        }