Junits for TopicService
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  *
7  * Modifications Copyright (C) 2019 IBM.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.dmaap.dbcapi.service;
24
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31
32 import javax.ws.rs.core.Response.Status;
33
34 import org.onap.dmaap.dbcapi.aaf.AafNamespace;
35 import org.onap.dmaap.dbcapi.aaf.AafRole;
36 import org.onap.dmaap.dbcapi.aaf.AafService;
37 import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
38 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
39 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
40 import org.onap.dmaap.dbcapi.database.DatabaseClass;
41 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
42 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
43 import org.onap.dmaap.dbcapi.model.ApiError;
44 import org.onap.dmaap.dbcapi.model.DcaeLocation;
45 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
46 import org.onap.dmaap.dbcapi.model.MR_Client;
47 import org.onap.dmaap.dbcapi.model.MR_Cluster;
48 import org.onap.dmaap.dbcapi.model.MirrorMaker;
49 import org.onap.dmaap.dbcapi.model.ReplicationType;
50 import org.onap.dmaap.dbcapi.model.Topic;
51 import org.onap.dmaap.dbcapi.util.DmaapConfig;
52 import org.onap.dmaap.dbcapi.util.Fqdn;
53 import org.onap.dmaap.dbcapi.util.Graph;
54
55 public class TopicService extends BaseLoggingClass {
56
57         
58
59         // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
60         private static String defaultGlobalMrHost;
61         
62         private Map<String, Topic> mr_topics;
63         
64         private static DmaapService dmaapSvc = new DmaapService();
65         private MR_ClientService clientService;
66         private MR_ClusterService clusters;
67         private DcaeLocationService locations;
68         private MirrorMakerService      bridge;
69         
70         private static String centralCname;
71         private static boolean createTopicRoles;
72         private boolean strictGraph = true;
73         private boolean mmPerMR;
74
75
76         public TopicService(){
77                 this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig)DmaapConfig.getConfig(),
78                                 new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
79
80         }
81
82         TopicService(Map<String, Topic> mr_topics,  MR_ClientService clientService, DmaapConfig p,
83                                  MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService  bridge) {
84                 this.mr_topics = mr_topics;
85                 this.clientService = clientService;
86                 defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
87                 centralCname = p.getProperty("MR.CentralCname");
88                 createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
89                 String unit_test = p.getProperty( "UnitTest", "No" );
90                 if ( "Yes".equals(unit_test)) {
91                         strictGraph = false;
92                 }
93                 mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
94                 logger.info( "TopicService properties: CentralCname=" + centralCname +
95                                 "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
96                                 " createTopicRoles=" + createTopicRoles +
97                                 " mmPerMR=" + mmPerMR );
98                 this.clusters = clusters;
99                 this.locations = locations;
100                 this.bridge = bridge;
101         }
102
103         public Map<String, Topic> getTopics() {
104                 return mr_topics;
105         }
106                 
107         public List<Topic> getAllTopics() {
108                 return getAllTopics( true );
109         }
110         public List<Topic> getAllTopicsWithoutClients() {
111                 return getAllTopics(false);
112         }
113         
114         private List<Topic> getAllTopics( Boolean withClients ) {
115                 ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
116                 if ( withClients ) {
117                         for( Topic topic: topics ) {
118                                 topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
119                         }
120                 }
121                 return topics;
122         }
123         
124                 
125         public Topic getTopic( String key, ApiError apiError ) {        
126                 logger.info( "getTopic: key=" + key);
127                 Topic t = mr_topics.get( key );
128                 if ( t == null ) {
129                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
130                         apiError.setFields( "fqtn");
131                         apiError.setMessage("topic with fqtn " + key + " not found");
132                         return null;
133                 }
134                 t.setClients( clientService.getAllMrClients( key ));
135                 apiError.setCode(Status.OK.getStatusCode());
136                 return t;
137         }
138         
139         private void aafTopicSetup(Topic topic, ApiError err ) {
140                 
141                 String nsr = dmaapSvc.getDmaap().getTopicNsRoot();
142                 if ( nsr == null ) {
143                         err.setCode(500);
144                         err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)"  );
145                         err.setFields("topicNsRoot");
146                         return;
147                 }
148
149                 // establish AAF Connection using TopicMgr identity
150                 AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
151                 
152                 AafRole pubRole = null;
153                 AafRole subRole = null;
154                 
155                 // creating Topic Roles was not an original feature.
156                 // For backwards compatibility, only do this if the feature is enabled.
157                 // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns)
158                 // then we likely don't have permission to create sub-ns and Roles so don't try.
159                 if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) {
160                         // create AAF namespace for this topic
161                         AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity());
162                         {
163                                 int rc = aaf.addNamespace( ns );
164                                 if ( rc != 201 && rc != 409 ) {
165                                         err.setCode(500);
166                                         err.setMessage("Unexpected response from AAF:" + rc );
167                                         err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity());
168                                         return;
169                                 }
170                         }
171                         
172                         // create AAF Roles for MR clients of this topic
173                         String rn = "publisher";
174                         pubRole = new AafRole( topic.getFqtn(), rn );
175                         int rc = aaf.addRole( pubRole );
176                         if ( rc != 201 && rc != 409 ) {
177                                 err.setCode(500);
178                                 err.setMessage("Unexpected response from AAF:" + rc );
179                                 err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
180                                 return;
181                         }
182                         topic.setPublisherRole( pubRole.getFullyQualifiedRole() );
183                         
184                         rn = "subscriber";
185                         subRole = new AafRole( topic.getFqtn(), rn );
186                         rc = aaf.addRole( subRole );
187                         if ( rc != 201 && rc != 409 ) {
188                                 err.setCode(500);
189                                 err.setMessage("Unexpected response from AAF:" + rc );
190                                 err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
191                                 return;
192                         }
193                         topic.setSubscriberRole( subRole.getFullyQualifiedRole() );
194                 }
195         
196                 // create AAF perms checked by MR
197                 String instance = ":topic." + topic.getFqtn();
198                 String[] actions = { "pub", "sub", "view" };
199                 String t = dmaapSvc.getTopicPerm();
200                 for ( String action : actions ){
201                         DmaapPerm perm = new DmaapPerm( t, instance, action );
202                         int rc = aaf.addPerm( perm );
203                         if ( rc != 201 && rc != 409 ) {
204                                 err.setCode(500);
205                                 err.setMessage("Unexpected response from AAF:" + rc );
206                                 err.setFields("t="+t + " instance="+ instance + " action="+ action);
207                                 return;
208                         }
209                         if ( createTopicRoles ) {
210                                 // Grant perms to our default Roles
211                                 if ( action.equals( "pub") || action.equals( "view") ) {
212                                         DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() );
213                                         rc = aaf.addGrant( g );
214                                         if ( rc != 201 && rc != 409 ) {
215                                                 err.setCode(rc);
216                                                 err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() );
217                                                 logger.warn( err.getMessage());
218                                                 return;
219                                         } 
220                                 }
221                                 if ( action.equals( "sub") || action.equals( "view") ) {
222                                         DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() );
223                                         rc = aaf.addGrant( g );
224                                         if ( rc != 201 && rc != 409 ) {
225                                                 err.setCode(rc);
226                                                 err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() );
227                                                 logger.warn( err.getMessage());
228                                                 return;
229                                         } 
230                                 }
231                         }
232
233                 }
234         }
235
236         public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
237                 logger.info( "Entry: addTopic");
238                 logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
239                 String nFqtn =  topic.genFqtn();
240                 logger.info( "FQTN=" + nFqtn );
241                 Topic pTopic = getTopic( nFqtn, err );
242                 if ( pTopic != null ) {
243                         String t = "topic already exists: " + nFqtn;
244                         logger.info( t );
245                         if (  useExisting ) {
246                                 err.setCode(Status.OK.getStatusCode());
247                                 return pTopic;
248                         }
249                         err.setMessage( t );
250                         err.setFields( "fqtn");
251                         err.setCode(Status.CONFLICT.getStatusCode());
252                         return null;
253                 }
254                 err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
255
256                 topic.setFqtn( nFqtn );
257                 
258                 aafTopicSetup( topic, err );
259                 if ( err.getCode() >= 400 ) {
260                         return null;
261                 }       
262
263                 if ( topic.getReplicationCase().involvesGlobal() ) {
264                         if ( topic.getGlobalMrURL() == null ) {
265                                 topic.setGlobalMrURL(defaultGlobalMrHost);
266                         }
267                         if ( ! Fqdn.isValid( topic.getGlobalMrURL())) {
268                                 logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
269                                 topic.setStatus( DmaapObject_Status.INVALID);
270                                 err.setCode(500);
271                                 err.setMessage("Value is not a valid FQDN:" +  topic.getGlobalMrURL() );
272                                 err.setFields("globalMrURL");
273         
274                                 return null;
275                         }
276                 }
277
278
279                 if ( topic.getNumClients() > 0 ) {
280                         ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients());
281                 
282         
283                         ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>();
284                         for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
285                                 MR_Client c = it.next();
286
287                                 logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
288                                 MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
289                                 nc.setFqtn(topic.getFqtn());
290                                 nc.setClientIdentity( c.getClientIdentity());
291                                 logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
292                                 clients2.add( clientService.addMr_Client(nc, topic, err));
293                                 if ( ! err.is2xx()) {
294                                         return null;
295                                 }
296                         }
297
298                         topic.setClients(clients2);
299                 }
300
301                 Topic ntopic = checkForBridge( topic, err );
302                 if ( ntopic == null ) {
303                         topic.setStatus( DmaapObject_Status.INVALID);
304                         if ( ! err.is2xx()) {
305                                 return null;
306                         }
307                 }
308
309                 
310                 mr_topics.put( nFqtn, ntopic );
311
312                 err.setCode(Status.OK.getStatusCode());
313                 return ntopic;
314         }
315         
316                 
317         public Topic updateTopic( Topic topic, ApiError err ) {
318                 logger.info( "updateTopic: entry");
319                 logger.info( "updateTopic: topic=" + topic);
320                 logger.info( "updateTopic: fqtn=" + topic.getFqtn() );
321                 if ( topic.getFqtn().isEmpty()) {
322                         return null;
323                 }
324                 logger.info( "updateTopic: call checkForBridge");
325                 Topic ntopic = checkForBridge( topic, err );
326                 if ( ntopic == null ) {
327                         topic.setStatus( DmaapObject_Status.INVALID);
328                         if ( ! err.is2xx() ) {
329                                 return null;
330                         }
331                 }
332                 if(ntopic != null) {
333                         logger.info( "updateTopic: call put");
334                         mr_topics.put( ntopic.getFqtn(), ntopic );
335                 }
336                 err.setCode(Status.OK.getStatusCode());
337                 return ntopic;
338         }
339                 
340         public Topic removeTopic( String pubId, ApiError apiError ) {
341                 Topic topic = mr_topics.get(pubId);
342                 if ( topic == null ) {
343                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
344                         apiError.setMessage("Topic " + pubId + " does not exist");
345                         apiError.setFields("fqtn");
346                         return null;
347                 }
348                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients( pubId ));
349                 for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
350                         MR_Client c = it.next();
351                         
352         
353                         clientService.removeMr_Client(c.getMrClientId(), false, apiError);
354                         if ( ! apiError.is2xx()) {
355                                 return null;
356                         }
357                 }
358                 apiError.setCode(Status.OK.getStatusCode());
359                 return mr_topics.remove(pubId);
360         }       
361         public static ApiError setBridgeClientPerms( MR_Cluster node ) {
362                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
363                 String mmProvRole = p.getProperty("MM.ProvRole");
364                 String mmAgentRole = p.getProperty("MM.AgentRole");
365                 String[] Roles = { mmProvRole, mmAgentRole };
366                 String[] actions = { "view", "pub", "sub" };
367                 Topic bridgeAdminTopic = new Topic().init();
368                 bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
369                 bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
370                 bridgeAdminTopic.setOwner( "DBC" );
371                 
372                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
373                 for( String role: Roles ) {
374                         MR_Client client = new MR_Client();
375                         client.setAction(actions);
376                         client.setClientRole(role);
377                         client.setDcaeLocationName( node.getDcaeLocationName());
378                         clients.add( client );
379                 }
380                 bridgeAdminTopic.setClients(clients);
381                 
382                 TopicService ts = new TopicService();
383                 ApiError err = new ApiError();
384                 ts.addTopic(bridgeAdminTopic, err, true);
385                 
386                 if ( err.is2xx() || err.getCode() == 409 ){
387                         err.setCode(200);
388                         return err;
389                 }
390                 
391                 errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR,  bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
392                 return err;
393         }       
394         
395         
396         public Topic checkForBridge( Topic topic, ApiError err ) {
397                 logger.info( "checkForBridge: entry");
398                 logger.info( "fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
399                 if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
400                         topic.setStatus( DmaapObject_Status.VALID);
401                         return topic;   
402                 }
403                 
404                 boolean anythingWrong = false;
405                 
406                 Set<String> groups = clusters.getGroups();
407                 for ( String g : groups ) {
408                         logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g);
409                         anythingWrong |= buildBridge( topic, err, g );
410                 }
411                 if ( anythingWrong ) {
412                         topic.setStatus( DmaapObject_Status.INVALID);
413                         if ( ! err.is2xx() ) {
414                                 return null;
415                         }       
416                 } else {
417                         topic.setStatus( DmaapObject_Status.VALID);
418                 }
419                 return topic;
420         }
421                 
422         private boolean buildBridge( Topic topic, ApiError err, String group ) {
423                 logger.info( "buildBridge: entry");
424                 boolean anythingWrong = false;
425                 Graph graph;
426                 logger.info( "buildBridge: strictGraph=" + strictGraph );
427                 if ( group == null || group.isEmpty() ) {
428                         graph = new Graph( topic.getClients(), strictGraph );
429                 } else {
430                         graph = new Graph( topic.getClients(), strictGraph, group );
431                 }
432                 logger.info( "buildBridge: graph=" + graph );
433                 MR_Cluster groupCentralCluster = null;
434                 
435                 
436                 if ( graph.isEmpty() ) {
437                         logger.info( "buildBridge: graph is empty.  return false" );
438                         return false;
439                 } else if ( group == null &&  topic.getReplicationCase().involvesFQDN() ) {
440                         logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" );
441                         return false;
442                 } else if ( ! graph.hasCentral() ) {
443                         logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
444                         return true;
445                 } else {
446                         groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
447                 }
448                 Collection<String> clientLocations = graph.getKeys();
449                 for( String loc : clientLocations ) {
450                         logger.info( "loc=" + loc );
451                         DcaeLocation location = locations.getDcaeLocation(loc);
452                         MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
453                         logger.info( "cluster=" + cluster + " at "+ cluster.getDcaeLocationName() );
454                         logger.info( "location.isCentral()="+location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc() );
455
456                         
457                                 
458                         String source = null;
459                         String target = null;
460                         
461                         /*
462                          * Provision Edge to Central bridges...
463                          */
464                         if ( ! location.isCentral()  && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
465                                 switch( topic.getReplicationCase() ) {
466                                 case REPLICATION_EDGE_TO_CENTRAL:
467                                 case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
468                                         source = cluster.getFqdn();
469                                         target = (mmPerMR)? groupCentralCluster.getFqdn() : centralCname;
470                                         logger.info( "REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" +target );
471                                         break;
472                                 case REPLICATION_CENTRAL_TO_EDGE:
473                                 case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
474                                         source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
475                                         target = cluster.getFqdn();
476                                         break;
477                                 case REPLICATION_CENTRAL_TO_GLOBAL:
478                                 case REPLICATION_GLOBAL_TO_CENTRAL:
479                                 case REPLICATION_FQDN_TO_GLOBAL:
480                                 case REPLICATION_GLOBAL_TO_FQDN:
481                                         break;
482
483                                 case REPLICATION_EDGE_TO_FQDN:
484                                 case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
485                                         source = cluster.getFqdn();
486                                         target = groupCentralCluster.getFqdn();
487                                         break;
488                                 case REPLICATION_FQDN_TO_EDGE:
489                                 case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
490                                         source = groupCentralCluster.getFqdn();
491                                         target = cluster.getFqdn();
492                                         break;
493
494                                 default:
495                                         logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
496                                         anythingWrong = true;
497                                         err.setCode(400);
498                                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
499                                                         + topic.getReplicationCase() );
500                                         err.setMessage("Unexpected value for ReplicationType");
501                                         continue;
502                                 }
503
504                         } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
505                                 /*
506                                  * Provision Central to Global bridges
507                                  */
508                                 switch( topic.getReplicationCase() ) {
509
510                                 case REPLICATION_CENTRAL_TO_GLOBAL:
511                                 case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
512                                         source = centralCname;
513                                         target = topic.getGlobalMrURL();
514                                         break;
515                                 case REPLICATION_GLOBAL_TO_CENTRAL:
516                                 case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
517                                         source = topic.getGlobalMrURL();
518                                         target = centralCname;
519                                         break;
520
521                                 case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
522                                         source = groupCentralCluster.getFqdn();
523                                         target = topic.getGlobalMrURL();
524                                         break;
525
526                                 case REPLICATION_FQDN_TO_GLOBAL:
527                                         source = groupCentralCluster.getFqdn();
528                                         target = topic.getGlobalMrURL();
529                                         break;
530                                         
531                                 case REPLICATION_GLOBAL_TO_FQDN:
532                                 case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
533                                         source = topic.getGlobalMrURL();
534                                         target = groupCentralCluster.getFqdn();
535                                         break;
536
537                                 case REPLICATION_FQDN_TO_EDGE:
538                                 case REPLICATION_EDGE_TO_FQDN:
539                                 case REPLICATION_EDGE_TO_CENTRAL:
540                                 case REPLICATION_CENTRAL_TO_EDGE:
541                                         break;
542                                 default:
543                                         logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
544                                         anythingWrong = true;
545                                         err.setCode(400);
546                                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
547                                                         + topic.getReplicationCase() );
548                                         err.setMessage("Unexpected value for ReplicationType");
549                                         continue;
550                                 }                               
551                         } else {
552                                 logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
553                                 anythingWrong = true;
554                                 continue;
555                         }
556                         if ( source != null && target != null ) {
557                                 try { 
558                                         logger.info( "Create a MM from " + source + " to " + target );
559                                         MirrorMaker mm = bridge.findNextMM( source, target, topic.getFqtn());
560                                         mm.addTopic(topic.getFqtn());
561                                         bridge.updateMirrorMaker(mm);
562                                 } catch ( Exception ex ) {
563                                         err.setCode(500);
564                                         err.setFields( "mirror_maker.topic");
565                                         err.setMessage("Unexpected condition: " + ex );
566                                         anythingWrong = true;
567                                         break;
568                                 }
569                         }                       
570
571                         
572                 }
573                 return  anythingWrong;
574
575         }
576         
577         
578         /*
579          * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
580          * This was determined automatically based on presence of edge publishers and central subscribers.
581          * The following method is a modification of that original logic, to preserve some backwards compatibility, 
582          * i.e. to be used when no ReplicationType is specified.
583          */
584         public ReplicationType reviewTopic( Topic topic ) {
585         
586                 
587                 if ( topic.getNumClients() > 1 ) {
588                         Graph graph = new Graph( topic.getClients(), false );
589                         
590                         String centralFqdn = new String();
591                         if ( graph.hasCentral() ) {
592                                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
593                                 centralFqdn = p.getProperty("MR.CentralCname");
594                         }
595
596                         Collection<String> locations = graph.getKeys();
597                         for( String loc : locations ) {
598                                 logger.info( "loc=" + loc );
599                                 MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
600                                 if ( cluster == null ) {
601                                         logger.info( "No MR cluster for location " + loc );
602                                         continue;
603                                 }
604                                 if ( graph.hasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
605                                         logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
606                                         return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
607                                         
608                                 }
609                                 
610                         }
611                 }
612         
613                 return ReplicationType.REPLICATION_NONE;
614         }
615
616 }