83591ddafe88aeb0e8a311233891276a89ef436c
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  *
7  * Modifications Copyright (C) 2019 IBM.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.dmaap.dbcapi.service;
24
25 import org.onap.dmaap.dbcapi.aaf.AafNamespace;
26 import org.onap.dmaap.dbcapi.aaf.AafRole;
27 import org.onap.dmaap.dbcapi.aaf.AafService;
28 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
29 import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
30 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
31 import org.onap.dmaap.dbcapi.database.DatabaseClass;
32 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
33 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
34 import org.onap.dmaap.dbcapi.model.ApiError;
35 import org.onap.dmaap.dbcapi.model.DcaeLocation;
36 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
37 import org.onap.dmaap.dbcapi.model.MR_Client;
38 import org.onap.dmaap.dbcapi.model.MR_Cluster;
39 import org.onap.dmaap.dbcapi.model.MirrorMaker;
40 import org.onap.dmaap.dbcapi.model.ReplicationType;
41 import org.onap.dmaap.dbcapi.model.Topic;
42 import org.onap.dmaap.dbcapi.util.DmaapConfig;
43 import org.onap.dmaap.dbcapi.util.Fqdn;
44 import org.onap.dmaap.dbcapi.util.Graph;
45
46 import javax.ws.rs.core.Response.Status;
47 import java.util.ArrayList;
48 import java.util.Collection;
49 import java.util.Iterator;
50 import java.util.List;
51 import java.util.Map;
52 import java.util.Set;
53
54 public class TopicService extends BaseLoggingClass {
55
56
57     // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
58     private static String defaultGlobalMrHost;
59
60     private Map<String, Topic> mr_topics;
61
62     private static DmaapService dmaapSvc = new DmaapService();
63     private MR_ClientService clientService;
64     private MR_ClusterService clusters;
65     private DcaeLocationService locations;
66     private MirrorMakerService bridge;
67
68     private static String centralCname;
69     private static boolean createTopicRoles;
70     private boolean strictGraph = true;
71     private boolean mmPerMR;
72
73
74     public TopicService() {
75         this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig) DmaapConfig.getConfig(),
76                 new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
77
78     }
79
80     TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p,
81                  MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) {
82         this.mr_topics = mr_topics;
83         this.clientService = clientService;
84         defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
85         centralCname = p.getProperty("MR.CentralCname");
86         createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
87         String unit_test = p.getProperty("UnitTest", "No");
88         if ("Yes".equals(unit_test)) {
89             strictGraph = false;
90         }
91         mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
92         logger.info("TopicService properties: CentralCname=" + centralCname +
93                 "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
94                 " createTopicRoles=" + createTopicRoles +
95                 " mmPerMR=" + mmPerMR);
96         this.clusters = clusters;
97         this.locations = locations;
98         this.bridge = bridge;
99     }
100
101     public Map<String, Topic> getTopics() {
102         return mr_topics;
103     }
104
105     public List<Topic> getAllTopics() {
106         return getAllTopics(true);
107     }
108
109     public List<Topic> getAllTopicsWithoutClients() {
110         return getAllTopics(false);
111     }
112
113     private List<Topic> getAllTopics(Boolean withClients) {
114         ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
115         if (withClients) {
116             for (Topic topic : topics) {
117                 topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
118             }
119         }
120         return topics;
121     }
122
123
124     public Topic getTopic(String key, ApiError apiError) {
125         logger.info("getTopic: key=" + key);
126         Topic t = mr_topics.get(key);
127         if (t == null) {
128             apiError.setCode(Status.NOT_FOUND.getStatusCode());
129             apiError.setFields("fqtn");
130             apiError.setMessage("topic with fqtn " + key + " not found");
131             return null;
132         }
133         t.setClients(clientService.getAllMrClients(key));
134         apiError.setCode(Status.OK.getStatusCode());
135         return t;
136     }
137
138     private void aafTopicSetup(Topic topic, ApiError err) {
139
140         String nsr = dmaapSvc.getDmaap().getTopicNsRoot();
141         if (nsr == null) {
142             err.setCode(500);
143             err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)");
144             err.setFields("topicNsRoot");
145             return;
146         }
147
148         // establish AAF Connection using TopicMgr identity
149         AafService aaf = new AafService(ServiceType.AAF_TopicMgr);
150
151         AafRole pubRole = null;
152         AafRole subRole = null;
153
154         // creating Topic Roles was not an original feature.
155         // For backwards compatibility, only do this if the feature is enabled.
156         // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns)
157         // then we likely don't have permission to create sub-ns and Roles so don't try.
158         if (createTopicRoles && topic.getFqtn().startsWith(nsr)) {
159             // create AAF namespace for this topic
160             AafNamespace ns = new AafNamespace(topic.getFqtn(), aaf.getIdentity());
161             {
162                 int rc = aaf.addNamespace(ns);
163                 if (rc != 201 && rc != 409) {
164                     err.setCode(500);
165                     err.setMessage("Unexpected response from AAF:" + rc);
166                     err.setFields("namespace:" + topic.getFqtn() + " identity=" + aaf.getIdentity());
167                     return;
168                 }
169             }
170
171             // create AAF Roles for MR clients of this topic
172             String rn = "publisher";
173             pubRole = new AafRole(topic.getFqtn(), rn);
174             int rc = aaf.addRole(pubRole);
175             if (rc != 201 && rc != 409) {
176                 err.setCode(500);
177                 err.setMessage("Unexpected response from AAF:" + rc);
178                 err.setFields("topic:" + topic.getFqtn() + " role=" + rn);
179                 return;
180             }
181             topic.setPublisherRole(pubRole.getFullyQualifiedRole());
182
183             rn = "subscriber";
184             subRole = new AafRole(topic.getFqtn(), rn);
185             rc = aaf.addRole(subRole);
186             if (rc != 201 && rc != 409) {
187                 err.setCode(500);
188                 err.setMessage("Unexpected response from AAF:" + rc);
189                 err.setFields("topic:" + topic.getFqtn() + " role=" + rn);
190                 return;
191             }
192             topic.setSubscriberRole(subRole.getFullyQualifiedRole());
193         }
194
195         // create AAF perms checked by MR
196         String instance = ":topic." + topic.getFqtn();
197         String[] actions = {"pub", "sub", "view"};
198         String t = dmaapSvc.getTopicPerm();
199         for (String action : actions) {
200             DmaapPerm perm = new DmaapPerm(t, instance, action);
201             int rc = aaf.addPerm(perm);
202             if (rc != 201 && rc != 409) {
203                 err.setCode(500);
204                 err.setMessage("Unexpected response from AAF:" + rc);
205                 err.setFields("t=" + t + " instance=" + instance + " action=" + action);
206                 return;
207             }
208             if (createTopicRoles) {
209                 // Grant perms to our default Roles
210                 if (action.equals("pub") || action.equals("view")) {
211                     DmaapGrant g = new DmaapGrant(perm, pubRole.getFullyQualifiedRole());
212                     rc = aaf.addGrant(g);
213                     if (rc != 201 && rc != 409) {
214                         err.setCode(rc);
215                         err.setMessage("Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole());
216                         logger.warn(err.getMessage());
217                         return;
218                     }
219                 }
220                 if (action.equals("sub") || action.equals("view")) {
221                     DmaapGrant g = new DmaapGrant(perm, subRole.getFullyQualifiedRole());
222                     rc = aaf.addGrant(g);
223                     if (rc != 201 && rc != 409) {
224                         err.setCode(rc);
225                         err.setMessage("Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole());
226                         logger.warn(err.getMessage());
227                         return;
228                     }
229                 }
230             }
231
232         }
233     }
234
235     public Topic addTopic(Topic topic, ApiError err, Boolean useExisting) {
236         logger.info("Entry: addTopic");
237         logger.info("Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle());
238         String nFqtn = topic.genFqtn();
239         logger.info("FQTN=" + nFqtn);
240         Topic pTopic = getTopic(nFqtn, err);
241         if (pTopic != null) {
242             String t = "topic already exists: " + nFqtn;
243             logger.info(t);
244             if (useExisting) {
245                 err.setCode(Status.OK.getStatusCode());
246                 return pTopic;
247             }
248             err.setMessage(t);
249             err.setFields("fqtn");
250             err.setCode(Status.CONFLICT.getStatusCode());
251             return null;
252         }
253         err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
254
255         topic.setFqtn(nFqtn);
256
257         aafTopicSetup(topic, err);
258         if (err.getCode() >= 400) {
259             return null;
260         }
261
262         if (topic.getReplicationCase().involvesGlobal()) {
263             if (topic.getGlobalMrURL() == null) {
264                 topic.setGlobalMrURL(defaultGlobalMrHost);
265             }
266             if (!Fqdn.isValid(topic.getGlobalMrURL())) {
267                 logger.error("GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
268                 topic.setStatus(DmaapObject_Status.INVALID);
269                 err.setCode(500);
270                 err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL());
271                 err.setFields("globalMrURL");
272
273                 return null;
274             }
275         }
276
277
278         if (topic.getNumClients() > 0) {
279             ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients());
280
281
282             ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>();
283             for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
284                 MR_Client c = it.next();
285
286                 logger.info("c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
287                 MR_Client nc = new MR_Client(c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
288                 nc.setFqtn(topic.getFqtn());
289                 nc.setClientIdentity(c.getClientIdentity());
290                 logger.info("nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
291                 clients2.add(clientService.addMr_Client(nc, topic, err));
292                 if (!err.is2xx()) {
293                     return null;
294                 }
295             }
296
297             topic.setClients(clients2);
298         }
299
300         Topic ntopic = checkForBridge(topic, err);
301         if (ntopic == null) {
302             topic.setStatus(DmaapObject_Status.INVALID);
303             if (!err.is2xx()) {
304                 return null;
305             }
306         }
307
308
309         mr_topics.put(nFqtn, ntopic);
310
311         err.setCode(Status.OK.getStatusCode());
312         return ntopic;
313     }
314
315
316     public Topic updateTopic(Topic topic, ApiError err) {
317         logger.info("updateTopic: entry");
318         logger.info("updateTopic: topic=" + topic);
319         logger.info("updateTopic: fqtn=" + topic.getFqtn());
320         if (topic.getFqtn().isEmpty()) {
321             return null;
322         }
323         logger.info("updateTopic: call checkForBridge");
324         Topic ntopic = checkForBridge(topic, err);
325         if (ntopic == null) {
326             topic.setStatus(DmaapObject_Status.INVALID);
327             if (!err.is2xx()) {
328                 return null;
329             }
330         }
331         if (ntopic != null) {
332             logger.info("updateTopic: call put");
333             mr_topics.put(ntopic.getFqtn(), ntopic);
334         }
335         err.setCode(Status.OK.getStatusCode());
336         return ntopic;
337     }
338
339     public Topic removeTopic(String pubId, ApiError apiError) {
340         Topic topic = mr_topics.get(pubId);
341         if (topic == null) {
342             apiError.setCode(Status.NOT_FOUND.getStatusCode());
343             apiError.setMessage("Topic " + pubId + " does not exist");
344             apiError.setFields("fqtn");
345             return null;
346         }
347         ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients(pubId));
348         for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
349             MR_Client c = it.next();
350
351
352             clientService.removeMr_Client(c.getMrClientId(), false, apiError);
353             if (!apiError.is2xx()) {
354                 return null;
355             }
356         }
357         apiError.setCode(Status.OK.getStatusCode());
358         return mr_topics.remove(pubId);
359     }
360
361     public static ApiError setBridgeClientPerms(MR_Cluster node) {
362         DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig();
363         String mmProvRole = p.getProperty("MM.ProvRole");
364         String mmAgentRole = p.getProperty("MM.AgentRole");
365         String[] Roles = {mmProvRole, mmAgentRole};
366         String[] actions = {"view", "pub", "sub"};
367         Topic bridgeAdminTopic = new Topic().init();
368         bridgeAdminTopic.setTopicName(dmaapSvc.getBridgeAdminFqtn());
369         bridgeAdminTopic.setTopicDescription("RESERVED topic for MirroMaker Provisioning");
370         bridgeAdminTopic.setOwner("DBC");
371
372         ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
373         for (String role : Roles) {
374             MR_Client client = new MR_Client();
375             client.setAction(actions);
376             client.setClientRole(role);
377             client.setDcaeLocationName(node.getDcaeLocationName());
378             clients.add(client);
379         }
380         bridgeAdminTopic.setClients(clients);
381
382         TopicService ts = new TopicService();
383         ApiError err = new ApiError();
384         ts.addTopic(bridgeAdminTopic, err, true);
385
386         if (err.is2xx() || err.getCode() == 409) {
387             err.setCode(200);
388             return err;
389         }
390
391         errorLogger.error(DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
392         return err;
393     }
394
395
396     public Topic checkForBridge(Topic topic, ApiError err) {
397         logger.info("checkForBridge: entry");
398         logger.info("fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
399         if (topic.getReplicationCase() == ReplicationType.REPLICATION_NONE) {
400             topic.setStatus(DmaapObject_Status.VALID);
401             return topic;
402         }
403
404         boolean anythingWrong = false;
405
406         Set<String> groups = clusters.getGroups();
407         for (String g : groups) {
408             logger.info("buildBridge for " + topic.getFqtn() + " on group" + g);
409             anythingWrong |= buildBridge(topic, err, g);
410         }
411         if (anythingWrong) {
412             topic.setStatus(DmaapObject_Status.INVALID);
413             if (!err.is2xx()) {
414                 return null;
415             }
416         } else {
417             topic.setStatus(DmaapObject_Status.VALID);
418         }
419         return topic;
420     }
421
422     private boolean buildBridge(Topic topic, ApiError err, String group) {
423         logger.info("buildBridge: entry");
424         boolean anythingWrong = false;
425         Graph graph;
426         logger.info("buildBridge: strictGraph=" + strictGraph);
427         if (group == null || group.isEmpty()) {
428             graph = new Graph(topic.getClients(), strictGraph);
429         } else {
430             graph = new Graph(topic.getClients(), strictGraph, group);
431         }
432         logger.info("buildBridge: graph=" + graph);
433         MR_Cluster groupCentralCluster = null;
434
435
436         if (graph.isEmpty()) {
437             logger.info("buildBridge: graph is empty.  return false");
438             return false;
439         } else if (group == null && topic.getReplicationCase().involvesFQDN()) {
440             logger.info("buildBridge: group is null and replicationCaseInvolvesFQDN. return false");
441             return false;
442         } else if (!graph.hasCentral()) {
443             logger.warn("Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
444             return true;
445         } else {
446             groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
447         }
448         Collection<String> clientLocations = graph.getKeys();
449         for (String loc : clientLocations) {
450             logger.info("loc=" + loc);
451             DcaeLocation location = locations.getDcaeLocation(loc);
452             MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
453             logger.info("cluster=" + cluster + " at " + cluster.getDcaeLocationName());
454             logger.info("location.isCentral()=" + location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc());
455
456
457             String source = null;
458             String target = null;
459
460             /*
461              * Provision Edge to Central bridges...
462              */
463             if (!location.isCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
464                 switch (topic.getReplicationCase()) {
465                     case REPLICATION_EDGE_TO_CENTRAL:
466                     case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
467                         source = cluster.getFqdn();
468                         target = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
469                         logger.info("REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" + target);
470                         break;
471                     case REPLICATION_CENTRAL_TO_EDGE:
472                     case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
473                         source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
474                         target = cluster.getFqdn();
475                         break;
476                     case REPLICATION_CENTRAL_TO_GLOBAL:
477                     case REPLICATION_GLOBAL_TO_CENTRAL:
478                     case REPLICATION_FQDN_TO_GLOBAL:
479                     case REPLICATION_GLOBAL_TO_FQDN:
480                         break;
481
482                     case REPLICATION_EDGE_TO_FQDN:
483                     case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
484                         source = cluster.getFqdn();
485                         target = groupCentralCluster.getFqdn();
486                         break;
487                     case REPLICATION_FQDN_TO_EDGE:
488                     case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
489                         source = groupCentralCluster.getFqdn();
490                         target = cluster.getFqdn();
491                         break;
492
493                     default:
494                         logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn());
495                         anythingWrong = true;
496                         err.setCode(400);
497                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
498                                 + topic.getReplicationCase());
499                         err.setMessage("Unexpected value for ReplicationType");
500                         continue;
501                 }
502
503             } else if (location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
504                 /*
505                  * Provision Central to Global bridges
506                  */
507                 switch (topic.getReplicationCase()) {
508
509                     case REPLICATION_CENTRAL_TO_GLOBAL:
510                     case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
511                         source = centralCname;
512                         target = topic.getGlobalMrURL();
513                         break;
514                     case REPLICATION_GLOBAL_TO_CENTRAL:
515                     case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
516                         source = topic.getGlobalMrURL();
517                         target = centralCname;
518                         break;
519
520                     case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
521                         source = groupCentralCluster.getFqdn();
522                         target = topic.getGlobalMrURL();
523                         break;
524
525                     case REPLICATION_FQDN_TO_GLOBAL:
526                         source = groupCentralCluster.getFqdn();
527                         target = topic.getGlobalMrURL();
528                         break;
529
530                     case REPLICATION_GLOBAL_TO_FQDN:
531                     case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
532                         source = topic.getGlobalMrURL();
533                         target = groupCentralCluster.getFqdn();
534                         break;
535
536                     case REPLICATION_FQDN_TO_EDGE:
537                     case REPLICATION_EDGE_TO_FQDN:
538                     case REPLICATION_EDGE_TO_CENTRAL:
539                     case REPLICATION_CENTRAL_TO_EDGE:
540                         break;
541                     default:
542                         logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn());
543                         anythingWrong = true;
544                         err.setCode(400);
545                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
546                                 + topic.getReplicationCase());
547                         err.setMessage("Unexpected value for ReplicationType");
548                         continue;
549                 }
550             } else {
551                 logger.warn("dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
552                 anythingWrong = true;
553                 continue;
554             }
555             if (source != null && target != null) {
556                 try {
557                     logger.info("Create a MM from " + source + " to " + target);
558                     MirrorMaker mm = bridge.findNextMM(source, target, topic.getFqtn());
559                     mm.addTopic(topic.getFqtn());
560                     bridge.updateMirrorMaker(mm);
561                 } catch (Exception ex) {
562                     err.setCode(500);
563                     err.setFields("mirror_maker.topic");
564                     err.setMessage("Unexpected condition: " + ex);
565                     anythingWrong = true;
566                     break;
567                 }
568             }
569
570
571         }
572         return anythingWrong;
573
574     }
575
576
577     /*
578      * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
579      * This was determined automatically based on presence of edge publishers and central subscribers.
580      * The following method is a modification of that original logic, to preserve some backwards compatibility,
581      * i.e. to be used when no ReplicationType is specified.
582      */
583     public ReplicationType reviewTopic(Topic topic) {
584
585
586         if (topic.getNumClients() > 1) {
587             Graph graph = new Graph(topic.getClients(), false);
588
589             String centralFqdn = new String();
590             if (graph.hasCentral()) {
591                 DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig();
592                 centralFqdn = p.getProperty("MR.CentralCname");
593             }
594
595             Collection<String> locations = graph.getKeys();
596             for (String loc : locations) {
597                 logger.info("loc=" + loc);
598                 MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
599                 if (cluster == null) {
600                     logger.info("No MR cluster for location " + loc);
601                     continue;
602                 }
603                 if (graph.hasCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
604                     logger.info("Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn);
605                     return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
606
607                 }
608
609             }
610         }
611
612         return ReplicationType.REPLICATION_NONE;
613     }
614
615 }