Support kafka attributes
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / DmaapService.java
index 5aae1d4..5c2c8a3 100644 (file)
@@ -43,6 +43,7 @@ import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
 import org.onap.dmaap.dbcapi.model.ApiError;
 import org.onap.dmaap.dbcapi.model.Dmaap;
 import org.onap.dmaap.dbcapi.model.MR_Client;
+import org.onap.dmaap.dbcapi.model.ReplicationType;
 import org.onap.dmaap.dbcapi.model.Topic;
 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
 import org.onap.dmaap.dbcapi.util.DmaapConfig;
@@ -136,6 +137,7 @@ public class DmaapService  extends BaseLoggingClass  {
                if ( ! dmaap.isStatusValid()  || ! nd.getDmaapName().equals(dmaap.getDmaapName()) || dmaap.getVersion().equals( "0") ) {
                        nd.setLastMod();
                        dmaapholder.update(nd);  //need to set this so the following perms will pick up any new vals.
+                       dcaeTopicNs = dmaapholder.get().getTopicNsRoot();
                        ApiPolicy apiPolicy = new ApiPolicy();
                        if ( apiPolicy.getUseAuthClass()) {
                                ApiPerms p = new ApiPerms();
@@ -279,12 +281,14 @@ public class DmaapService  extends BaseLoggingClass  {
                clients.add( nClient );
        
                // initialize Topic
-               Topic mmaTopic = new Topic();
+               Topic mmaTopic = new Topic().init();
                mmaTopic.setTopicName(dmaap.getBridgeAdminTopic());
                mmaTopic.setClients(clients);
                mmaTopic.setOwner("BusController");
                mmaTopic.setTopicDescription("topic reserved for MirrorMaker Administration");
                mmaTopic.setTnxEnabled("false");
+               mmaTopic.setPartitionCount("1");  // a single partition should guarantee message order
+               
                
                ApiError err = new ApiError();
                TopicService svc = new TopicService();