Patch set 2: changes to MR_Client
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
index 26def91..cfec54e 100644 (file)
@@ -29,7 +29,10 @@ import java.util.Set;
 
 import javax.ws.rs.core.Response.Status;
 
+import org.onap.dmaap.dbcapi.aaf.AafNamespace;
+import org.onap.dmaap.dbcapi.aaf.AafRole;
 import org.onap.dmaap.dbcapi.aaf.AafService;
+import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
 import org.onap.dmaap.dbcapi.database.DatabaseClass;
@@ -58,7 +61,6 @@ public class TopicService extends BaseLoggingClass {
        private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
        
        private static DmaapService dmaapSvc = new DmaapService();
-       private static Dmaap dmaap = new DmaapService().getDmaap();
        private MR_ClientService clientService = new MR_ClientService();
        private MR_ClusterService clusters = new MR_ClusterService();
        private DcaeLocationService locations = new DcaeLocationService();
@@ -66,11 +68,15 @@ public class TopicService extends BaseLoggingClass {
        
        private static String centralCname;
 
+
        public TopicService(){
                DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
                defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
                centralCname = p.getProperty("MR.CentralCname");
-               logger.info( "TopicService properties: CentralCname=" + centralCname + "   defaultGlobarlMrHost=" + defaultGlobalMrHost );
+
+               
+               logger.info( "TopicService properties: CentralCname=" + centralCname + 
+                               "   defaultGlobarlMrHost=" + defaultGlobalMrHost  );
        }
        
        public Map<String, Topic> getTopics() {                 
@@ -78,9 +84,18 @@ public class TopicService extends BaseLoggingClass {
        }
                
        public List<Topic> getAllTopics() {
+               return getAllTopics( true );
+       }
+       public List<Topic> getAllTopicsWithoutClients() {
+               return getAllTopics(false);
+       }
+       
+       private List<Topic> getAllTopics( Boolean withClients ) {
                ArrayList<Topic> topics = new ArrayList<Topic>(mr_topics.values());
-               for( Topic topic: topics ) {
-                       topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+               if ( withClients ) {
+                       for( Topic topic: topics ) {
+                               topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
+                       }
                }
                return topics;
        }
@@ -99,15 +114,108 @@ public class TopicService extends BaseLoggingClass {
                apiError.setCode(Status.OK.getStatusCode());
                return t;
        }
+       
+       private void aafTopicSetup(Topic topic, ApiError err ) {
 
-       public Topic addTopic( Topic topic, ApiError err ) {
+               String t = dmaapSvc.getTopicPerm();
+               if ( t == null ) {
+                       err.setCode(500);
+                       err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)"  );
+                       err.setFields("topicNsRoot");
+                       return;
+               }
+
+               // establish AAF Connection using TopicMgr identity
+               AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
+               
+
+               
+               // create AAF namespace for this topic
+               AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity());
+               {
+                       int rc = aaf.addNamespace( ns );
+                       if ( rc != 201 && rc != 409 ) {
+                               err.setCode(500);
+                               err.setMessage("Unexpected response from AAF:" + rc );
+                               err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity());
+                               return;
+                       }
+               }
+               
+               // create AAF Roles for MR clients of this topic
+               String rn = "publisher";
+               AafRole pubRole = new AafRole( topic.getFqtn(), rn );
+               int rc = aaf.addRole( pubRole );
+               if ( rc != 201 && rc != 409 ) {
+                       err.setCode(500);
+                       err.setMessage("Unexpected response from AAF:" + rc );
+                       err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
+                       return;
+               }
+               topic.setPublisherRole( pubRole.getFullyQualifiedRole() );
+               
+               rn = "subscriber";
+               AafRole subRole = new AafRole( topic.getFqtn(), rn );
+               rc = aaf.addRole( subRole );
+               if ( rc != 201 && rc != 409 ) {
+                       err.setCode(500);
+                       err.setMessage("Unexpected response from AAF:" + rc );
+                       err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
+                       return;
+               }
+               topic.setSubscriberRole( subRole.getFullyQualifiedRole() );
+       
+               
+               // create AAF perms checked by MR
+               String instance = ":topic." + topic.getFqtn();
+               String[] actions = { "pub", "sub", "view" };
+               for ( String action : actions ){
+                       DmaapPerm perm = new DmaapPerm( t, instance, action );
+                       rc = aaf.addPerm( perm );
+                       if ( rc != 201 && rc != 409 ) {
+                               err.setCode(500);
+                               err.setMessage("Unexpected response from AAF:" + rc );
+                               err.setFields("t="+t + " instance="+ instance + " action="+ action);
+                               return;
+                       }
+                       // Grant perms to our default Roles
+                       if ( action.equals( "pub") || action.equals( "view") ) {
+                               DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() );
+                               rc = aaf.addGrant( g );
+                               if ( rc != 201 && rc != 409 ) {
+                                       err.setCode(rc);
+                                       err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() );
+                                       logger.warn( err.getMessage());
+                                       return;
+                               } 
+                       }
+                       if ( action.equals( "sub") || action.equals( "view") ) {
+                               DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() );
+                               rc = aaf.addGrant( g );
+                               if ( rc != 201 && rc != 409 ) {
+                                       err.setCode(rc);
+                                       err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() );
+                                       logger.warn( err.getMessage());
+                                       return;
+                               } 
+                       }
+
+               }
+       }
+
+       public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
                logger.info( "Entry: addTopic");
                logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
                String nFqtn =  topic.genFqtn();
                logger.info( "FQTN=" + nFqtn );
