X-Git-Url: https://gerrit.onap.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Forg%2Fonap%2Fdmaap%2Fdbcapi%2Fservice%2FTopicService.java;h=eeffa5b69d3eddd2066fd57767a1c2b3e80bf570;hb=b578ddffd8ce8a7eceebd7b4df6ab4a6a9f803ec;hp=26def911eb7d660b83fec2b10c15d3bd2328f7c5;hpb=3d4f33e1581d63eede5ff3ebdd53f921f61cd5b6;p=dmaap%2Fdbcapi.git diff --git a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java index 26def91..eeffa5b 100644 --- a/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java +++ b/src/main/java/org/onap/dmaap/dbcapi/service/TopicService.java @@ -3,6 +3,8 @@ * org.onap.dmaap * ================================================================================ * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved. + * + * Modifications Copyright (C) 2019 IBM. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,7 +31,10 @@ import java.util.Set; import javax.ws.rs.core.Response.Status; +import org.onap.dmaap.dbcapi.aaf.AafNamespace; +import org.onap.dmaap.dbcapi.aaf.AafRole; import org.onap.dmaap.dbcapi.aaf.AafService; +import org.onap.dmaap.dbcapi.aaf.DmaapGrant; import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType; import org.onap.dmaap.dbcapi.aaf.DmaapPerm; import org.onap.dmaap.dbcapi.database.DatabaseClass; @@ -58,19 +63,25 @@ public class TopicService extends BaseLoggingClass { private Map mr_topics = DatabaseClass.getTopics(); private static DmaapService dmaapSvc = new DmaapService(); - private static Dmaap dmaap = new DmaapService().getDmaap(); private MR_ClientService clientService = new MR_ClientService(); private MR_ClusterService clusters = new MR_ClusterService(); private DcaeLocationService locations = new DcaeLocationService(); private MirrorMakerService bridge = new MirrorMakerService(); private static String centralCname; + private static boolean createTopicRoles; + public TopicService(){ DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig(); defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set"); centralCname = p.getProperty("MR.CentralCname"); - logger.info( "TopicService properties: CentralCname=" + centralCname + " defaultGlobarlMrHost=" + defaultGlobalMrHost ); + createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true")); + + + logger.info( "TopicService properties: CentralCname=" + centralCname + + " defaultGlobarlMrHost=" + defaultGlobalMrHost + + " createTopicRoles=" + createTopicRoles ); } public Map getTopics() { @@ -78,9 +89,18 @@ public class TopicService extends BaseLoggingClass { } public List getAllTopics() { - ArrayList topics = new ArrayList(mr_topics.values()); - for( Topic topic: topics ) { - topic.setClients( clientService.getAllMrClients(topic.getFqtn())); + return getAllTopics( true ); + } + public List getAllTopicsWithoutClients() { + return getAllTopics(false); + } + + private List getAllTopics( Boolean withClients ) { + ArrayList topics = new ArrayList<>(mr_topics.values()); + if ( withClients ) { + for( Topic topic: topics ) { + topic.setClients( clientService.getAllMrClients(topic.getFqtn())); + } } return topics; } @@ -99,15 +119,117 @@ public class TopicService extends BaseLoggingClass { apiError.setCode(Status.OK.getStatusCode()); return t; } + + private void aafTopicSetup(Topic topic, ApiError err ) { + + String nsr = dmaapSvc.getDmaap().getTopicNsRoot(); + if ( nsr == null ) { + err.setCode(500); + err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)" ); + err.setFields("topicNsRoot"); + return; + } + + // establish AAF Connection using TopicMgr identity + AafService aaf = new AafService(ServiceType.AAF_TopicMgr); + + AafRole pubRole = null; + AafRole subRole = null; + + // creating Topic Roles was not an original feature. + // For backwards compatibility, only do this if the feature is enabled. + // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns) + // then we likely don't have permission to create sub-ns and Roles so don't try. + if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) { + // create AAF namespace for this topic + AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity()); + { + int rc = aaf.addNamespace( ns ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity()); + return; + } + } + + // create AAF Roles for MR clients of this topic + String rn = "publisher"; + pubRole = new AafRole( topic.getFqtn(), rn ); + int rc = aaf.addRole( pubRole ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("topic:" + topic.getFqtn() + " role="+ rn); + return; + } + topic.setPublisherRole( pubRole.getFullyQualifiedRole() ); + + rn = "subscriber"; + subRole = new AafRole( topic.getFqtn(), rn ); + rc = aaf.addRole( subRole ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("topic:" + topic.getFqtn() + " role="+ rn); + return; + } + topic.setSubscriberRole( subRole.getFullyQualifiedRole() ); + } + + // create AAF perms checked by MR + String instance = ":topic." + topic.getFqtn(); + String[] actions = { "pub", "sub", "view" }; + String t = dmaapSvc.getTopicPerm(); + for ( String action : actions ){ + DmaapPerm perm = new DmaapPerm( t, instance, action ); + int rc = aaf.addPerm( perm ); + if ( rc != 201 && rc != 409 ) { + err.setCode(500); + err.setMessage("Unexpected response from AAF:" + rc ); + err.setFields("t="+t + " instance="+ instance + " action="+ action); + return; + } + if ( createTopicRoles ) { + // Grant perms to our default Roles + if ( action.equals( "pub") || action.equals( "view") ) { + DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() ); + rc = aaf.addGrant( g ); + if ( rc != 201 && rc != 409 ) { + err.setCode(rc); + err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() ); + logger.warn( err.getMessage()); + return; + } + } + if ( action.equals( "sub") || action.equals( "view") ) { + DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() ); + rc = aaf.addGrant( g ); + if ( rc != 201 && rc != 409 ) { + err.setCode(rc); + err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() ); + logger.warn( err.getMessage()); + return; + } + } + } + + } + } - public Topic addTopic( Topic topic, ApiError err ) { + public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) { logger.info( "Entry: addTopic"); logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() ); String nFqtn = topic.genFqtn(); logger.info( "FQTN=" + nFqtn ); - if ( getTopic( nFqtn, err ) != null ) { + Topic pTopic = getTopic( nFqtn, err ); + if ( pTopic != null ) { String t = "topic already exists: " + nFqtn; logger.info( t ); + if ( useExisting ) { + err.setCode(Status.OK.getStatusCode()); + return pTopic; + } err.setMessage( t ); err.setFields( "fqtn"); err.setCode(Status.CONFLICT.getStatusCode()); @@ -116,22 +238,12 @@ public class TopicService extends BaseLoggingClass { err.reset(); // err filled with NOT_FOUND is expected case, but don't want to litter... topic.setFqtn( nFqtn ); + + aafTopicSetup( topic, err ); + if ( err.getCode() >= 400 ) { + return null; + } - AafService aaf = new AafService(ServiceType.AAF_TopicMgr); - String t = dmaap.getTopicNsRoot() + "." + dmaap.getDmaapName() + ".mr.topic"; - String instance = ":topic." + topic.getFqtn(); - - String[] actions = { "pub", "sub", "view" }; - for ( String action : actions ){ - DmaapPerm perm = new DmaapPerm( t, instance, action ); - int rc = aaf.addPerm( perm ); - if ( rc != 201 && rc != 409 ) { - err.setCode(500); - err.setMessage("Unexpected response from AAF:" + rc ); - err.setFields("t="+t + " instance="+ instance + " action="+ action); - return null; - } - } if ( topic.getReplicationCase().involvesGlobal() ) { if ( topic.getGlobalMrURL() == null ) { topic.setGlobalMrURL(defaultGlobalMrHost); @@ -159,6 +271,7 @@ public class TopicService extends BaseLoggingClass { logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL()); MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction()); nc.setFqtn(topic.getFqtn()); + nc.setClientIdentity( c.getClientIdentity()); logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL()); clients2.add( clientService.addMr_Client(nc, topic, err)); if ( ! err.is2xx()) { @@ -231,10 +344,11 @@ public class TopicService extends BaseLoggingClass { String mmAgentRole = p.getProperty("MM.AgentRole"); String[] Roles = { mmProvRole, mmAgentRole }; String[] actions = { "view", "pub", "sub" }; - Topic bridgeAdminTopic = new Topic(); + Topic bridgeAdminTopic = new Topic().init(); bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() ); bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning"); bridgeAdminTopic.setOwner( "DBC" ); + ArrayList clients = new ArrayList(); for( String role: Roles ) { MR_Client client = new MR_Client(); @@ -247,7 +361,7 @@ public class TopicService extends BaseLoggingClass { TopicService ts = new TopicService(); ApiError err = new ApiError(); - ts.addTopic(bridgeAdminTopic, err); + ts.addTopic(bridgeAdminTopic, err, true); if ( err.is2xx() || err.getCode() == 409 ){ err.setCode(200); @@ -413,10 +527,7 @@ public class TopicService extends BaseLoggingClass { if ( source != null && target != null ) { try { logger.info( "Create a MM from " + source + " to " + target ); - MirrorMaker mm = bridge.getMirrorMaker( source, target); - if ( mm == null ) { - mm = new MirrorMaker(source, target); - } + MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn()); mm.addTopic(topic.getFqtn()); bridge.updateMirrorMaker(mm); } catch ( Exception ex ) { @@ -434,6 +545,7 @@ public class TopicService extends BaseLoggingClass { } + /* * Prior to 1707, we only supported EDGE_TO_CENTRAL replication. * This was determined automatically based on presence of edge publishers and central subscribers.