Updates for more code coverage
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  *
7  * Modifications Copyright (C) 2019 IBM.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  * 
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  * 
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.dmaap.dbcapi.service;
24
25 import java.util.ArrayList;
26 import java.util.Collection;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Set;
31
32 import javax.ws.rs.core.Response.Status;
33
34 import org.onap.dmaap.dbcapi.aaf.AafNamespace;
35 import org.onap.dmaap.dbcapi.aaf.AafRole;
36 import org.onap.dmaap.dbcapi.aaf.AafService;
37 import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
38 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
39 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
40 import org.onap.dmaap.dbcapi.database.DatabaseClass;
41 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
42 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
43 import org.onap.dmaap.dbcapi.model.ApiError;
44 import org.onap.dmaap.dbcapi.model.DcaeLocation;
45 import org.onap.dmaap.dbcapi.model.Dmaap;
46 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
47 import org.onap.dmaap.dbcapi.model.MR_Client;
48 import org.onap.dmaap.dbcapi.model.MR_Cluster;
49 import org.onap.dmaap.dbcapi.model.MirrorMaker;
50 import org.onap.dmaap.dbcapi.model.ReplicationType;
51 import org.onap.dmaap.dbcapi.model.Topic;
52 import org.onap.dmaap.dbcapi.util.DmaapConfig;
53 import org.onap.dmaap.dbcapi.util.Fqdn;
54 import org.onap.dmaap.dbcapi.util.Graph;
55
56 public class TopicService extends BaseLoggingClass {
57
58         
59
60         // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
61         private static String defaultGlobalMrHost;
62         
63         private Map<String, Topic> mr_topics = DatabaseClass.getTopics();
64         
65         private static DmaapService dmaapSvc = new DmaapService();
66         private MR_ClientService clientService = new MR_ClientService();
67         private MR_ClusterService clusters = new MR_ClusterService();
68         private DcaeLocationService locations = new DcaeLocationService();
69         private MirrorMakerService      bridge = new MirrorMakerService();
70         
71         private static String centralCname;
72         private static boolean createTopicRoles;
73         private boolean strictGraph = true;
74
75
76         public TopicService(){
77                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
78                 defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
79                 centralCname = p.getProperty("MR.CentralCname");
80                 createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
81                 String unit_test = p.getProperty( "UnitTest", "No" );
82                 if ( unit_test.equals( "Yes" ) ) {
83                         strictGraph = false;
84                 }
85                 logger.info( "TopicService properties: CentralCname=" + centralCname + 
86                                 "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
87                                 " createTopicRoles=" + createTopicRoles );
88         }
89         
90         public Map<String, Topic> getTopics() {                 
91                 return mr_topics;
92         }
93                 
94         public List<Topic> getAllTopics() {
95                 return getAllTopics( true );
96         }
97         public List<Topic> getAllTopicsWithoutClients() {
98                 return getAllTopics(false);
99         }
100         
101         private List<Topic> getAllTopics( Boolean withClients ) {
102                 ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
103                 if ( withClients ) {
104                         for( Topic topic: topics ) {
105                                 topic.setClients( clientService.getAllMrClients(topic.getFqtn()));
106                         }
107                 }
108                 return topics;
109         }
110         
111                 
112         public Topic getTopic( String key, ApiError apiError ) {        
113                 logger.info( "getTopic: key=" + key);
114                 Topic t = mr_topics.get( key );
115                 if ( t == null ) {
116                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
117                         apiError.setFields( "fqtn");
118                         apiError.setMessage("topic with fqtn " + key + " not found");
119                         return null;
120                 }
121                 t.setClients( clientService.getAllMrClients( key ));
122                 apiError.setCode(Status.OK.getStatusCode());
123                 return t;
124         }
125         
126         private void aafTopicSetup(Topic topic, ApiError err ) {
127                 
128                 String nsr = dmaapSvc.getDmaap().getTopicNsRoot();
129                 if ( nsr == null ) {
130                         err.setCode(500);
131                         err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)"  );
132                         err.setFields("topicNsRoot");
133                         return;
134                 }
135
136                 // establish AAF Connection using TopicMgr identity
137                 AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
138                 
139                 AafRole pubRole = null;
140                 AafRole subRole = null;
141                 
142                 // creating Topic Roles was not an original feature.
143                 // For backwards compatibility, only do this if the feature is enabled.
144                 // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns)
145                 // then we likely don't have permission to create sub-ns and Roles so don't try.
146                 if ( createTopicRoles && topic.getFqtn().startsWith(nsr)) {
147                         // create AAF namespace for this topic
148                         AafNamespace ns = new AafNamespace( topic.getFqtn(), aaf.getIdentity());
149                         {
150                                 int rc = aaf.addNamespace( ns );
151                                 if ( rc != 201 && rc != 409 ) {
152                                         err.setCode(500);
153                                         err.setMessage("Unexpected response from AAF:" + rc );
154                                         err.setFields("namespace:" + topic.getFqtn() + " identity="+ aaf.getIdentity());
155                                         return;
156                                 }
157                         }
158                         
159                         // create AAF Roles for MR clients of this topic
160                         String rn = "publisher";
161                         pubRole = new AafRole( topic.getFqtn(), rn );
162                         int rc = aaf.addRole( pubRole );
163                         if ( rc != 201 && rc != 409 ) {
164                                 err.setCode(500);
165                                 err.setMessage("Unexpected response from AAF:" + rc );
166                                 err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
167                                 return;
168                         }
169                         topic.setPublisherRole( pubRole.getFullyQualifiedRole() );
170                         
171                         rn = "subscriber";
172                         subRole = new AafRole( topic.getFqtn(), rn );
173                         rc = aaf.addRole( subRole );
174                         if ( rc != 201 && rc != 409 ) {
175                                 err.setCode(500);
176                                 err.setMessage("Unexpected response from AAF:" + rc );
177                                 err.setFields("topic:" + topic.getFqtn() + " role="+ rn);
178                                 return;
179                         }
180                         topic.setSubscriberRole( subRole.getFullyQualifiedRole() );
181                 }
182         
183                 // create AAF perms checked by MR
184                 String instance = ":topic." + topic.getFqtn();
185                 String[] actions = { "pub", "sub", "view" };
186                 String t = dmaapSvc.getTopicPerm();
187                 for ( String action : actions ){
188                         DmaapPerm perm = new DmaapPerm( t, instance, action );
189                         int rc = aaf.addPerm( perm );
190                         if ( rc != 201 && rc != 409 ) {
191                                 err.setCode(500);
192                                 err.setMessage("Unexpected response from AAF:" + rc );
193                                 err.setFields("t="+t + " instance="+ instance + " action="+ action);
194                                 return;
195                         }
196                         if ( createTopicRoles ) {
197                                 // Grant perms to our default Roles
198                                 if ( action.equals( "pub") || action.equals( "view") ) {
199                                         DmaapGrant g = new DmaapGrant( perm, pubRole.getFullyQualifiedRole() );
200                                         rc = aaf.addGrant( g );
201                                         if ( rc != 201 && rc != 409 ) {
202                                                 err.setCode(rc);
203                                                 err.setMessage( "Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole() );
204                                                 logger.warn( err.getMessage());
205                                                 return;
206                                         } 
207                                 }
208                                 if ( action.equals( "sub") || action.equals( "view") ) {
209                                         DmaapGrant g = new DmaapGrant( perm, subRole.getFullyQualifiedRole() );
210                                         rc = aaf.addGrant( g );
211                                         if ( rc != 201 && rc != 409 ) {
212                                                 err.setCode(rc);
213                                                 err.setMessage( "Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole() );
214                                                 logger.warn( err.getMessage());
215                                                 return;
216                                         } 
217                                 }
218                         }
219
220                 }
221         }
222
223         public Topic addTopic( Topic topic, ApiError err, Boolean useExisting ) {
224                 logger.info( "Entry: addTopic");
225                 logger.info( "Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle() );
226                 String nFqtn =  topic.genFqtn();
227                 logger.info( "FQTN=" + nFqtn );
228                 Topic pTopic = getTopic( nFqtn, err );
229                 if ( pTopic != null ) {
230                         String t = "topic already exists: " + nFqtn;
231                         logger.info( t );
232                         if (  useExisting ) {
233                                 err.setCode(Status.OK.getStatusCode());
234                                 return pTopic;
235                         }
236                         err.setMessage( t );
237                         err.setFields( "fqtn");
238                         err.setCode(Status.CONFLICT.getStatusCode());
239                         return null;
240                 }
241                 err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
242
243                 topic.setFqtn( nFqtn );
244                 
245                 aafTopicSetup( topic, err );
246                 if ( err.getCode() >= 400 ) {
247                         return null;
248                 }       
249
250                 if ( topic.getReplicationCase().involvesGlobal() ) {
251                         if ( topic.getGlobalMrURL() == null ) {
252                                 topic.setGlobalMrURL(defaultGlobalMrHost);
253                         }
254                         if ( ! Fqdn.isValid( topic.getGlobalMrURL())) {
255                                 logger.error( "GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
256                                 topic.setStatus( DmaapObject_Status.INVALID);
257                                 err.setCode(500);
258                                 err.setMessage("Value is not a valid FQDN:" +  topic.getGlobalMrURL() );
259                                 err.setFields("globalMrURL");
260         
261                                 return null;
262                         }
263                 }
264
265
266                 if ( topic.getNumClients() > 0 ) {
267                         ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients());
268                 
269         
270                         ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>();
271                         for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
272                                 MR_Client c = it.next();
273
274                                 logger.info( "c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
275                                 MR_Client nc = new MR_Client( c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
276                                 nc.setFqtn(topic.getFqtn());
277                                 nc.setClientIdentity( c.getClientIdentity());
278                                 logger.info( "nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
279                                 clients2.add( clientService.addMr_Client(nc, topic, err));
280                                 if ( ! err.is2xx()) {
281                                         return null;
282                                 }
283                         }
284
285                         topic.setClients(clients2);
286                 }
287
288                 Topic ntopic = checkForBridge( topic, err );
289                 if ( ntopic == null ) {
290                         topic.setStatus( DmaapObject_Status.INVALID);
291                         if ( ! err.is2xx()) {
292                                 return null;
293                         }
294                 }
295
296                 
297                 mr_topics.put( nFqtn, ntopic );
298
299                 err.setCode(Status.OK.getStatusCode());
300                 return ntopic;
301         }
302         
303                 
304         public Topic updateTopic( Topic topic, ApiError err ) {
305                 logger.info( "updateTopic: entry");
306                 logger.info( "updateTopic: topic=" + topic);
307                 logger.info( "updateTopic: fqtn=" + topic.getFqtn() );
308                 if ( topic.getFqtn().isEmpty()) {
309                         return null;
310                 }
311                 logger.info( "updateTopic: call checkForBridge");
312                 Topic ntopic = checkForBridge( topic, err );
313                 if ( ntopic == null ) {
314                         topic.setStatus( DmaapObject_Status.INVALID);
315                         if ( ! err.is2xx() ) {
316                                 return null;
317                         }
318                 }
319                 if(ntopic != null) {
320                         logger.info( "updateTopic: call put");
321                         mr_topics.put( ntopic.getFqtn(), ntopic );
322                 }
323                 err.setCode(Status.OK.getStatusCode());
324                 return ntopic;
325         }
326                 
327         public Topic removeTopic( String pubId, ApiError apiError ) {
328                 Topic topic = mr_topics.get(pubId);
329                 if ( topic == null ) {
330                         apiError.setCode(Status.NOT_FOUND.getStatusCode());
331                         apiError.setMessage("Topic " + pubId + " does not exist");
332                         apiError.setFields("fqtn");
333                         return null;
334                 }
335                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients( pubId ));
336                 for ( Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
337                         MR_Client c = it.next();
338                         
339         
340                         clientService.removeMr_Client(c.getMrClientId(), false, apiError);
341                         if ( ! apiError.is2xx()) {
342                                 return null;
343                         }
344                 }
345                 apiError.setCode(Status.OK.getStatusCode());
346                 return mr_topics.remove(pubId);
347         }       
348         public static ApiError setBridgeClientPerms( MR_Cluster node ) {
349                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
350                 String mmProvRole = p.getProperty("MM.ProvRole");
351                 String mmAgentRole = p.getProperty("MM.AgentRole");
352                 String[] Roles = { mmProvRole, mmAgentRole };
353                 String[] actions = { "view", "pub", "sub" };
354                 Topic bridgeAdminTopic = new Topic().init();
355                 bridgeAdminTopic.setTopicName( dmaapSvc.getBridgeAdminFqtn() );
356                 bridgeAdminTopic.setTopicDescription( "RESERVED topic for MirroMaker Provisioning");
357                 bridgeAdminTopic.setOwner( "DBC" );
358                 
359                 ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
360                 for( String role: Roles ) {
361                         MR_Client client = new MR_Client();
362                         client.setAction(actions);
363                         client.setClientRole(role);
364                         client.setDcaeLocationName( node.getDcaeLocationName());
365                         clients.add( client );
366                 }
367                 bridgeAdminTopic.setClients(clients);
368                 
369                 TopicService ts = new TopicService();
370                 ApiError err = new ApiError();
371                 ts.addTopic(bridgeAdminTopic, err, true);
372                 
373                 if ( err.is2xx() || err.getCode() == 409 ){
374                         err.setCode(200);
375                         return err;
376                 }
377                 
378                 errorLogger.error( DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR,  bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
379                 return err;
380         }       
381         
382         
383         public Topic checkForBridge( Topic topic, ApiError err ) {
384                 logger.info( "checkForBridge: entry");
385                 logger.info( "fqtn=" + topic.getFqtn() + "replicatonType=" + topic.getReplicationCase());
386                 if ( topic.getReplicationCase() == ReplicationType.REPLICATION_NONE ) {
387                         topic.setStatus( DmaapObject_Status.VALID);
388                         return topic;   
389                 }
390                 
391                 boolean anythingWrong = false;
392                 
393                 Set<String> groups = clusters.getGroups();
394                 for ( String g : groups ) {
395                         logger.info( "buildBridge for " + topic.getFqtn() + " on group" + g);
396                         anythingWrong |= buildBridge( topic, err, g );
397                 }
398                 if ( anythingWrong ) {
399                         topic.setStatus( DmaapObject_Status.INVALID);
400                         if ( ! err.is2xx() ) {
401                                 return null;
402                         }       
403                 } else {
404                         topic.setStatus( DmaapObject_Status.VALID);
405                 }
406                 return topic;
407         }
408                 
409         private boolean buildBridge( Topic topic, ApiError err, String group ) {
410                 logger.info( "buildBridge: entry");
411                 boolean anythingWrong = false;
412                 Graph graph;
413                 logger.info( "buildBridge: strictGraph=" + strictGraph );
414                 if ( group == null || group.isEmpty() ) {
415                         graph = new Graph( topic.getClients(), strictGraph );
416                 } else {
417                         graph = new Graph( topic.getClients(), strictGraph, group );
418                 }
419                 logger.info( "buildBridge: graph=" + graph );
420                 MR_Cluster groupCentralCluster = null;
421                 
422                 
423                 if ( graph.isEmpty() ) {
424                         logger.info( "buildBridge: graph is empty.  return false" );
425                         return false;
426                 } else if ( group == null &&  topic.getReplicationCase().involvesFQDN() ) {
427                         logger.info( "buildBridge: group is null and replicationCaseInvolvesFQDN. return false" );
428                         return false;
429                 } else if ( ! graph.hasCentral() ) {
430                         logger.warn( "Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
431                         return true;
432                 } else {
433                         groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
434                 }
435                 Collection<String> clientLocations = graph.getKeys();
436                 for( String loc : clientLocations ) {
437                         logger.info( "loc=" + loc );
438                         DcaeLocation location = locations.getDcaeLocation(loc);
439                         MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
440                         logger.info( "cluster=" + cluster );
441
442                         
443                                 
444                         String source = null;
445                         String target = null;
446                         
447                         /*
448                          * Provision Edge to Central bridges...
449                          */
450                         if ( ! location.isCentral()  && ! graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
451                                 switch( topic.getReplicationCase() ) {
452                                 case REPLICATION_EDGE_TO_CENTRAL:
453                                 case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
454                                         source = cluster.getFqdn();
455                                         target = centralCname;
456                                         break;
457                                 case REPLICATION_CENTRAL_TO_EDGE:
458                                 case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
459                                         source = centralCname;
460                                         target = cluster.getFqdn();
461                                         break;
462                                 case REPLICATION_CENTRAL_TO_GLOBAL:
463                                 case REPLICATION_GLOBAL_TO_CENTRAL:
464                                 case REPLICATION_FQDN_TO_GLOBAL:
465                                 case REPLICATION_GLOBAL_TO_FQDN:
466                                         break;
467
468                                 case REPLICATION_EDGE_TO_FQDN:
469                                 case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
470                                         source = cluster.getFqdn();
471                                         target = groupCentralCluster.getFqdn();
472                                         break;
473                                 case REPLICATION_FQDN_TO_EDGE:
474                                 case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
475                                         source = groupCentralCluster.getFqdn();
476                                         target = cluster.getFqdn();
477                                         break;
478
479                                 default:
480                                         logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
481                                         anythingWrong = true;
482                                         err.setCode(400);
483                                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
484                                                         + topic.getReplicationCase() );
485                                         err.setMessage("Unexpected value for ReplicationType");
486                                         continue;
487                                 }
488
489                         } else if ( location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName()) ) {
490                                 /*
491                                  * Provision Central to Global bridges
492                                  */
493                                 switch( topic.getReplicationCase() ) {
494
495                                 case REPLICATION_CENTRAL_TO_GLOBAL:
496                                 case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
497                                         source = centralCname;
498                                         target = topic.getGlobalMrURL();
499                                         break;
500                                 case REPLICATION_GLOBAL_TO_CENTRAL:
501                                 case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
502                                         source = topic.getGlobalMrURL();
503                                         target = centralCname;
504                                         break;
505
506                                 case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
507                                         source = groupCentralCluster.getFqdn();
508                                         target = topic.getGlobalMrURL();
509                                         break;
510
511                                 case REPLICATION_FQDN_TO_GLOBAL:
512                                         source = groupCentralCluster.getFqdn();
513                                         target = topic.getGlobalMrURL();
514                                         break;
515                                         
516                                 case REPLICATION_GLOBAL_TO_FQDN:
517                                 case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
518                                         source = topic.getGlobalMrURL();
519                                         target = groupCentralCluster.getFqdn();
520                                         break;
521
522                                 case REPLICATION_FQDN_TO_EDGE:
523                                 case REPLICATION_EDGE_TO_FQDN:
524                                 case REPLICATION_EDGE_TO_CENTRAL:
525                                 case REPLICATION_CENTRAL_TO_EDGE:
526                                         break;
527                                 default:
528                                         logger.error( "Unexpected value for ReplicationType ("+ topic.getReplicationCase() + ") for topic " + topic.getFqtn() );
529                                         anythingWrong = true;
530                                         err.setCode(400);
531                                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
532                                                         + topic.getReplicationCase() );
533                                         err.setMessage("Unexpected value for ReplicationType");
534                                         continue;
535                                 }                               
536                         } else {
537                                 logger.warn( "dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
538                                 anythingWrong = true;
539                                 continue;
540                         }
541                         if ( source != null && target != null ) {
542                                 try { 
543                                         logger.info( "Create a MM from " + source + " to " + target );
544                                         MirrorMaker mm = bridge.getNextMM( source, target, topic.getFqtn());
545                                         mm.addTopic(topic.getFqtn());
546                                         bridge.updateMirrorMaker(mm);
547                                 } catch ( Exception ex ) {
548                                         err.setCode(500);
549                                         err.setFields( "mirror_maker.topic");
550                                         err.setMessage("Unexpected condition: " + ex );
551                                         anythingWrong = true;
552                                         break;
553                                 }
554                         }                       
555
556                         
557                 }
558                 return  anythingWrong;
559
560         }
561         
562         
563         /*
564          * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
565          * This was determined automatically based on presence of edge publishers and central subscribers.
566          * The following method is a modification of that original logic, to preserve some backwards compatibility, 
567          * i.e. to be used when no ReplicationType is specified.
568          */
569         public ReplicationType reviewTopic( Topic topic ) {
570         
571                 
572                 if ( topic.getNumClients() > 1 ) {
573                         Graph graph = new Graph( topic.getClients(), false );
574                         
575                         String centralFqdn = new String();
576                         if ( graph.hasCentral() ) {
577                                 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
578                                 centralFqdn = p.getProperty("MR.CentralCname");
579                         }
580
581                         Collection<String> locations = graph.getKeys();
582                         for( String loc : locations ) {
583                                 logger.info( "loc=" + loc );
584                                 MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
585                                 if ( cluster == null ) {
586                                         logger.info( "No MR cluster for location " + loc );
587                                         continue;
588                                 }
589                                 if ( graph.hasCentral() &&  ! graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
590                                         logger.info( "Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn );
591                                         return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
592                                         
593                                 }
594                                 
595                         }
596                 }
597         
598                 return ReplicationType.REPLICATION_NONE;
599         }
600
601 }