-               if ( getTopic( nFqtn, err ) != null ) {
+               Topic pTopic = getTopic( nFqtn, err );
+               if ( pTopic != null ) {
                        String t = "topic already exists: " + nFqtn;
                        logger.info( t );
+                       if (  useExisting ) {
+                               err.setCode(Status.OK.getStatusCode());
+                               return pTopic;
+                       }
                        err.setMessage( t );
                        err.setFields( "fqtn");
                        err.setCode(Status.CONFLICT.getStatusCode());
@@ -116,22 +224,12 @@ public class TopicService extends BaseLoggingClass {
                err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
 
                topic.setFqtn( nFqtn );
+               
+               aafTopicSetup( topic, err );
+               if ( err.getCode() >= 400 ) {
+                       return null;
+               }       
 
-               AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
-               String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic";
-               String instance = ":topic." + topic.getFqtn();
-
-               String[] actions = { "pub", "sub", "view" };
-               for ( String action : actions ){
-                       DmaapPerm perm = new DmaapPerm( t, instance, action );
-                       int rc = aaf.addPerm( perm );
-                       if ( rc != 201 && rc != 409 ) {
-                               err.setCode(500);
-                               err.setMessage("Unexpected response from AAF:" + rc );
-                               err.setFields("t="+t + " instance="+ instance + " action="+ action);
-                               return null;
-                       }
-               }
                if ( topic.getReplicationCase().involvesGlobal() ) {
                        if ( topic.getGlobalMrURL() == null ) {
                                topic.setGlobalMrURL(defaultGlobalMrHost);
@@ -159,6 +257,7 @@ public class TopicService extends BaseLoggingClass {
                                logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
                                MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
                                nc.setFqtn(topic.getFqtn());
+                               nc.setClientIdentity( c.getClientIdentity());
                                logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
                                clients2.add( clientService.addMr_Client(nc, topic, err));
                                if ( ! err.is2xx()) {
@@ -231,10 +330,11 @@ public class TopicService extends BaseLoggingClass {
                String mmAgentRole = p.getProperty("MM.AgentRole");
                String[] Roles = { mmProvRole, mmAgentRole };
                String[] actions = { "view", "pub", "sub" };
-               Topic bridgeAdminTopic = new Topic();
+               Topic bridgeAdminTopic = new Topic().init();
                bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
                bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
                bridgeAdminTopic.setOwner( "DBC" );
+               
                ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
                for( String role: Roles ) {
                        MR_Client client = new MR_Client();
@@ -247,7 +347,7 @@ public class TopicService extends BaseLoggingClass {
                
                TopicService ts = new TopicService();
                ApiError err = new ApiError();
-               ts.addTopic(bridgeAdminTopic, err);
+               ts.addTopic(bridgeAdminTopic, err, true);
                
                if ( err.is2xx() || err.getCode() == 409 ){
                        err.setCode(200);
@@ -413,10 +513,7 @@ public class TopicService extends BaseLoggingClass {
                        if ( source != null && target != null ) {
                                try { 
                                        logger.info( "Create a MM from " + source + " to " + target );
-                                       MirrorMaker mm = bridge.getMirrorMaker( source, target);
-                                       if ( mm == null ) {
-                                               mm = new MirrorMaker(source, target);
-                                       }
+                                       MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
                                        mm.addTopic(topic.getFqtn());
                                        bridge.updateMirrorMaker(mm);
                                } catch ( Exception ex ) {
@@ -434,6 +531,7 @@ public class TopicService extends BaseLoggingClass {
 
        }
        
+       
        /*
         * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
         * This was determined automatically based on presence of edge publishers and central subscribers.