AafService - interface was introduced
[dmaap/dbcapi.git] / src / main / java / org / onap / dmaap / dbcapi / service / TopicService.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dmaap
4  * ================================================================================
5  * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6  *
7  * Modifications Copyright (C) 2019 IBM.
8  * ================================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=========================================================
21  */
22
23 package org.onap.dmaap.dbcapi.service;
24
25 import org.onap.dmaap.dbcapi.aaf.AafNamespace;
26 import org.onap.dmaap.dbcapi.aaf.AafRole;
27 import org.onap.dmaap.dbcapi.aaf.AafService;
28 import org.onap.dmaap.dbcapi.aaf.AafService.ServiceType;
29 import org.onap.dmaap.dbcapi.aaf.AafServiceImpl;
30 import org.onap.dmaap.dbcapi.aaf.DmaapGrant;
31 import org.onap.dmaap.dbcapi.aaf.DmaapPerm;
32 import org.onap.dmaap.dbcapi.database.DatabaseClass;
33 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
34 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
35 import org.onap.dmaap.dbcapi.model.ApiError;
36 import org.onap.dmaap.dbcapi.model.DcaeLocation;
37 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
38 import org.onap.dmaap.dbcapi.model.MR_Client;
39 import org.onap.dmaap.dbcapi.model.MR_Cluster;
40 import org.onap.dmaap.dbcapi.model.MirrorMaker;
41 import org.onap.dmaap.dbcapi.model.ReplicationType;
42 import org.onap.dmaap.dbcapi.model.Topic;
43 import org.onap.dmaap.dbcapi.util.DmaapConfig;
44 import org.onap.dmaap.dbcapi.util.Fqdn;
45 import org.onap.dmaap.dbcapi.util.Graph;
46
47 import javax.ws.rs.core.Response.Status;
48 import java.util.ArrayList;
49 import java.util.Collection;
50 import java.util.Iterator;
51 import java.util.List;
52 import java.util.Map;
53 import java.util.Set;
54
55 public class TopicService extends BaseLoggingClass {
56
57
58     // REF: https://wiki.web.att.com/pages/viewpage.action?pageId=519703122
59     private static String defaultGlobalMrHost;
60
61     private Map<String, Topic> mr_topics;
62
63     private static DmaapService dmaapSvc = new DmaapService();
64     private MR_ClientService clientService;
65     private MR_ClusterService clusters;
66     private DcaeLocationService locations;
67     private MirrorMakerService bridge;
68
69     private static String centralCname;
70     private static boolean createTopicRoles;
71     private boolean strictGraph = true;
72     private boolean mmPerMR;
73
74
75     public TopicService() {
76         this(DatabaseClass.getTopics(), new MR_ClientService(), (DmaapConfig) DmaapConfig.getConfig(),
77                 new MR_ClusterService(), new DcaeLocationService(), new MirrorMakerService());
78
79     }
80
81     TopicService(Map<String, Topic> mr_topics, MR_ClientService clientService, DmaapConfig p,
82                  MR_ClusterService clusters, DcaeLocationService locations, MirrorMakerService bridge) {
83         this.mr_topics = mr_topics;
84         this.clientService = clientService;
85         defaultGlobalMrHost = p.getProperty("MR.globalHost", "global.host.not.set");
86         centralCname = p.getProperty("MR.CentralCname");
87         createTopicRoles = "true".equalsIgnoreCase(p.getProperty("aaf.CreateTopicRoles", "true"));
88         String unit_test = p.getProperty("UnitTest", "No");
89         if ("Yes".equals(unit_test)) {
90             strictGraph = false;
91         }
92         mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
93         logger.info("TopicService properties: CentralCname=" + centralCname +
94                 "   defaultGlobarlMrHost=" + defaultGlobalMrHost +
95                 " createTopicRoles=" + createTopicRoles +
96                 " mmPerMR=" + mmPerMR);
97         this.clusters = clusters;
98         this.locations = locations;
99         this.bridge = bridge;
100     }
101
102     public Map<String, Topic> getTopics() {
103         return mr_topics;
104     }
105
106     public List<Topic> getAllTopics() {
107         return getAllTopics(true);
108     }
109
110     public List<Topic> getAllTopicsWithoutClients() {
111         return getAllTopics(false);
112     }
113
114     private List<Topic> getAllTopics(Boolean withClients) {
115         ArrayList<Topic> topics = new ArrayList<>(mr_topics.values());
116         if (withClients) {
117             for (Topic topic : topics) {
118                 topic.setClients(clientService.getAllMrClients(topic.getFqtn()));
119             }
120         }
121         return topics;
122     }
123
124
125     public Topic getTopic(String key, ApiError apiError) {
126         logger.info("getTopic: key=" + key);
127         Topic t = mr_topics.get(key);
128         if (t == null) {
129             apiError.setCode(Status.NOT_FOUND.getStatusCode());
130             apiError.setFields("fqtn");
131             apiError.setMessage("topic with fqtn " + key + " not found");
132             return null;
133         }
134         t.setClients(clientService.getAllMrClients(key));
135         apiError.setCode(Status.OK.getStatusCode());
136         return t;
137     }
138
139     private void aafTopicSetup(Topic topic, ApiError err) {
140
141         String nsr = dmaapSvc.getDmaap().getTopicNsRoot();
142         if (nsr == null) {
143             err.setCode(500);
144             err.setMessage("Unable to establish AAF namespace root: (check /dmaap object)");
145             err.setFields("topicNsRoot");
146             return;
147         }
148
149         // establish AAF Connection using TopicMgr identity
150         AafService aaf = new AafServiceImpl(ServiceType.AAF_TopicMgr);
151
152         AafRole pubRole = null;
153         AafRole subRole = null;
154
155         // creating Topic Roles was not an original feature.
156         // For backwards compatibility, only do this if the feature is enabled.
157         // Also, if the namespace of the topic is a foreign namespace, (i.e. not the same as our root ns)
158         // then we likely don't have permission to create sub-ns and Roles so don't try.
159         if (createTopicRoles && topic.getFqtn().startsWith(nsr)) {
160             // create AAF namespace for this topic
161             AafNamespace ns = new AafNamespace(topic.getFqtn(), aaf.getIdentity());
162             {
163                 int rc = aaf.addNamespace(ns);
164                 if (rc != 201 && rc != 409) {
165                     err.setCode(500);
166                     err.setMessage("Unexpected response from AAF:" + rc);
167                     err.setFields("namespace:" + topic.getFqtn() + " identity=" + aaf.getIdentity());
168                     return;
169                 }
170             }
171
172             // create AAF Roles for MR clients of this topic
173             String rn = "publisher";
174             pubRole = new AafRole(topic.getFqtn(), rn);
175             int rc = aaf.addRole(pubRole);
176             if (rc != 201 && rc != 409) {
177                 err.setCode(500);
178                 err.setMessage("Unexpected response from AAF:" + rc);
179                 err.setFields("topic:" + topic.getFqtn() + " role=" + rn);
180                 return;
181             }
182             topic.setPublisherRole(pubRole.getFullyQualifiedRole());
183
184             rn = "subscriber";
185             subRole = new AafRole(topic.getFqtn(), rn);
186             rc = aaf.addRole(subRole);
187             if (rc != 201 && rc != 409) {
188                 err.setCode(500);
189                 err.setMessage("Unexpected response from AAF:" + rc);
190                 err.setFields("topic:" + topic.getFqtn() + " role=" + rn);
191                 return;
192             }
193             topic.setSubscriberRole(subRole.getFullyQualifiedRole());
194         }
195
196         // create AAF perms checked by MR
197         String instance = ":topic." + topic.getFqtn();
198         String[] actions = {"pub", "sub", "view"};
199         String t = dmaapSvc.getTopicPerm();
200         for (String action : actions) {
201             DmaapPerm perm = new DmaapPerm(t, instance, action);
202             int rc = aaf.addPerm(perm);
203             if (rc != 201 && rc != 409) {
204                 err.setCode(500);
205                 err.setMessage("Unexpected response from AAF:" + rc);
206                 err.setFields("t=" + t + " instance=" + instance + " action=" + action);
207                 return;
208             }
209             if (createTopicRoles) {
210                 // Grant perms to our default Roles
211                 if (action.equals("pub") || action.equals("view")) {
212                     DmaapGrant g = new DmaapGrant(perm, pubRole.getFullyQualifiedRole());
213                     rc = aaf.addGrant(g);
214                     if (rc != 201 && rc != 409) {
215                         err.setCode(rc);
216                         err.setMessage("Grant of " + perm.toString() + " failed for " + pubRole.getFullyQualifiedRole());
217                         logger.warn(err.getMessage());
218                         return;
219                     }
220                 }
221                 if (action.equals("sub") || action.equals("view")) {
222                     DmaapGrant g = new DmaapGrant(perm, subRole.getFullyQualifiedRole());
223                     rc = aaf.addGrant(g);
224                     if (rc != 201 && rc != 409) {
225                         err.setCode(rc);
226                         err.setMessage("Grant of " + perm.toString() + " failed for " + subRole.getFullyQualifiedRole());
227                         logger.warn(err.getMessage());
228                         return;
229                     }
230                 }
231             }
232
233         }
234     }
235
236     public Topic addTopic(Topic topic, ApiError err, Boolean useExisting) {
237         logger.info("Entry: addTopic");
238         logger.info("Topic name=" + topic.getTopicName() + " fqtnStyle=" + topic.getFqtnStyle());
239         String nFqtn = topic.genFqtn();
240         logger.info("FQTN=" + nFqtn);
241         Topic pTopic = getTopic(nFqtn, err);
242         if (pTopic != null) {
243             String t = "topic already exists: " + nFqtn;
244             logger.info(t);
245             if (useExisting) {
246                 err.setCode(Status.OK.getStatusCode());
247                 return pTopic;
248             }
249             err.setMessage(t);
250             err.setFields("fqtn");
251             err.setCode(Status.CONFLICT.getStatusCode());
252             return null;
253         }
254         err.reset();  // err filled with NOT_FOUND is expected case, but don't want to litter...
255
256         topic.setFqtn(nFqtn);
257
258         aafTopicSetup(topic, err);
259         if (err.getCode() >= 400) {
260             return null;
261         }
262
263         if (topic.getReplicationCase().involvesGlobal()) {
264             if (topic.getGlobalMrURL() == null) {
265                 topic.setGlobalMrURL(defaultGlobalMrHost);
266             }
267             if (!Fqdn.isValid(topic.getGlobalMrURL())) {
268                 logger.error("GlobalMR FQDN not valid: " + topic.getGlobalMrURL());
269                 topic.setStatus(DmaapObject_Status.INVALID);
270                 err.setCode(500);
271                 err.setMessage("Value is not a valid FQDN:" + topic.getGlobalMrURL());
272                 err.setFields("globalMrURL");
273
274                 return null;
275             }
276         }
277
278
279         if (topic.getNumClients() > 0) {
280             ArrayList<MR_Client> clients = new ArrayList<MR_Client>(topic.getClients());
281
282
283             ArrayList<MR_Client> clients2 = new ArrayList<MR_Client>();
284             for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
285                 MR_Client c = it.next();
286
287                 logger.info("c fqtn=" + c.getFqtn() + " ID=" + c.getMrClientId() + " url=" + c.getTopicURL());
288                 MR_Client nc = new MR_Client(c.getDcaeLocationName(), topic.getFqtn(), c.getClientRole(), c.getAction());
289                 nc.setFqtn(topic.getFqtn());
290                 nc.setClientIdentity(c.getClientIdentity());
291                 logger.info("nc fqtn=" + nc.getFqtn() + " ID=" + nc.getMrClientId() + " url=" + nc.getTopicURL());
292                 clients2.add(clientService.addMr_Client(nc, topic, err));
293                 if (!err.is2xx()) {
294                     return null;
295                 }
296             }
297
298             topic.setClients(clients2);
299         }
300
301         Topic ntopic = checkForBridge(topic, err);
302         if (ntopic == null) {
303             topic.setStatus(DmaapObject_Status.INVALID);
304             if (!err.is2xx()) {
305                 return null;
306             }
307         }
308
309
310         mr_topics.put(nFqtn, ntopic);
311
312         err.setCode(Status.OK.getStatusCode());
313         return ntopic;
314     }
315
316
317     public Topic updateTopic(Topic topic, ApiError err) {
318         logger.info("updateTopic: entry");
319         logger.info("updateTopic: topic=" + topic);
320         logger.info("updateTopic: fqtn=" + topic.getFqtn());
321         if (topic.getFqtn().isEmpty()) {
322             return null;
323         }
324         logger.info("updateTopic: call checkForBridge");
325         Topic ntopic = checkForBridge(topic, err);
326         if (ntopic == null) {
327             topic.setStatus(DmaapObject_Status.INVALID);
328             if (!err.is2xx()) {
329                 return null;
330             }
331         }
332         if (ntopic != null) {
333             logger.info("updateTopic: call put");
334             mr_topics.put(ntopic.getFqtn(), ntopic);
335         }
336         err.setCode(Status.OK.getStatusCode());
337         return ntopic;
338     }
339
340     public Topic removeTopic(String pubId, ApiError apiError) {
341         Topic topic = mr_topics.get(pubId);
342         if (topic == null) {
343             apiError.setCode(Status.NOT_FOUND.getStatusCode());
344             apiError.setMessage("Topic " + pubId + " does not exist");
345             apiError.setFields("fqtn");
346             return null;
347         }
348         ArrayList<MR_Client> clients = new ArrayList<MR_Client>(clientService.getAllMrClients(pubId));
349         for (Iterator<MR_Client> it = clients.iterator(); it.hasNext(); ) {
350             MR_Client c = it.next();
351
352
353             clientService.removeMr_Client(c.getMrClientId(), false, apiError);
354             if (!apiError.is2xx()) {
355                 return null;
356             }
357         }
358         apiError.setCode(Status.OK.getStatusCode());
359         return mr_topics.remove(pubId);
360     }
361
362     public static ApiError setBridgeClientPerms(MR_Cluster node) {
363         DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig();
364         String mmProvRole = p.getProperty("MM.ProvRole");
365         String mmAgentRole = p.getProperty("MM.AgentRole");
366         String[] Roles = {mmProvRole, mmAgentRole};
367         String[] actions = {"view", "pub", "sub"};
368         Topic bridgeAdminTopic = new Topic().init();
369         bridgeAdminTopic.setTopicName(dmaapSvc.getBridgeAdminFqtn());
370         bridgeAdminTopic.setTopicDescription("RESERVED topic for MirroMaker Provisioning");
371         bridgeAdminTopic.setOwner("DBC");
372
373         ArrayList<MR_Client> clients = new ArrayList<MR_Client>();
374         for (String role : Roles) {
375             MR_Client client = new MR_Client();
376             client.setAction(actions);
377             client.setClientRole(role);
378             client.setDcaeLocationName(node.getDcaeLocationName());
379             clients.add(client);
380         }
381         bridgeAdminTopic.setClients(clients);
382
383         TopicService ts = new TopicService();
384         ApiError err = new ApiError();
385         ts.addTopic(bridgeAdminTopic, err, true);
386
387         if (err.is2xx() || err.getCode() == 409) {
388             err.setCode(200);
389             return err;
390         }
391
392         errorLogger.error(DmaapbcLogMessageEnum.TOPIC_CREATE_ERROR, bridgeAdminTopic.getFqtn(), Integer.toString(err.getCode()), err.getFields(), err.getMessage());
393         return err;
394     }
395
396
397     public Topic checkForBridge(Topic topic, ApiError err) {
398         logger.info("checkForBridge: entry");
399         logger.info("fqtn=" + topic.getFqtn() + " replicatonType=" + topic.getReplicationCase());
400         if (topic.getReplicationCase() == ReplicationType.REPLICATION_NONE) {
401             topic.setStatus(DmaapObject_Status.VALID);
402             return topic;
403         }
404
405         boolean anythingWrong = false;
406
407         Set<String> groups = clusters.getGroups();
408         for (String g : groups) {
409             logger.info("buildBridge for " + topic.getFqtn() + " on group" + g);
410             anythingWrong |= buildBridge(topic, err, g);
411         }
412         if (anythingWrong) {
413             topic.setStatus(DmaapObject_Status.INVALID);
414             if (!err.is2xx()) {
415                 return null;
416             }
417         } else {
418             topic.setStatus(DmaapObject_Status.VALID);
419         }
420         return topic;
421     }
422
423     private boolean buildBridge(Topic topic, ApiError err, String group) {
424         logger.info("buildBridge: entry");
425         boolean anythingWrong = false;
426         Graph graph;
427         logger.info("buildBridge: strictGraph=" + strictGraph);
428         if (group == null || group.isEmpty()) {
429             graph = new Graph(topic.getClients(), strictGraph);
430         } else {
431             graph = new Graph(topic.getClients(), strictGraph, group);
432         }
433         logger.info("buildBridge: graph=" + graph);
434         MR_Cluster groupCentralCluster = null;
435
436
437         if (graph.isEmpty()) {
438             logger.info("buildBridge: graph is empty.  return false");
439             return false;
440         } else if (group == null && topic.getReplicationCase().involvesFQDN()) {
441             logger.info("buildBridge: group is null and replicationCaseInvolvesFQDN. return false");
442             return false;
443         } else if (!graph.hasCentral()) {
444             logger.warn("Topic " + topic.getFqtn() + " wants to be " + topic.getReplicationCase() + " but has no central clients");
445             return true;
446         } else {
447             groupCentralCluster = clusters.getMr_ClusterByLoc(graph.getCentralLoc());
448         }
449         Collection<String> clientLocations = graph.getKeys();
450         for (String loc : clientLocations) {
451             logger.info("loc=" + loc);
452             DcaeLocation location = locations.getDcaeLocation(loc);
453             MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
454             logger.info("cluster=" + cluster + " at " + cluster.getDcaeLocationName());
455             logger.info("location.isCentral()=" + location.isCentral() + " getCentralLoc()=" + graph.getCentralLoc());
456
457
458             String source = null;
459             String target = null;
460
461             /*
462              * Provision Edge to Central bridges...
463              */
464             if (!location.isCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
465                 switch (topic.getReplicationCase()) {
466                     case REPLICATION_EDGE_TO_CENTRAL:
467                     case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:  // NOTE: this is for E2C portion only
468                         source = cluster.getFqdn();
469                         target = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
470                         logger.info("REPLICATION_EDGE_TO_CENTRAL: source=" + source + " target=" + target);
471                         break;
472                     case REPLICATION_CENTRAL_TO_EDGE:
473                     case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for C2E portion only
474                         source = (mmPerMR) ? groupCentralCluster.getFqdn() : centralCname;
475                         target = cluster.getFqdn();
476                         break;
477                     case REPLICATION_CENTRAL_TO_GLOBAL:
478                     case REPLICATION_GLOBAL_TO_CENTRAL:
479                     case REPLICATION_FQDN_TO_GLOBAL:
480                     case REPLICATION_GLOBAL_TO_FQDN:
481                         break;
482
483                     case REPLICATION_EDGE_TO_FQDN:
484                     case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2C portion only
485                         source = cluster.getFqdn();
486                         target = groupCentralCluster.getFqdn();
487                         break;
488                     case REPLICATION_FQDN_TO_EDGE:
489                     case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for F2E portion only
490                         source = groupCentralCluster.getFqdn();
491                         target = cluster.getFqdn();
492                         break;
493
494                     default:
495                         logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn());
496                         anythingWrong = true;
497                         err.setCode(400);
498                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
499                                 + topic.getReplicationCase());
500                         err.setMessage("Unexpected value for ReplicationType");
501                         continue;
502                 }
503
504             } else if (location.isCentral() && graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
505                 /*
506                  * Provision Central to Global bridges
507                  */
508                 switch (topic.getReplicationCase()) {
509
510                     case REPLICATION_CENTRAL_TO_GLOBAL:
511                     case REPLICATION_EDGE_TO_CENTRAL_TO_GLOBAL:
512                         source = centralCname;
513                         target = topic.getGlobalMrURL();
514                         break;
515                     case REPLICATION_GLOBAL_TO_CENTRAL:
516                     case REPLICATION_GLOBAL_TO_CENTRAL_TO_EDGE:  // NOTE: this is for G2C portion only
517                         source = topic.getGlobalMrURL();
518                         target = centralCname;
519                         break;
520
521                     case REPLICATION_EDGE_TO_FQDN_TO_GLOBAL:  // NOTE: this is for E2F portion only
522                         source = groupCentralCluster.getFqdn();
523                         target = topic.getGlobalMrURL();
524                         break;
525
526                     case REPLICATION_FQDN_TO_GLOBAL:
527                         source = groupCentralCluster.getFqdn();
528                         target = topic.getGlobalMrURL();
529                         break;
530
531                     case REPLICATION_GLOBAL_TO_FQDN:
532                     case REPLICATION_GLOBAL_TO_FQDN_TO_EDGE:  // NOTE: this is for G2F portion only
533                         source = topic.getGlobalMrURL();
534                         target = groupCentralCluster.getFqdn();
535                         break;
536
537                     case REPLICATION_FQDN_TO_EDGE:
538                     case REPLICATION_EDGE_TO_FQDN:
539                     case REPLICATION_EDGE_TO_CENTRAL:
540                     case REPLICATION_CENTRAL_TO_EDGE:
541                         break;
542                     default:
543                         logger.error("Unexpected value for ReplicationType (" + topic.getReplicationCase() + ") for topic " + topic.getFqtn());
544                         anythingWrong = true;
545                         err.setCode(400);
546                         err.setFields("topic=" + topic.genFqtn() + " replicationCase="
547                                 + topic.getReplicationCase());
548                         err.setMessage("Unexpected value for ReplicationType");
549                         continue;
550                 }
551             } else {
552                 logger.warn("dcaeLocation " + loc + " is neither Edge nor Central so no mmagent provisioning was done");
553                 anythingWrong = true;
554                 continue;
555             }
556             if (source != null && target != null) {
557                 try {
558                     logger.info("Create a MM from " + source + " to " + target);
559                     MirrorMaker mm = bridge.findNextMM(source, target, topic.getFqtn());
560                     mm.addTopic(topic.getFqtn());
561                     bridge.updateMirrorMaker(mm);
562                 } catch (Exception ex) {
563                     err.setCode(500);
564                     err.setFields("mirror_maker.topic");
565                     err.setMessage("Unexpected condition: " + ex);
566                     anythingWrong = true;
567                     break;
568                 }
569             }
570
571
572         }
573         return anythingWrong;
574
575     }
576
577
578     /*
579      * Prior to 1707, we only supported EDGE_TO_CENTRAL replication.
580      * This was determined automatically based on presence of edge publishers and central subscribers.
581      * The following method is a modification of that original logic, to preserve some backwards compatibility,
582      * i.e. to be used when no ReplicationType is specified.
583      */
584     public ReplicationType reviewTopic(Topic topic) {
585
586
587         if (topic.getNumClients() > 1) {
588             Graph graph = new Graph(topic.getClients(), false);
589
590             String centralFqdn = new String();
591             if (graph.hasCentral()) {
592                 DmaapConfig p = (DmaapConfig) DmaapConfig.getConfig();
593                 centralFqdn = p.getProperty("MR.CentralCname");
594             }
595
596             Collection<String> locations = graph.getKeys();
597             for (String loc : locations) {
598                 logger.info("loc=" + loc);
599                 MR_Cluster cluster = clusters.getMr_ClusterByLoc(loc);
600                 if (cluster == null) {
601                     logger.info("No MR cluster for location " + loc);
602                     continue;
603                 }
604                 if (graph.hasCentral() && !graph.getCentralLoc().equals(cluster.getDcaeLocationName())) {
605                     logger.info("Detected case for EDGE_TO_CENTRAL from " + cluster.getFqdn() + " to " + centralFqdn);
606                     return ReplicationType.REPLICATION_EDGE_TO_CENTRAL;
607
608                 }
609
610             }
611         }
612
613         return ReplicationType.REPLICATION_NONE;
614     }
615
616 }