2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.plugins.grtoolkit;
24 import java.io.BufferedReader;
26 import java.io.FileInputStream;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.OutputStream;
31 import java.net.HttpURLConnection;
33 import java.nio.charset.StandardCharsets;
34 import java.sql.Connection;
35 import java.sql.SQLException;
36 import java.util.ArrayList;
37 import java.util.Collection;
38 import java.util.HashMap;
40 import java.util.Properties;
41 import java.util.List;
42 import java.util.concurrent.ExecutorService;
43 import java.util.concurrent.Executors;
44 import java.util.regex.Matcher;
45 import java.util.regex.Pattern;
46 import javax.annotation.Nonnull;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
51 import org.onap.ccsdk.sli.core.dblib.DbLibService;
52 import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
53 import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
55 import org.json.JSONArray;
56 import org.json.JSONException;
57 import org.json.JSONObject;
59 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
60 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
61 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
62 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
63 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
64 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
65 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
66 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
67 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
68 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
69 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
70 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
71 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
72 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
73 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
74 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
75 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
76 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
77 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
78 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
79 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
80 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
81 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
82 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
83 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
84 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
85 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
86 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
87 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
88 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
89 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
90 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
91 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
92 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
93 import org.opendaylight.yangtools.yang.common.RpcResult;
94 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
96 import org.slf4j.Logger;
97 import org.slf4j.LoggerFactory;
/**
 * Provider for the gr-toolkit RPC service.
 *
 * <p>Implements the {@code GrToolkitService} RPCs: cluster-health, site-health,
 * database-health, admin-health, halt-akka-traffic, resume-akka-traffic,
 * site-identifier and failover. Cluster membership is read from the
 * {@code seed-nodes} line of the akka configuration file. Member and shard state
 * is then queried over HTTP(S) from each node (Jolokia MBean paths configured in
 * gr-toolkit.properties).
 */
public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
    private static final String APP_NAME = "gr-toolkit";
    // Resolved from $SDNC_CONFIG_DIR; if that variable is unset the path starts with "null/".
    private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
    private static final String HEALTHY = "HEALTHY";
    private static final String FAULTY = "FAULTY";
    // Key of the payload object in Jolokia responses.
    private static final String VALUE = "value";
    // Key of the payload object in RESTCONF RPC responses.
    private static final String OUTPUT = "output";
    private static final int CONNECTION_TIMEOUT = 5000; // 5 second timeout
    // Location of akka.conf (property AKKA_CONF_LOCATION).
    private String akkaConfig;
    // ":<port>" + MBEAN_CLUSTER; appended to a node host to query its cluster MBean.
    private String jolokiaClusterPath;
    // ":<port>" + MBEAN_SHARD_MANAGER; appended to a node host to list its local shards.
    private String shardManagerPath;
    // ":<port>" + MBEAN_SHARD_CONFIG; a String.format template taking the shard name.
    private String shardPathTemplate;
    // Property CONTROLLER_CREDENTIALS; presumably used to authenticate HTTP calls (not visible here).
    private String credentials;
    // "https://" when CONTROLLER_USE_SSL is "true", otherwise "http://".
    private String httpProtocol;
    // From the SITE_NAME environment variable; setProperties() falls back to SITE_IDENTIFIER when unset/empty.
    private String siteIdentifier = System.getenv("SITE_NAME");
    private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
    private final ExecutorService executor;
    protected DataBroker dataBroker;
    protected NotificationPublishService notificationService;
    protected RpcProviderRegistry rpcRegistry;
    // Handle for the GrToolkitService registration; closed in close().
    protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
    protected DbLibService dbLib;
    // This node's cluster member name (e.g. "member-1"), read from the config datastore.
    private String member;
    // Presumably the memberMap entry whose member name equals `member` (set in parseSeedNodes).
    private ClusterActor self;
    // All cluster members, keyed by node host as parsed from the akka seed-nodes line.
    private HashMap<String, ClusterActor> memberMap;
    // SOLO / SINGLE / GEO, derived from the number of seed nodes (null for other counts).
    private SiteConfiguration siteConfiguration;
    private Properties properties;
    private DistributedDataStoreInterface configDatastore;
/**
 * Creates the gr-toolkit provider from its injected services, then runs
 * {@code initialize()}, which registers this object as the
 * {@code GrToolkitService} RPC implementation.
 *
 * @param dataBroker                  MD-SAL data broker
 * @param notificationProviderService notification publish service
 * @param rpcProviderRegistry         registry the RPC implementation is added to
 * @param configDatastore             config datastore; supplies this node's member name
 * @param dbLibService                database library service
 */
