X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FTopicService.java;h=68dfd5166b9ec1b4708ebb6668581d5c984ed6ff;hb=479c7a5645b6f3f9bf478f925fa2009597871a7b;hp=cfec54e4e410678d1cb5931b5687cb6be5d87063;hpb=536c6aabdfd2bcdc493501a9498fb8a97d208c0b;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index cfec54e..68dfd51 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -3,6 +3,8 @@ * org.onap.dmaap * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * + * Modifications Copyright (C) 2019 IBM. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,7 +42,6 @@ import org.onap.dmaap.dbcapi.logging.BaseLoggingClass; import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum; import org.onap.dmaap.dbcapi.model.ApiError; import org.onap.dmaap.dbcapi.model.DcaeLocation; -import org.onap.dmaap.dbcapi.model.Dmaap; import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status; import org.onap.dmaap.dbcapi.model.MR_Client; import org.onap.dmaap.dbcapi.model.MR_Cluster; @@ -67,16 +68,25 @@ public class TopicService extends BaseLoggingClass { private MirrorMakerService bridge = new MirrorMakerService(); private static String centralCname; + private static boolean createTopicRoles; + private boolean strictGraph = true; + private boolean mmPerMR; public TopicService(){ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); centralCname = p.getProperty("MR.CentralCname"); - - + createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); + String unit_test = p.getProperty( "UnitTest", "No" ); + if ( unit_test.equals( "Yes" ) ) { + strictGraph = false; + } + mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true")); logger.info( "TopicService properties: CentralCname=" + centralCname + - " defaultGlobarlMrHost=" + defaultGlobalMrHost ); + " defaultGlobarlMrHost=" + defaultGlobalMrHost + + " createTopicRoles=" + createTopicRoles + + " mmPerMR=" + mmPerMR ); } public Map getTopics() { @@ -91,7 +101,7 @@ public class TopicService extends BaseLoggingClass { } private List getAllTopics( Boolean withClients ) { - ArrayList topics = new ArrayList(mr_topics.values()); + ArrayList topics = new ArrayList<>(mr_topics.values()); if ( withClients ) { for( Topic topic: topics ) { topic.setClients( clientService.getAllMrClients(topic.getFqtn())); @@ -116,9 +126,9 @@ public class TopicService extends BaseLoggingClass { } private void aafTopicSetup(Topic topic, ApiError err ) { - - String t = dmaapSvc.getTopicPerm(); - if ( t == null ) { + + String nsr = dmaapSvc.getDmaap().getTopicNsRoot(); + if ( nsr == null ) { err.setCode(500); err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)" ); err.setFields("topicNsRoot"); @@ -128,76 +138,85 @@ public class TopicService extends BaseLoggingClass { // establish AAF Connection using TopicMgr identity AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - + AafRole pubRole = null; + AafRole subRole = null; - // create AAF namespace for this topic - AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity()); - { - int rc = aaf.addNamespace( ns ); + // creating Topic Roles was not an original feature. + // For backwards compatibility, only do this if the feature is enabled. + // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns) + // then we likely don't have permission to create sub-ns and Roles so don't try. + if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) { + // create AAF namespace for this topic + AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity()); + { + int rc = aaf.addNamespace( ns ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity()); + return; + } + } + + // create AAF Roles for MR clients of this topic + String rn = "publisher"; + pubRole = new AafRole( topic.getFqtn(), rn ); + int rc = aaf.addRole( pubRole ); if ( rc != 201 && rc != 409 ) { err.setCode(500); err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity()); + err.setFields("topic:" + topic.getFqtn() + " role="+ rn); return; } + topic.setPublisherRole( pubRole.getFullyQualifiedRole() ); + + rn = "subscriber"; + subRole = new AafRole( topic.getFqtn(), rn ); + rc = aaf.addRole( subRole ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("topic:" + topic.getFqtn() + " role="+ rn); + return; + } + topic.setSubscriberRole( subRole.getFullyQualifiedRole() ); } - - // create AAF Roles for MR clients of this topic - String rn = "publisher"; - AafRole pubRole = new AafRole( topic.getFqtn(), rn ); - int rc = aaf.addRole( pubRole ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("topic:" + topic.getFqtn() + " role="+ rn); - return; - } - topic.setPublisherRole( pubRole.getFullyQualifiedRole() ); - - rn = "subscriber"; - AafRole subRole = new AafRole( topic.getFqtn(), rn ); - rc = aaf.addRole( subRole ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("topic:" + topic.getFqtn() + " role="+ rn); - return; - } - topic.setSubscriberRole( subRole.getFullyQualifiedRole() ); - // create AAF perms checked by MR String instance = ":topic." + topic.getFqtn(); String[] actions = { "pub", "sub", "view" }; + String t = dmaapSvc.getTopicPerm(); for ( String action : actions ){ DmaapPerm perm = new DmaapPerm( t, instance, action ); - rc = aaf.addPerm( perm ); + int rc = aaf.addPerm( perm ); if ( rc != 201 && rc != 409 ) { err.setCode(500); err.setMessage("Unexpected response from AAF:" + rc ); err.setFields("t="+t + " instance="+ instance + " action="+ action); return; } - // Grant perms to our default Roles - if ( action.equals( "pub") || action.equals( "view") ) { - DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() ); - rc = aaf.addGrant( g ); - if ( rc != 201 && rc != 409 ) { - err.setCode(rc); - err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() ); - logger.warn( err.getMessage()); - return; - } - } - if ( action.equals( "sub") || action.equals( "view") ) { - DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() ); - rc = aaf.addGrant( g ); - if ( rc != 201 && rc != 409 ) { - err.setCode(rc); - err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() ); - logger.warn( err.getMessage()); - return; - } + if ( createTopicRoles ) { + // Grant perms to our default Roles + if ( action.equals( "pub") || action.equals( "view") ) { + DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() ); + rc = aaf.addGrant( g ); + if ( rc != 201 && rc != 409 ) { + err.setCode(rc); + err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() ); + logger.warn( err.getMessage()); + return; + } + } + if ( action.equals( "sub") || action.equals( "view") ) { + DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() ); + rc = aaf.addGrant( g ); + if ( rc != 201 && rc != 409 ) { + err.setCode(rc); + err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() ); + logger.warn( err.getMessage()); + return; + } + } } } @@ -285,10 +304,13 @@ public class TopicService extends BaseLoggingClass { public Topic updateTopic( Topic topic, ApiError err ) { - logger.info( "Entry: updateTopic"); + logger.info( "updateTopic: entry"); + logger.info( "updateTopic: topic=" + topic); + logger.info( "updateTopic: fqtn=" + topic.getFqtn() ); if ( topic.getFqtn().isEmpty()) { return null; } + logger.info( "updateTopic: call checkForBridge"); Topic ntopic = checkForBridge( topic, err ); if ( ntopic == null ) { topic.setStatus( DmaapObject_Status.INVALID); @@ -297,6 +319,7 @@ public class TopicService extends BaseLoggingClass { } } if(ntopic != null) { + logger.info( "updateTopic: call put"); mr_topics.put( ntopic.getFqtn(), ntopic ); } err.setCode(Status.OK.getStatusCode()); @@ -360,7 +383,8 @@ public class TopicService extends BaseLoggingClass { public Topic checkForBridge( Topic topic, ApiError err ) { - + logger.info( "checkForBridge: entry"); + logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase()); if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) { topic.setStatus( DmaapObject_Status.VALID); return topic; @@ -370,6 +394,7 @@ public class TopicService extends BaseLoggingClass { Set groups = clusters.getGroups(); for ( String g : groups ) { + logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g); anythingWrong |= buildBridge( topic, err, g ); } if ( anythingWrong ) { @@ -384,19 +409,24 @@ public class TopicService extends BaseLoggingClass { } private boolean buildBridge( Topic topic, ApiError err, String group ) { - + logger.info( "buildBridge: entry"); boolean anythingWrong = false; Graph graph; + logger.info( "buildBridge: strictGraph=" + strictGraph ); if ( group == null || group.isEmpty() ) { - graph = new Graph( topic.getClients(), true ); + graph = new Graph( topic.getClients(), strictGraph ); } else { - graph = new Graph( topic.getClients(), true, group ); + graph = new Graph( topic.getClients(), strictGraph, group ); } + logger.info( "buildBridge: graph=" + graph ); MR_Cluster groupCentralCluster = null; + if ( graph.isEmpty() ) { + logger.info( "buildBridge: graph is empty. return false" ); return false; } else if ( group == null && topic.getReplicationCase().involvesFQDN() ) { + logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" ); return false; } else if ( ! graph.hasCentral() ) { logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients"); @@ -409,7 +439,8 @@ public class TopicService extends BaseLoggingClass { logger.info( "loc=" + loc ); DcaeLocation location = locations.getDcaeLocation(loc); MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc); - logger.info( "cluster=" + cluster ); + logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() ); + logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() ); @@ -424,11 +455,12 @@ public class TopicService extends BaseLoggingClass { case REPLICATION_EDGE_TO_CENTRAL: case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL: // NOTE: this is for E2C portion only source = cluster.getFqdn(); - target = centralCname; + target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname; + logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target ); break; case REPLICATION_CENTRAL_TO_EDGE: case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE: // NOTE: this is for C2E portion only - source = centralCname; + source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname; target = cluster.getFqdn(); break; case REPLICATION_CENTRAL_TO_GLOBAL: @@ -513,7 +545,7 @@ public class TopicService extends BaseLoggingClass { if ( source != null && target != null ) { try { logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn()); + MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn()); mm.addTopic(topic.getFqtn()); bridge.updateMirrorMaker(mm); } catch ( Exception ex ) {