public GrToolkitProvider(DataBroker dataBroker,
                         NotificationPublishService notificationProviderService,
                         RpcProviderRegistry rpcProviderRegistry,
                         DistributedDataStoreInterface configDatastore,
                         DbLibService dbLibService) {
    log.info("Creating provider for {}", APP_NAME);
    executor = Executors.newFixedThreadPool(1);

    // Injected collaborators (independent assignments).
    this.dbLib = dbLibService;
    this.configDatastore = configDatastore;
    this.rpcRegistry = rpcProviderRegistry;
    this.notificationService = notificationProviderService;
    this.dataBroker = dataBroker;

    initialize();
}
/**
 * One-time setup run from the constructor: creates containers, loads the
 * properties file, discovers cluster members, and finally registers this object
 * as the GrToolkitService RPC implementation.
 */
private void initialize() {
    log.info("Initializing provider for {}", APP_NAME);
    // Create the top level containers
    createContainers();
    // defineMembers() reads akkaConfig, which setProperties() loads, so the order matters.
    setProperties();
    defineMembers();

    rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
    log.info("Initialization complete for {}", APP_NAME);
}
153 private void setProperties() {
154 log.info("Loading properties from {}", PROPERTIES_FILE);
155 properties = new Properties();
156 File propertiesFile = new File(PROPERTIES_FILE);
157 if(!propertiesFile.exists()) {
158 log.warn("Properties file not found.");
161 try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
162 properties.load(fileInputStream);
163 if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
164 properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
166 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL).trim() : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP).trim();
167 httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
168 akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
169 jolokiaClusterPath = ":" + port + properties.getProperty(PropertyKeys.MBEAN_CLUSTER).trim();
170 shardManagerPath = ":" + port + properties.getProperty(PropertyKeys.MBEAN_SHARD_MANAGER).trim();
171 shardPathTemplate = ":" + port + properties.getProperty(PropertyKeys.MBEAN_SHARD_CONFIG).trim();
172 if(siteIdentifier == null || siteIdentifier.isEmpty()) {
173 siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
175 credentials = properties.getProperty(PropertyKeys.CONTROLLER_CREDENTIALS).trim();
176 log.info("Loaded properties.");
177 } catch(IOException e) {
178 log.error("Error loading properties.", e);
182 private void defineMembers() {
183 member = configDatastore.getActorContext().getCurrentMemberName().getName();
184 log.info("Cluster member: {}", member);
186 log.info("Parsing akka.conf for cluster memberMap...");
188 File akkaConfigFile = new File(this.akkaConfig);
189 try(FileReader fileReader = new FileReader(akkaConfigFile);
190 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
192 while((line = bufferedReader.readLine()) != null) {
193 if(line.contains("seed-nodes =")) {
194 parseSeedNodes(line);
199 } catch(IOException e) {
200 log.error("Couldn't load akka", e);
201 } catch(NullPointerException e) {
202 log.error("akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
204 log.info("self:\n{}", self);
/**
 * Placeholder for creating top-level data containers; currently does nothing.
 */
private void createContainers() {
    // Replace with MD-SAL write for FailoverStatus
}
/**
 * Extension hook for subclasses. The default implementation does nothing.
 */
protected void initializeChild() {
    // Override if you have custom initialization intelligence
}
216 public void close() throws Exception {
217 log.info("Closing provider for {}", APP_NAME);
219 rpcRegistration.close();
220 log.info("Successfully closed provider for {}", APP_NAME);
224 public void onDataTreeChanged(@Nonnull Collection changes) {
225 log.info("onDataTreeChanged() called. but there is no change here");
229 public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
230 log.info("{}:cluster-health invoked.", APP_NAME);
231 getControllerHealth();
232 return buildClusterHealthOutput("200");
236 public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
237 log.info("{}:site-health invoked.", APP_NAME);
238 getControllerHealth();
239 return buildSiteHealthOutput("200", getAdminHealth(), getDatabaseHealth());
243 public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
244 log.info("{}:database-health invoked.", APP_NAME);
245 DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
246 outputBuilder.setStatus("200");
247 outputBuilder.setHealth(getDatabaseHealth());
248 outputBuilder.setServedBy(member);
250 return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
254 public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
255 log.info("{}:admin-health invoked.", APP_NAME);
256 AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
257 outputBuilder.setStatus("200");
258 outputBuilder.setHealth(getAdminHealth());
259 outputBuilder.setServedBy(member);
260 log.info(outputBuilder.build().toString());
261 return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
265 public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
266 log.info("{}:halt-akka-traffic invoked.", APP_NAME);
267 HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
268 outputBuilder.setStatus("200");
269 modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
270 outputBuilder.setServedBy(member);
272 return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
276 public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
277 log.info("{}:resume-akka-traffic invoked.", APP_NAME);
278 ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
279 outputBuilder.setStatus("200");
280 modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
281 outputBuilder.setServedBy(member);
283 return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
287 public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
288 log.info("{}:site-identifier invoked.", APP_NAME);
289 SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
290 outputBuilder.setStatus("200");
291 outputBuilder.setId(siteIdentifier);
292 outputBuilder.setServedBy(member);
294 return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
298 public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
299 log.info("{}:failover invoked.", APP_NAME);
300 FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
301 outputBuilder.setServedBy(member);
302 if(siteConfiguration != SiteConfiguration.GEO) {
303 log.info("Cannot failover non-GEO site.");
304 outputBuilder.setMessage("Failover aborted. This is not a GEO configuration.");
305 outputBuilder.setStatus("400");
306 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
308 ArrayList<ClusterActor> activeSite = new ArrayList<>();
309 ArrayList<ClusterActor> standbySite = new ArrayList<>();
311 log.info("Performing preliminary cluster health check...");
312 // Necessary to populate all member info. Health is not used for judgement calls.
313 getControllerHealth();
315 log.info("Determining active site...");
316 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
317 String key = entry.getKey();
318 ClusterActor clusterActor = entry.getValue();
319 if(clusterActor.isVoting()) {
320 activeSite.add(clusterActor);
321 log.debug("Active Site member: {}", key);
324 standbySite.add(clusterActor);
325 log.debug("Standby Site member: {}", key);
329 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL)) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL) : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP);
331 if(Boolean.parseBoolean(input.getBackupData())) {
332 backupMdSal(activeSite, port);
335 if(!changeClusterVoting(outputBuilder, activeSite, standbySite, port))
336 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
338 if(Boolean.parseBoolean(input.getIsolate())) {
339 isolateSiteFromCluster(activeSite, standbySite, port);
341 if(Boolean.parseBoolean(input.getDownUnreachable())) {
342 downUnreachableNodes(activeSite, standbySite, port);
346 log.info("{}:failover complete.", APP_NAME);
348 outputBuilder.setMessage("Failover complete.");
349 outputBuilder.setStatus("200");
350 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
353 private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
354 log.info("Halting Akka traffic...");
355 for(ClusterActor actor : standbySite) {
357 log.info("Halting Akka traffic for: {}", actor.getNode());
358 // Build JSON with activeSite actor Node and actor AkkaPort
359 JSONObject akkaInput = new JSONObject();
360 JSONObject inputBlock = new JSONObject();
361 JSONArray votingStateArray = new JSONArray();
363 for(ClusterActor node : activeSite) {
364 nodeInfo = new JSONObject();
365 nodeInfo.put("node", node.getNode());
366 nodeInfo.put("port", node.getAkkaPort());
367 votingStateArray.put(nodeInfo);
369 inputBlock.put("node-info", votingStateArray);
370 akkaInput.put("input", inputBlock);
371 getRequestContent(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", HttpMethod.POST, akkaInput.toString());
372 } catch(IOException e) {
373 log.error("Could not halt Akka traffic for: " + actor.getNode(), e);
378 private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
379 log.info("Setting site unreachable...");
380 JSONObject jolokiaInput = new JSONObject();
381 jolokiaInput.put("type", "EXEC");
382 jolokiaInput.put("mbean", "akka:type=Cluster");
383 jolokiaInput.put("operation", "down");
384 JSONArray arguments = new JSONArray();
385 for(ClusterActor actor : activeSite) {
386 // Build Jolokia input
387 // May need to change from akka port to actor.getAkkaPort()
388 arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
390 jolokiaInput.put("arguments", arguments);
391 log.debug("{}", jolokiaInput);
393 log.info("Setting nodes unreachable");
394 getRequestContent(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", HttpMethod.POST, jolokiaInput.toString());
395 } catch(IOException e) {
396 log.error("Error setting nodes unreachable", e);
400 private boolean changeClusterVoting(FailoverOutputBuilder outputBuilder, ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
401 log.info("Changing voting for all shards to standby site...");
403 JSONObject votingInput = new JSONObject();
404 JSONObject inputBlock = new JSONObject();
405 JSONArray votingStateArray = new JSONArray();
406 JSONObject memberVotingState;
407 for(ClusterActor actor : activeSite) {
408 memberVotingState = new JSONObject();
409 memberVotingState.put("member-name", actor.getMember());
410 memberVotingState.put("voting", false);
411 votingStateArray.put(memberVotingState);
413 for(ClusterActor actor : standbySite) {
414 memberVotingState = new JSONObject();
415 memberVotingState.put("member-name", actor.getMember());
416 memberVotingState.put("voting", true);
417 votingStateArray.put(memberVotingState);
419 inputBlock.put("member-voting-state", votingStateArray);
420 votingInput.put("input", inputBlock);
421 log.debug("{}", votingInput);
422 // Change voting all shards
423 getRequestContent(httpProtocol + self.getNode() + ":" + port + "/restconf/operations/cluster-admin:change-member-voting-states-for-all-shards", HttpMethod.POST, votingInput.toString());
424 } catch(IOException e) {
425 log.error("Changing voting", e);
426 outputBuilder.setMessage("Failover aborted. Failed to change voting.");
427 outputBuilder.setStatus("500");
433 private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
434 log.info("Backing up data...");
436 log.info("Scheduling backup for: {}", activeSite.get(0).getNode());
437 getRequestContent(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }");
438 } catch(IOException e) {
439 log.error("Error backing up MD-SAL", e);
441 for(ClusterActor actor : activeSite) {
444 log.info("Backing up data for: {}", actor.getNode());
445 getRequestContent(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", HttpMethod.POST);
446 } catch(IOException e) {
447 log.error("Error backing up data.", e);
452 private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput(String statusCode) {
453 ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
454 outputBuilder.setStatus(statusCode);
455 outputBuilder.setMembers((List) new ArrayList<Member>());
459 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
460 ClusterActor clusterActor = entry.getValue();
461 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
462 if(ClusterActor.SITE_1.equals(clusterActor.getSite()))
464 else if(ClusterActor.SITE_2.equals(clusterActor.getSite()))
467 outputBuilder.getMembers().add(new MemberBuilder(clusterActor).build());
469 if(siteConfiguration == SiteConfiguration.SOLO) {
470 outputBuilder.setSite1Health(HEALTHY);
472 else if(site1Health > 1) {
473 outputBuilder.setSite1Health(HEALTHY);
476 outputBuilder.setSite1Health(FAULTY);
479 if(siteConfiguration == SiteConfiguration.GEO && site2Health > 1) {
480 outputBuilder.setSite2Health(HEALTHY);
482 else if(siteConfiguration == SiteConfiguration.GEO) {
483 outputBuilder.setSite2Health(FAULTY);
486 outputBuilder.setServedBy(member);
487 RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
488 log.info("{}:cluster-health: Site 1 | Healthy ODLs {}", APP_NAME, site1Health);
489 if(siteConfiguration == SiteConfiguration.GEO) {
490 log.info("{}:cluster-health: Site 2 | Healthy ODLs {}", APP_NAME, site2Health);
492 return Futures.immediateFuture(rpcResult);
495 private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(String statusCode, String adminHealth, String databaseHealth) {
496 SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
497 outputBuilder.setStatus(statusCode);
498 outputBuilder.setSites((List) new ArrayList<Site>());
500 if(siteConfiguration != SiteConfiguration.GEO) {
502 SitesBuilder builder = new SitesBuilder();
503 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
504 ClusterActor clusterActor = entry.getValue();
505 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
509 if(siteConfiguration != SiteConfiguration.SOLO) {
510 builder.setHealth(HEALTHY);
511 builder.setRole("ACTIVE");
512 builder.setId(siteIdentifier);
515 builder = getSitesBuilder(healthyODLs, true, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
517 outputBuilder.getSites().add(builder.build());
520 int site1HealthyODLs = 0;
521 int site2HealthyODLs = 0;
522 boolean site1Voting = false;
523 boolean site2Voting = false;
524 boolean performedCrossSiteHealthCheck = false;
525 boolean crossSiteAdminHealthy = false;
526 boolean crossSiteDbHealthy = false;
527 String crossSiteIdentifier = "UNKNOWN_SITE";
528 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL)) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL) : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP);
530 // Make calls over to site 2 healthchecks
531 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
532 ClusterActor clusterActor = entry.getValue();
533 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
534 if(ClusterActor.SITE_1.equals(clusterActor.getSite())) {
536 if(clusterActor.isVoting()) {
542 if(clusterActor.isVoting()) {
545 if(!performedCrossSiteHealthCheck) {
547 String content = getRequestContent(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.POST);
548 crossSiteIdentifier = new JSONObject(content).getJSONObject(OUTPUT).getString("id");
549 crossSiteDbHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
550 crossSiteAdminHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
551 performedCrossSiteHealthCheck = true;
552 } catch(Exception e) {
553 log.info("Cannot get cross site health from {}", clusterActor.getNode());
554 log.info("siteIdentifier: {} | dbHealth: {} | adminHealth: {}", crossSiteIdentifier, crossSiteDbHealthy, crossSiteAdminHealthy);
555 log.error("Site Health Error", e);
561 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
562 outputBuilder.getSites().add(builder.build());
563 builder = getSitesBuilder(site2HealthyODLs, site2Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
564 outputBuilder.getSites().add(builder.build());
565 log.info("{}:site-health: Site 1 ({}) | hasVotingMembers?: {} | Healthy ODLs: {} | ADM isHealthy?: {} | DB isHealthy?: {}", APP_NAME, siteIdentifier, site1Voting, site1HealthyODLs, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth));
566 log.info("{}:site-health: Site 2 ({}) | hasVotingMembers?: {} | Healthy ODLs: {} | ADM isHealthy?: {} | DB isHealthy?: {}", APP_NAME, crossSiteIdentifier, site2Voting, site2HealthyODLs, crossSiteAdminHealthy, crossSiteDbHealthy);
569 // Make calls over to site 1 healthchecks
570 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
571 ClusterActor clusterActor = entry.getValue();
572 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
573 if(ClusterActor.SITE_1.equals(clusterActor.getSite())) {
575 if(clusterActor.isVoting()) {
578 if(!performedCrossSiteHealthCheck) {
580 String content = getRequestContent(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.POST);
581 crossSiteIdentifier = new JSONObject(content).getJSONObject(OUTPUT).getString("id");
582 crossSiteDbHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
583 crossSiteAdminHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
584 performedCrossSiteHealthCheck = true;
585 } catch(Exception e) {
586 log.info("Cannot get cross site health from {}", clusterActor.getNode());
587 log.info("siteIdentifier: {} | dbHealth: {} | adminHealth: {}", crossSiteIdentifier, crossSiteDbHealthy, crossSiteAdminHealthy);
588 log.error("Site Health Error", e);
594 if(clusterActor.isVoting()) {
601 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
602 outputBuilder.getSites().add(builder.build());
603 builder = getSitesBuilder(site2HealthyODLs, site2Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
604 outputBuilder.getSites().add(builder.build());
605 log.info("{}:site-health: Site 1 ({}) | hasVotingMembers?: {} | Healthy ODLs: {} | ADM isHealthy?: {} | DB isHealthy?: {}", APP_NAME, siteIdentifier, site1Voting, site1HealthyODLs, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth));
606 log.info("{}:site-health: Site 2 ({}) | hasVotingMembers?: {} | Healthy ODLs: {} | ADM isHealthy?: {} | DB isHealthy?: {}", APP_NAME, crossSiteIdentifier, site2Voting, site2HealthyODLs, crossSiteAdminHealthy, crossSiteDbHealthy);
610 outputBuilder.setServedBy(member);
611 RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
612 return Futures.immediateFuture(rpcResult);
615 private SitesBuilder getSitesBuilder(int siteHealthyODLs, boolean siteVoting, boolean adminHealthy, boolean dbHealthy, String siteIdentifier) {
616 SitesBuilder builder = new SitesBuilder();
617 if(siteHealthyODLs > 1) {
618 builder.setHealth(HEALTHY);
621 log.warn("{} Healthy ODLs: {}", siteIdentifier, siteHealthyODLs);
622 builder.setHealth(FAULTY);
625 log.warn("{} Admin Health: {}", siteIdentifier, FAULTY);
626 builder.setHealth(FAULTY);
629 log.warn("{} Database Health: {}", siteIdentifier, FAULTY);
630 builder.setHealth(FAULTY);
633 builder.setRole("ACTIVE");
636 builder.setRole("STANDBY");
638 builder.setId(siteIdentifier);
642 private boolean isSite1() {
643 int memberNumber = Integer.parseInt(member.split("-")[1]);
644 boolean isSite1 = memberNumber < 4;
645 log.info("isSite1(): {}", isSite1);
/**
 * Rebuilds {@code memberMap} from an akka {@code seed-nodes} line and derives
 * the site configuration from the member count.
 *
 * <p>Entries are assumed to look like {@code "akka.tcp://<system>@<host>:<port>"}
 * (the same form downUnreachableNodes() builds). The host becomes the node name
 * and map key, and the port becomes the akka port. Members are named
 * {@code member-1}, {@code member-2}, ... in seed order. The first three go to
 * site 1 and the rest to site 2. The entry whose name equals {@code member}
 * becomes {@code self}.
 *
 * <p>1, 3 and 6 members map to SOLO, SINGLE and GEO. Any other count leaves
 * {@code siteConfiguration} unchanged.
 *
 * @param line the full {@code seed-nodes = [...]} line
 */
private void parseSeedNodes(String line) {
    memberMap = new HashMap<>();
    // Keep the list contents, from '["' up to (not including) ']'; the first entry keeps its leading '['.
    line = line.substring(line.indexOf("[\""), line.indexOf(']'));
    String[] splits = line.split(",");

    for(int ndx = 0; ndx < splits.length; ndx++) {
        String nodeName = splits[ndx];
        int delimLocation = nodeName.indexOf('@');
        // Port: from the ':' after '@' up to the closing quote. The quote search starts at the
        // first ':' (the one in "akka.tcp:"). For the expected format, no other quote lies between.
        String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
        // Host: between '@' and the following ':'; replaces the raw entry in splits.
        splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
        log.info("Adding node: {}:{}", splits[ndx], port);
        ClusterActor clusterActor = new ClusterActor();
        clusterActor.setNode(splits[ndx]);
        clusterActor.setAkkaPort(port);
        clusterActor.setMember("member-" + (ndx + 1));
        if(ndx < 3) {
            clusterActor.setSite(ClusterActor.SITE_1);
        }
        else {
            clusterActor.setSite(ClusterActor.SITE_2);
        }

        if(member.equals(clusterActor.getMember())) {
            self = clusterActor;
        }
        memberMap.put(clusterActor.getNode(), clusterActor);
        log.info("{}", clusterActor);
    }

    if(memberMap.size() == 1) {
        log.info("1 member found. This is a solo environment.");
        siteConfiguration = SiteConfiguration.SOLO;
    }
    else if(memberMap.size() == 3) {
        log.info("This is a single site.");
        siteConfiguration = SiteConfiguration.SINGLE;
    }
    else if(memberMap.size() == 6) {
        log.info("This is a georedundant site.");
        siteConfiguration = SiteConfiguration.GEO;
    }
}
692 private void getMemberStatus(ClusterActor clusterActor) throws IOException {
693 log.info("Getting member status for {}", clusterActor.getNode());
694 String content = getRequestContent(httpProtocol + clusterActor.getNode() + jolokiaClusterPath, HttpMethod.GET);
696 JSONObject responseJson = new JSONObject(content);
697 JSONObject responseValue = responseJson.getJSONObject(VALUE);
698 clusterActor.setUp("Up".equals(responseValue.getString("MemberStatus")));
699 clusterActor.setUnreachable(false);
700 } catch(JSONException e) {
701 log.error("Error parsing response from {}", clusterActor.getNode(), e);
702 clusterActor.setUp(false);
703 clusterActor.setUnreachable(true);
707 private void getShardStatus(ClusterActor clusterActor) throws IOException {
708 log.info("Getting shard status for {}", clusterActor.getNode());
709 String content = getRequestContent(httpProtocol + clusterActor.getNode() + shardManagerPath, HttpMethod.GET);
711 JSONObject responseValue = new JSONObject(content).getJSONObject(VALUE);
712 JSONArray shardList = responseValue.getJSONArray("LocalShards");
714 String pattern = "-config$";
715 Pattern r = Pattern.compile(pattern);
717 for(int ndx = 0; ndx < shardList.length(); ndx++) {
718 String configShardName = shardList.getString(ndx);
719 m = r.matcher(configShardName);
720 String operationalShardName = m.replaceFirst("-operational");
721 String shardConfigPath = String.format(shardPathTemplate, configShardName);
722 String shardOperationalPath = String.format(shardPathTemplate, operationalShardName).replace("Config", "Operational");
723 extractShardInfo(clusterActor, configShardName, shardConfigPath);
724 extractShardInfo(clusterActor, operationalShardName, shardOperationalPath);
726 } catch(JSONException e) {
727 log.error("Error parsing response from " + clusterActor.getNode(), e);
731 private void extractShardInfo(ClusterActor clusterActor, String shardName, String shardPath) throws IOException {
732 log.info("Extracting shard info for {}", shardName);
733 log.debug("Pulling config info for {} from: {}", shardName, shardPath);
734 String content = getRequestContent(httpProtocol + clusterActor.getNode() + shardPath, HttpMethod.GET);
735 log.debug("Response: {}", content);
738 JSONObject shardValue = new JSONObject(content).getJSONObject(VALUE);
739 clusterActor.setVoting(shardValue.getBoolean("Voting"));
740 if(shardValue.getString("PeerAddresses").length() > 0) {
741 clusterActor.getReplicaShards().add(shardName);
742 if(shardValue.getString("Leader").startsWith(clusterActor.getMember())) {
743 clusterActor.getShardLeader().add(shardName);
747 clusterActor.getNonReplicaShards().add(shardName);
749 JSONArray followerInfo = shardValue.getJSONArray("FollowerInfo");
750 for(int followerNdx = 0; followerNdx < followerInfo.length(); followerNdx++) {
751 int commitIndex = shardValue.getInt("CommitIndex");
752 int matchIndex = followerInfo.getJSONObject(followerNdx).getInt("matchIndex");
753 if(commitIndex != -1 && matchIndex != -1) {
754 int commitsBehind = commitIndex - matchIndex;
755 clusterActor.getCommits().put(followerInfo.getJSONObject(followerNdx).getString("id"), commitsBehind);
758 } catch(JSONException e) {
759 log.error("Error parsing response from " + clusterActor.getNode(), e);
763 private void getControllerHealth() {
764 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
765 ClusterActor clusterActor = entry.getValue();
766 String key = entry.getKey();
768 // First flush out the old values
769 clusterActor.flush();
770 log.info("Gathering info for {}", clusterActor.getNode());
771 getMemberStatus(clusterActor);
772 getShardStatus(clusterActor);
773 log.info("MemberInfo:\n{}", clusterActor);
774 } catch(IOException e) {
775 log.error("Connection Error", e);
776 memberMap.get(key).setUnreachable(true);
777 memberMap.get(key).setUp(false);
778 log.info("MemberInfo:\n{}", memberMap.get(key));
783 private void modifyIpTables(IpTables task, Object[] nodeInfo) {
784 log.info("Modifying IPTables rules...");
785 if(task == IpTables.ADD) {
786 for(Object node : nodeInfo) {
787 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
788 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
789 log.info("Isolating {}", n.getNode());
790 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
791 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
794 } else if(task == IpTables.DELETE) {
795 for(Object node : nodeInfo) {
796 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
797 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
798 log.info("De-isolating {}", n.getNode());
799 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
800 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
804 executeCommand("sudo /sbin/iptables -L");
807 private void executeCommand(String command) {
808 log.info("Executing command: {}", command);
809 String[] cmd = command.split(" ");
811 Process p = Runtime.getRuntime().exec(cmd);
812 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
814 StringBuilder content = new StringBuilder();
815 while((inputLine = bufferedReader.readLine()) != null) {
816 content.append(inputLine);
818 bufferedReader.close();
819 log.info("{}", content);
820 } catch(IOException e) {
821 log.error("Error executing command", e);
825 private boolean crossSiteHealthRequest(String path) throws IOException {
826 String content = getRequestContent(path, HttpMethod.POST);
828 JSONObject responseJson = new JSONObject(content);
829 JSONObject responseValue = responseJson.getJSONObject(OUTPUT);
830 return HEALTHY.equals(responseValue.getString("health"));
831 } catch(JSONException e) {
832 log.error("Error parsing JSON", e);
833 throw new IOException();
// Checks the health of the ADM endpoint configured in the properties and reports it as a String.
// The healthcheck URL is built as:
//   ("https://" with adm.port.ssl when adm.useSsl is exactly "true", otherwise "http://" with adm.port.http)
//   + adm.fqdn + ":" + port + the adm.healthcheck path.
// NOTE(review): how `response` is mapped to the returned value, and what is returned when the request
// throws IOException, are not present in this listing — confirm which status codes count as healthy.
837 private String getAdminHealth() {
// Only the literal string "true" enables SSL; any other value (or an absent property) falls back to HTTP.
838 String protocol = "true".equals(properties.getProperty(PropertyKeys.ADM_USE_SSL)) ? "https://" : "http://";
839 String port = "true".equals(properties.getProperty(PropertyKeys.ADM_USE_SSL)) ? properties.getProperty(PropertyKeys.ADM_PORT_SSL) : properties.getProperty(PropertyKeys.ADM_PORT_HTTP);
840 String path = protocol + properties.getProperty(PropertyKeys.ADM_FQDN) + ":" + port + properties.getProperty(PropertyKeys.ADM_HEALTHCHECK);
841 log.info("Requesting healthcheck from {}", path);
// Issues a GET and keeps only the HTTP status code.
843 int response = getRequestStatus(path, HttpMethod.GET);
844 log.info("Response: {}", response);
// Connection failures are logged rather than propagated.
848 } catch(IOException e) {
849 log.error("Problem getting ADM health.", e);
// Assesses database health through the DbLibService and reports it as a String.
// Logs "Database is FAULTY" when dblib reports inactive, or the obtained connection is closed or
// read-only, and also when obtaining or inspecting the connection throws SQLException.
// Otherwise logs "Database is HEALTHY".
// NOTE(review): the return statements and any connection.close() calls are not present in this listing.
// Confirm the Connection from dbLib.getConnection() is closed on every path, including when
// isReadOnly()/isClosed() throws; try-with-resources would guarantee it.
854 private String getDatabaseHealth() {
855 log.info("Determining database health...");
857 Connection connection = dbLib.getConnection();
// Diagnostic logging of the three conditions evaluated below.
858 log.debug("DBLib isActive(): {}", dbLib.isActive());
859 log.debug("DBLib isReadOnly(): {}", connection.isReadOnly());
860 log.debug("DBLib isClosed(): {}", connection.isClosed());
// Any one failed condition makes the database FAULTY.
861 if(!dbLib.isActive() || connection.isClosed() || connection.isReadOnly()) {
862 log.warn("Database is FAULTY");
867 log.info("Database is HEALTHY");
868 } catch(SQLException e) {
869 log.error("Database is FAULTY");
870 log.error("Error", e);
877 private String getRequestContent(String path, HttpMethod method) throws IOException {
878 return getRequestContent(path, method, null);
881 private String getRequestContent(String path, HttpMethod method, String input) throws IOException {
882 HttpURLConnection connection = getConnection(path);
883 connection.setRequestMethod(method.getMethod());
884 connection.setDoInput(true);
887 sendPayload(input, connection);
890 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
892 StringBuilder content = new StringBuilder();
893 while((inputLine = bufferedReader.readLine()) != null) {
894 content.append(inputLine);
896 bufferedReader.close();
897 connection.disconnect();
899 String response = content.toString();
900 log.debug("getRequestContent(): Response:\n{}", response);
904 private int getRequestStatus(String path, HttpMethod method) throws IOException {
905 return getRequestStatus(path, method, null);
908 private int getRequestStatus(String path, HttpMethod method, String input) throws IOException {
909 HttpURLConnection connection = getConnection(path);
910 connection.setRequestMethod(method.getMethod());
911 connection.setDoInput(true);
914 sendPayload(input, connection);
916 int response = connection.getResponseCode();
917 log.info("Received {} response code from {}", response, path);
918 connection.disconnect();
922 private void sendPayload(String input, HttpURLConnection connection) throws IOException {
923 byte[] out = input.getBytes(StandardCharsets.UTF_8);
924 int length = out.length;
926 connection.setFixedLengthStreamingMode(length);
927 connection.setRequestProperty("Content-Type", "application/json");
928 connection.setDoOutput(true);
929 connection.connect();
930 try(OutputStream os = connection.getOutputStream()) {
935 private HttpURLConnection getConnection(String host) throws IOException {
936 log.info("Getting connection to: {}", host);
937 URL url = new URL(host);
938 String auth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(credentials.getBytes());
939 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
940 connection.addRequestProperty("Authorization", auth);
941 connection.setRequestProperty("Connection", "keep-alive");
942 connection.setRequestProperty("Proxy-Connection", "keep-alive");
943 connection.setConnectTimeout(CONNECTION_TIMEOUT);
944 connection.setReadTimeout(CONNECTION_TIMEOUT);
953 enum SiteConfiguration {
963 private String method;
964 HttpMethod(String method) {
965 this.method = method;
967 public String getMethod() {
973 static final String SITE_IDENTIFIER = "site.identifier";
974 static final String CONTROLLER_USE_SSL = "controller.useSsl";
975 static final String CONTROLLER_PORT_SSL = "controller.port.ssl";
976 static final String CONTROLLER_PORT_HTTP = "controller.port.http";
977 static final String CONTROLLER_PORT_AKKA = "controller.port.akka";
978 static final String CONTROLLER_CREDENTIALS = "controller.credentials";
979 static final String AKKA_CONF_LOCATION = "akka.conf.location";
980 static final String MBEAN_CLUSTER = "mbean.cluster";
981 static final String MBEAN_SHARD_MANAGER = "mbean.shardManager";
982 static final String MBEAN_SHARD_CONFIG = "mbean.shard.config";
983 static final String ADM_USE_SSL = "adm.useSsl";
984 static final String ADM_PORT_SSL = "adm.port.ssl";
985 static final String ADM_PORT_HTTP = "adm.port.http";
986 static final String ADM_FQDN = "adm.fqdn";
987 static final String ADM_HEALTHCHECK= "adm.healthcheck";