2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.plugins;
24 import java.io.BufferedReader;
26 import java.io.FileInputStream;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.OutputStream;
31 import java.net.HttpURLConnection;
33 import java.nio.charset.StandardCharsets;
34 import java.sql.SQLException;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.HashMap;
38 import java.util.Properties;
39 import java.util.List;
40 import java.util.concurrent.ExecutionException;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.regex.Matcher;
44 import java.util.regex.Pattern;
45 import javax.annotation.Nonnull;
47 import com.google.common.util.concurrent.CheckedFuture;
48 import com.google.common.util.concurrent.Futures;
49 import com.google.common.util.concurrent.ListenableFuture;
51 import org.onap.ccsdk.sli.core.dblib.DbLibService;
52 import org.onap.ccsdk.sli.plugins.data.ClusterActor;
53 import org.onap.ccsdk.sli.plugins.data.MemberBuilder;
55 import org.json.JSONArray;
56 import org.json.JSONException;
57 import org.json.JSONObject;
59 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
60 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
61 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
62 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
63 import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
64 import org.opendaylight.controller.md.sal.common.api.data.TransactionCommitFailedException;
65 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
66 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
67 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
68 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
69 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
70 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
71 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
72 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
73 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
74 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
75 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
76 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
77 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
78 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
79 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
80 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
81 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
82 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
83 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
84 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
85 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
86 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
87 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
88 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
89 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
90 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
91 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
92 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
93 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
94 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
95 import org.opendaylight.yangtools.yang.common.RpcResult;
96 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
98 import org.slf4j.Logger;
99 import org.slf4j.LoggerFactory;
101 public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
// Location of the toolkit properties file, built from the SDNC_CONFIG_DIR environment variable.
// NOTE(review): if SDNC_CONFIG_DIR is unset this evaluates to "null/gr-toolkit.properties".
102 private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
// Health values reported by the health RPCs.
103 private static final String HEALTHY = "HEALTHY";
104 private static final String FAULTY = "FAULTY";
// Connection settings assigned by setProperties() from the properties file.
// NOTE(review): these are mutable statics written by an instance method, so every instance shares them.
105 private static String AKKA_CONFIG;
106 private static String JOLOKIA_CLUSTER_PATH;
107 private static String SHARD_MANAGER_PATH;
108 private static String SHARD_PATH_TEMPLATE;
109 private static String CREDENTIALS;
110 private static String HTTP_PROTOCOL;
// Site name from the SITE_NAME environment variable; setProperties() falls back to the
// "site.identifier" property when this is null or empty.
111 private static String SITE_IDENTIFIER = System.getenv("SITE_NAME");
112 private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
113 private final String appName = "gr-toolkit";
// Single-thread pool created in the constructor.
114 private final ExecutorService executor;
115 protected DataBroker dataBroker;
116 protected NotificationPublishService notificationService;
117 protected RpcProviderRegistry rpcRegistry;
// Handle returned by rpcRegistry.addRpcImplementation() in initialize(); closed in close().
118 protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
119 protected DbLibService dbLib;
// This node's cluster member name, read from the config datastore's actor context in defineMembers().
120 private String member;
// This node's own ClusterActor entry (presumably assigned in parseSeedNodes(); the assignment is not visible in this listing).
121 private ClusterActor self;
// Cluster members parsed from the akka.conf seed-nodes line, keyed by node host.
122 private HashMap<String, ClusterActor> members;
// Solo, Single or Geo; chosen in parseSeedNodes() from the number of seed nodes.
123 private SiteConfiguration siteConfiguration;
// Raw contents of PROPERTIES_FILE, loaded by setProperties().
124 private Properties properties;
125 private DistributedDataStoreInterface configDatastore;
/**
 * Creates the provider and stores its collaborators. No RPCs are registered here; that
 * happens in initialize().
 *
 * @param dataBroker                  broker used for MD-SAL transactions
 * @param notificationProviderService notification publisher
 * @param rpcProviderRegistry         registry the RPC implementation is added to
 * @param configDatastore             config datastore, used to discover this node's member name
 * @param dbLibService                database library service
 */
public GrToolkitProvider(DataBroker dataBroker,
        NotificationPublishService notificationProviderService,
        RpcProviderRegistry rpcProviderRegistry,
        DistributedDataStoreInterface configDatastore,
        DbLibService dbLibService) {
    log.info("Creating provider for " + appName);
    // One worker thread; only the pool is created here.
    executor = Executors.newFixedThreadPool(1);
    // Keep references to everything the caller handed in.
    this.dbLib = dbLibService;
    this.configDatastore = configDatastore;
    this.rpcRegistry = rpcProviderRegistry;
    this.notificationService = notificationProviderService;
    this.dataBroker = dataBroker;
}
/**
 * Initializes the provider: loads the toolkit properties through GrToolkitUtil, then registers
 * this instance as the GrToolkitService RPC implementation.
 * NOTE(review): lines 144-145 and 149-153 of the original source are missing from this listing
 * (including the `try {` that matches the catch below); they presumably call createContainers(),
 * setProperties() and defineMembers() - confirm against the real file.
 */
141 public void initialize() {
142 log.info("Initializing provider for " + appName);
143 // Create the top level containers
146 GrToolkitUtil.loadProperties();
// Property-loading failures are only logged; initialization continues below.
147 } catch (Exception e) {
148 log.error("Caught Exception while trying to load properties file.", e);
// Expose this object's GrToolkitService methods as RPCs; the registration is closed in close().
154 rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
155 log.info("Initialization complete for " + appName);
/**
 * Loads PROPERTIES_FILE into {@code properties} and derives the static connection settings from it:
 * HTTP_PROTOCOL, AKKA_CONFIG, the Jolokia MBean paths, CREDENTIALS and, when SITE_NAME is unset,
 * SITE_IDENTIFIER. IOExceptions are logged, not rethrown.
 * NOTE(review): every getProperty(...).trim() below throws NullPointerException when its key is
 * absent; only "site.identifier" gets a default ("Unknown Site").
 */
158 private void setProperties() {
159 log.info("Loading properties from " + PROPERTIES_FILE);
160 properties = new Properties();
161 File propertiesFile = new File(PROPERTIES_FILE);
162 if(!propertiesFile.exists()) {
163 log.warn("Properties file not found.");
// NOTE(review): original lines 164-165 are missing from this listing; presumably they return early here.
166 try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
167 properties.load(fileInputStream);
168 if(!properties.containsKey("site.identifier")) {
169 properties.put("site.identifier", "Unknown Site");
// Use the SSL or the plain HTTP controller port, depending on controller.useSsl.
171 String port = "true".equals(properties.getProperty("controller.useSsl").trim()) ? properties.getProperty("controller.port.ssl").trim() : properties.getProperty("controller.port.http").trim();
172 HTTP_PROTOCOL = "true".equals(properties.getProperty("controller.useSsl").trim()) ? "https://" : "http://";
173 AKKA_CONFIG = properties.getProperty("akka.conf.location").trim();
// Paths are ":<port>" + MBean path, so callers build URLs as HTTP_PROTOCOL + host + path.
174 JOLOKIA_CLUSTER_PATH = ":" + port + properties.getProperty("mbean.cluster").trim();
175 SHARD_MANAGER_PATH = ":" + port + properties.getProperty("mbean.shardManager").trim();
// Used later as a String.format template that receives the shard name.
176 SHARD_PATH_TEMPLATE = ":" + port + properties.getProperty("mbean.shard.config").trim();
// The SITE_NAME environment variable takes precedence over the property.
177 if(SITE_IDENTIFIER == null || SITE_IDENTIFIER.isEmpty()) {
178 SITE_IDENTIFIER = properties.getProperty("site.identifier").trim();
180 CREDENTIALS = properties.getProperty("controller.credentials").trim();
181 log.info("Loaded properties.");
182 } catch(IOException e) {
183 log.error("Error loading properties.", e);
/**
 * Determines this node's member name from the config datastore's actor context, then scans the
 * Akka config file (AKKA_CONFIG) for lines containing "seed-nodes =" and passes each one to
 * parseSeedNodes(), which rebuilds {@code members}.
 * NOTE(review): the reader is closed only on the success path, so an exception from readLine()
 * or parseSeedNodes() leaks the file handle; try-with-resources would fix that.
 * NOTE(review): self.toString() throws NullPointerException when {@code self} was never assigned,
 * e.g. when the file could not be read.
 * NOTE(review): original lines 192, 196, 200-202, 204 and 207 (including the `try {` and the
 * `line` declaration) are missing from this listing; whether the loop stops after the first
 * seed-nodes line cannot be confirmed here.
 */
187 private void defineMembers() {
188 member = configDatastore.getActorContext().getCurrentMemberName().getName();
189 log.info("Cluster member: " + member);
191 log.info("Parsing akka.conf for cluster members...");
// NOTE(review): FileReader decodes with the platform default charset.
193 File akkaConfig = new File(AKKA_CONFIG);
194 FileReader fileReader = new FileReader(akkaConfig);
195 BufferedReader bufferedReader = new BufferedReader(fileReader);
197 while((line = bufferedReader.readLine()) != null) {
198 if(line.contains("seed-nodes =")) {
199 parseSeedNodes(line);
203 bufferedReader.close();
205 } catch(IOException e) {
206 log.error("Couldn't load akka", e);
208 log.info("self:\n{}", self.toString());
211 private void createContainers() {
212 final WriteTransaction t = dataBroker.newReadWriteTransaction();
214 CheckedFuture<Void, TransactionCommitFailedException>checkedFuture = t.submit();
216 log.info("Create Containers succeeded!");
217 } catch (InterruptedException | ExecutionException e) {
218 log.error("Create Containers Failed: " + e);
219 log.error("context", e);
/**
 * Extension hook for subclasses that need extra initialization; it has no body here.
 */
223 protected void initializeChild() {
224 // Override if you have custom initialization intelligence
228 public void close() throws Exception {
229 log.info("Closing provider for " + appName);
231 rpcRegistration.close();
232 log.info("Successfully closed provider for " + appName);
/**
 * DataTreeChangeListener callback. This provider does not react to data-tree changes; it only logs the call.
 */
236 public void onDataTreeChanged(@Nonnull Collection changes) {
237 log.info("onDataTreeChanged() called. but there is no change here");
241 public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
242 log.info(appName + ":cluster-health invoked.");
243 getControllerHealth();
244 return buildClusterHealthOutput("200");
248 public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
249 log.info(appName + ":site-health invoked.");
250 getControllerHealth();
251 return buildSiteHealthOutput("200", getAdminHealth(), getDatabaseHealth());
255 public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
256 log.info(appName + ":database-health invoked.");
257 DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
258 outputBuilder.setStatus("200");
259 outputBuilder.setHealth(getDatabaseHealth());
261 return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
265 public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
266 log.info(appName + ":admin-health invoked.");
267 AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
268 outputBuilder.setStatus("200");
269 outputBuilder.setHealth(getAdminHealth());
271 return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
275 public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
276 log.info(appName + ":halt-akka-traffic invoked.");
277 HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
278 outputBuilder.setStatus("200");
279 modifyIpTables(IpTables.Add, input.getNodeInfo().toArray());
281 return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
285 public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
286 log.info(appName + ":resume-akka-traffic invoked.");
287 ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
288 outputBuilder.setStatus("200");
289 modifyIpTables(IpTables.Delete, input.getNodeInfo().toArray());
291 return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
295 public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
296 log.info(appName + ":site-identifier invoked.");
297 SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
298 outputBuilder.setStatus("200");
299 outputBuilder.setId(SITE_IDENTIFIER);
301 return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
/**
 * RPC handler for failover. Only allowed in a Geo configuration; otherwise it answers status "400"
 * and does nothing.
 * After refreshing member state with getControllerHealth() it:
 *  1. splits members into the active site (voting members) and the standby site (all others);
 *  2. if Boolean.parseBoolean(input.getBackupData()), asks each active-site node to schedule an
 *     MD-SAL export and run an offsite backup (errors are logged and ignored);
 *  3. through this node's cluster-admin RPC, makes every active-site member non-voting and every
 *     standby-site member voting for all shards; if that fails it answers status "500" and stops;
 *  4. if Boolean.parseBoolean(input.getIsolate()), asks each standby-site node's
 *     halt-akka-traffic RPC to block traffic with the active-site nodes (errors logged and ignored);
 *  5. if Boolean.parseBoolean(input.getDownUnreachable()), sends a Jolokia "down" for the
 *     active-site Akka addresses through the first standby-site node (errors logged and ignored).
 * It then answers status "200".
 * NOTE(review): short lines of the original source (braces, `try {`, `else {`, local declarations)
 * are missing from this listing, so the exact try/catch nesting of step 2 cannot be confirmed here.
 * NOTE(review): standbySite.get(0) throws IndexOutOfBoundsException when no non-voting member exists.
 */
305 public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
306 log.info(appName + ":failover invoked.");
307 FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
308 if(siteConfiguration != SiteConfiguration.Geo) {
309 log.info("Cannot failover non-Geo site.");
310 outputBuilder.setMessage("Failover aborted. This is not a Geo configuration.");
311 outputBuilder.setStatus("400");
312 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
314 ArrayList<ClusterActor> activeSite = new ArrayList<>();
315 ArrayList<ClusterActor> standbySite = new ArrayList<>();
317 log.info("Performing preliminary cluster health check...");
318 // Necessary to populate all member info. Health is not used for judgement calls.
319 getControllerHealth();
321 log.info("Determining active site...");
// Voting members form the active site; the remaining members form the standby site.
322 for(String key : members.keySet()) {
323 if(members.get(key).isVoting()) {
324 activeSite.add(members.get(key));
325 log.debug("Active Site member: " + key);
328 standbySite.add(members.get(key));
329 log.debug("Standby Site member: " + key);
// NOTE(review): unlike setProperties(), these port values are not trimmed.
333 String port = "true".equals(properties.getProperty("controller.useSsl")) ? properties.getProperty("controller.port.ssl") : properties.getProperty("controller.port.http");
335 if(Boolean.parseBoolean(input.getBackupData())) {
336 log.info("Backing up data...");
337 for(ClusterActor actor : activeSite) {
340 log.info("Scheduling backup for: " + actor.getNode());
341 getRequestContent(HTTP_PROTOCOL + actor.getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", HttpMethod.Post, "");
344 log.info("Backing up data for: " + actor.getNode());
345 getRequestContent(HTTP_PROTOCOL + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", HttpMethod.Post);
346 } catch(IOException e) {
347 log.error("Error backing up data.", e);
351 catch(IOException e) {
352 log.error("Error exporting MD-SAL data.", e);
357 log.info("Changing voting for all shards to standby site...");
// Build {"input": {"member-voting-state": [{"member-name": ..., "voting": ...}, ...]}}.
359 JSONObject votingInput = new JSONObject();
360 JSONObject inputBlock = new JSONObject();
361 JSONArray votingStateArray = new JSONArray();
362 JSONObject memberVotingState;
363 for(ClusterActor actor : activeSite) {
364 memberVotingState = new JSONObject();
365 memberVotingState.put("member-name", actor.getMember());
366 memberVotingState.put("voting", false);
367 votingStateArray.put(memberVotingState);
369 for(ClusterActor actor : standbySite) {
370 memberVotingState = new JSONObject();
371 memberVotingState.put("member-name", actor.getMember());
372 memberVotingState.put("voting", true);
373 votingStateArray.put(memberVotingState);
375 inputBlock.put("member-voting-state", votingStateArray);
376 votingInput.put("input", inputBlock);
377 log.debug(votingInput.toString(2));
378 // Change voting all shards
379 getRequestContent(HTTP_PROTOCOL + self.getNode() + ":" + port + "/restconf/operations/cluster-admin:change-member-voting-states-for-all-shards", HttpMethod.Post, votingInput.toString());
// A voting change failure is the only step that aborts the failover.
380 } catch(IOException e) {
381 log.error("Changing voting", e);
382 outputBuilder.setMessage("Failover aborted. Failed to change voting.");
383 outputBuilder.setStatus("500");
384 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
387 if(Boolean.parseBoolean(input.getIsolate())) {
388 log.info("Halting Akka traffic...");
389 for(ClusterActor actor : standbySite) {
391 log.info("Halting Akka traffic for: " + actor.getNode());
392 // Build JSON with activeSite actor.getNode() and actor.getAkkaPort();
393 JSONObject akkaInput = new JSONObject();
394 JSONObject inputBlock = new JSONObject();
395 JSONArray votingStateArray = new JSONArray();
397 for(ClusterActor node : activeSite) {
398 nodeInfo = new JSONObject();
399 nodeInfo.put("node", node.getNode());
400 nodeInfo.put("port", node.getAkkaPort());
401 votingStateArray.put(nodeInfo);
403 inputBlock.put("node-info", votingStateArray);
404 akkaInput.put("input", inputBlock);
405 getRequestContent(HTTP_PROTOCOL + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", HttpMethod.Post, akkaInput.toString());
406 } catch(IOException e) {
407 log.error("Could not halt Akka traffic for: " + actor.getNode(), e);
411 if(Boolean.parseBoolean(input.getDownUnreachable())) {
412 log.info("Setting site unreachable...");
// Jolokia EXEC request invoking "down" on the akka:type=Cluster MBean with one address per active-site node.
413 JSONObject jolokiaInput = new JSONObject();
414 jolokiaInput.put("type", "EXEC");
415 jolokiaInput.put("mbean", "akka:type=Cluster");
416 jolokiaInput.put("operation", "down");
417 JSONArray arguments = new JSONArray();
418 for(ClusterActor actor : activeSite) {
419 // Build Jolokia input
420 //TODO: May need to change from akka port to actor.getAkkaPort()
421 arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty("controller.port.akka"));
423 jolokiaInput.put("arguments", arguments);
424 log.debug(jolokiaInput.toString(2));
426 log.info("Setting nodes unreachable");
427 getRequestContent(HTTP_PROTOCOL + standbySite.get(0).getNode() + ":" + port + "/jolokia", HttpMethod.Post, jolokiaInput.toString());
428 } catch(IOException e) {
429 log.error("Error setting nodes unreachable", e);
434 log.info(appName + ":failover complete.");
436 outputBuilder.setMessage("Failover complete.");
437 outputBuilder.setStatus("200");
438 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
441 private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput(String statusCode) {
442 ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
443 outputBuilder.setStatus(statusCode);
444 outputBuilder.setMembers((List) new ArrayList<Member>());
448 for(String key : members.keySet()) {
449 if(members.get(key).isUp() && !members.get(key).isUnreachable()) {
450 if(ClusterActor.SITE_1.equals(members.get(key).getSite()))
452 else if(ClusterActor.SITE_2.equals(members.get(key).getSite()))
455 outputBuilder.getMembers().add(new MemberBuilder(members.get(key)).build());
457 if(siteConfiguration == SiteConfiguration.Solo) {
458 outputBuilder.setSite1Health(HEALTHY);
461 if(site1Health > 1) {
462 outputBuilder.setSite1Health(HEALTHY);
465 outputBuilder.setSite1Health(FAULTY);
468 if(siteConfiguration == SiteConfiguration.Geo) {
469 if(site2Health > 1) {
470 outputBuilder.setSite2Health(HEALTHY);
473 outputBuilder.setSite2Health(FAULTY);
477 RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
478 return Futures.immediateFuture(rpcResult);
481 private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(String statusCode, String adminHealth, String databaseHealth) {
482 SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
483 outputBuilder.setStatus(statusCode);
484 outputBuilder.setSites((List) new ArrayList<Site>());
486 if(siteConfiguration != SiteConfiguration.Geo) {
488 SitesBuilder builder = new SitesBuilder();
489 for(String key : members.keySet()) {
490 if(members.get(key).isUp() && !members.get(key).isUnreachable()) {
494 if(siteConfiguration != SiteConfiguration.Solo) {
495 builder.setHealth(HEALTHY);
496 builder.setRole("ACTIVE");
497 builder.setId(SITE_IDENTIFIER);
500 builder = getSitesBuilder(healthyODLs, true, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), SITE_IDENTIFIER);
502 outputBuilder.getSites().add(builder.build());
505 int site1HealthyODLs = 0;
506 int site2HealthyODLs = 0;
507 boolean site1Voting = false;
508 boolean site2Voting = false;
509 boolean performedCrossSiteHealthCheck = false;
510 boolean crossSiteAdminHealthy = false;
511 boolean crossSiteDbHealthy = false;
512 String crossSiteIdentifier = "UNKNOWN_SITE";
513 String port = "true".equals(properties.getProperty("controller.useSsl")) ? properties.getProperty("controller.port.ssl") : properties.getProperty("controller.port.http");
515 // Make calls over to site 2 healthchecks
516 for(String key : members.keySet()) {
517 if(members.get(key).isUp() && !members.get(key).isUnreachable()) {
518 if(ClusterActor.SITE_1.equals(members.get(key).getSite())) {
520 if(members.get(key).isVoting()) {
526 if(members.get(key).isVoting()) {
529 if(!performedCrossSiteHealthCheck) {
531 String content = getRequestContent(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.Post);
532 crossSiteIdentifier = new JSONObject(content).getJSONObject("output").getString("id");
533 crossSiteDbHealthy = crossSiteHealthRequest(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
534 crossSiteAdminHealthy = crossSiteHealthRequest(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
535 performedCrossSiteHealthCheck = true;
536 } catch(Exception e) {
537 log.error("Cannot get site identifier from " + members.get(key).getNode(), e);
543 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), SITE_IDENTIFIER);
544 outputBuilder.getSites().add(builder.build());
545 builder = getSitesBuilder(site2HealthyODLs, site2Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
546 outputBuilder.getSites().add(builder.build());
549 // Make calls over to site 1 healthchecks
550 for(String key : members.keySet()) {
551 if(members.get(key).isUp() && !members.get(key).isUnreachable()) {
552 if(ClusterActor.SITE_1.equals(members.get(key).getSite())) {
554 if(members.get(key).isVoting()) {
557 if(!performedCrossSiteHealthCheck) {
559 String content = getRequestContent(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.Post);
560 crossSiteIdentifier = new JSONObject(content).getJSONObject("output").getString("id");
561 crossSiteDbHealthy = crossSiteHealthRequest(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
562 crossSiteAdminHealthy = crossSiteHealthRequest(HTTP_PROTOCOL + members.get(key).getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
563 performedCrossSiteHealthCheck = true;
564 } catch(Exception e) {
565 log.error("Cannot get site identifier from " + members.get(key).getNode(), e);
571 if(members.get(key).isVoting()) {
578 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
579 outputBuilder.getSites().add(builder.build());
580 builder = getSitesBuilder(site2HealthyODLs, site2Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), SITE_IDENTIFIER);
581 outputBuilder.getSites().add(builder.build());
585 RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
586 return Futures.immediateFuture(rpcResult);
589 private SitesBuilder getSitesBuilder(int siteHealthyODLs, boolean siteVoting, boolean adminHealthy, boolean dbHealthy, String siteIdentifier) {
590 SitesBuilder builder = new SitesBuilder();
591 if(siteHealthyODLs > 1) {
592 builder.setHealth(HEALTHY);
595 log.warn(siteIdentifier + " Healthy ODLs: " + siteHealthyODLs);
596 builder.setHealth(FAULTY);
599 log.warn(siteIdentifier + " Admin Health: " + FAULTY);
600 builder.setHealth(FAULTY);
603 log.warn(siteIdentifier + " Database Health: " + FAULTY);
604 builder.setHealth(FAULTY);
607 builder.setRole("ACTIVE");
610 builder.setRole("STANDBY");
612 builder.setId(siteIdentifier);
616 private boolean isSite1() {
617 int memberNumber = Integer.parseInt(member.split("-")[1]);
618 boolean isSite1 = memberNumber < 4;
619 log.info("isSite1(): " + isSite1);
/**
 * Rebuilds {@code members} from an akka.conf seed-nodes line and derives siteConfiguration from
 * the member count (1 = Solo, 3 = Single, 6 = Geo; no other branch is visible here).
 * The line is presumably of the form
 * {@code seed-nodes = ["akka.tcp://<system>@<host>:<port>", ...]}: the text between '[' and ']' is
 * split on commas, and for each entry the host is taken between '@' and the next ':' and the port
 * between that ':' and the closing quote.
 * NOTE(review): member names are assigned by position ("member-" + (index + 1)), which assumes the
 * seed-node order matches the cluster's member numbering.
 * NOTE(review): the condition choosing SITE_1 vs SITE_2 (original line 638) and the statement run
 * when a node matches this member (line 646, presumably assigning {@code self}) are missing from
 * this listing.
 *
 * @param line the raw seed-nodes line
 */
623 private void parseSeedNodes(String line) {
624 members = new HashMap<>();
625 line = line.substring(line.indexOf("[\""), line.indexOf("]"));
626 String[] splits = line.split(",");
628 for(int ndx = 0; ndx < splits.length; ndx++) {
629 String nodeName = splits[ndx];
630 int delimLocation = nodeName.indexOf("@");
631 String port = nodeName.substring(splits[ndx].indexOf(":", delimLocation) + 1, splits[ndx].indexOf("\"", splits[ndx].indexOf(":")));
// Overwrite the raw entry with just the host portion.
632 splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(":", delimLocation));
633 log.info("Adding node: " + splits[ndx] + ":" + port);
634 ClusterActor clusterActor = new ClusterActor();
635 clusterActor.setNode(splits[ndx]);
636 clusterActor.setAkkaPort(port);
637 clusterActor.setMember("member-" + (ndx + 1));
639 clusterActor.setSite(ClusterActor.SITE_1);
642 clusterActor.setSite(ClusterActor.SITE_2);
645 if(member.equals(clusterActor.getMember())) {
648 members.put(clusterActor.getNode(), clusterActor);
649 log.info(clusterActor.toString());
652 if(members.size() == 1) {
653 log.info("1 member found. This is a solo environment.");
654 siteConfiguration = SiteConfiguration.Solo;
656 else if(members.size() == 3) {
657 log.info("This is a single site.");
658 siteConfiguration = SiteConfiguration.Single;
660 else if(members.size() == 6) {
661 log.info("This is a georedundant site.");
662 siteConfiguration = SiteConfiguration.Geo;
666 private void getMemberStatus(ClusterActor clusterActor) throws IOException {
667 log.info("Getting member status for " + clusterActor.getNode());
668 String content = getRequestContent(HTTP_PROTOCOL + clusterActor.getNode() + JOLOKIA_CLUSTER_PATH, HttpMethod.Get);
670 JSONObject responseJson = new JSONObject(content);
671 JSONObject responseValue = responseJson.getJSONObject("value");
672 clusterActor.setUp("Up".equals(responseValue.getString("MemberStatus")));
673 clusterActor.setUnreachable(false);
674 } catch(JSONException e) {
675 log.error("Error parsing response from " + clusterActor.getNode(), e);
676 clusterActor.setUp(false);
677 clusterActor.setUnreachable(true);
681 private void getShardStatus(ClusterActor clusterActor) throws IOException {
682 log.info("Getting shard status for " + clusterActor.getNode());
683 String content = getRequestContent(HTTP_PROTOCOL + clusterActor.getNode() + SHARD_MANAGER_PATH, HttpMethod.Get);
685 JSONObject responseValue = new JSONObject(content).getJSONObject("value");
686 JSONArray shardList = responseValue.getJSONArray("LocalShards");
688 String pattern = "-config$";
689 Pattern r = Pattern.compile(pattern);
691 for(int ndx = 0; ndx < shardList.length(); ndx++) {
692 String configShardName = shardList.getString(ndx);
693 m = r.matcher(configShardName);
694 String operationalShardName = m.replaceFirst("-operational");
695 String shardConfigPath = String.format(SHARD_PATH_TEMPLATE, configShardName);
696 String shardOperationalPath = String.format(SHARD_PATH_TEMPLATE, operationalShardName).replace("Config", "Operational");
697 extractShardInfo(clusterActor, configShardName, shardConfigPath);
698 extractShardInfo(clusterActor, operationalShardName, shardOperationalPath);
700 } catch(JSONException e) {
701 log.error("Error parsing response from " + clusterActor.getNode(), e);
705 private void extractShardInfo(ClusterActor clusterActor, String shardName, String shardPath) throws IOException {
706 log.info("Extracting shard info for " + shardName);
707 log.debug("Pulling config info for " + shardName + " from: " + shardPath);
708 String content = getRequestContent(HTTP_PROTOCOL + clusterActor.getNode() + shardPath, HttpMethod.Get);
709 log.debug("Response: " + content);
712 JSONObject shardValue = new JSONObject(content).getJSONObject("value");
713 clusterActor.setVoting(shardValue.getBoolean("Voting"));
714 if(shardValue.getString("PeerAddresses").length() > 0) {
715 clusterActor.getReplicaShards().add(shardName);
716 if(shardValue.getString("Leader").startsWith(clusterActor.getMember())) {
717 clusterActor.getShardLeader().add(shardName);
721 clusterActor.getNonReplicaShards().add(shardName);
723 JSONArray followerInfo = shardValue.getJSONArray("FollowerInfo");
724 for(int followerNdx = 0; followerNdx < followerInfo.length(); followerNdx++) {
725 int commitIndex = shardValue.getInt("CommitIndex");
726 int matchIndex = followerInfo.getJSONObject(followerNdx).getInt("matchIndex");
727 if(commitIndex != -1 && matchIndex != -1) {
728 int commitsBehind = commitIndex - matchIndex;
729 clusterActor.getCommits().put(followerInfo.getJSONObject(followerNdx).getString("id"), commitsBehind);
732 } catch(JSONException e) {
733 log.error("Error parsing response from " + clusterActor.getNode(), e);
737 private void getControllerHealth() {
738 ClusterActor clusterActor;
739 for(String key : members.keySet()) {
741 clusterActor = members.get(key);
742 // First flush out the old values
743 clusterActor.flush();
744 log.info("Gathering info for " + clusterActor.getNode());
745 getMemberStatus(clusterActor);
746 getShardStatus(clusterActor);
747 log.info("MemberInfo:\n" + clusterActor.toString());
748 } catch(IOException e) {
749 log.error("Connection Error", e);
750 members.get(key).setUnreachable(true);
751 members.get(key).setUp(false);
752 log.info("MemberInfo:\n" + members.get(key).toString());
757 private void modifyIpTables(IpTables task, Object[] nodeInfo) {
758 log.info("Modifying IPTables rules...");
762 for(Object node : nodeInfo) {
763 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
764 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
765 log.info("Isolating " + n.getNode());
766 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get("controller.port.akka"), n.getNode()));
767 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -s %s", n.getPort(), n.getNode()));
771 for(Object node : nodeInfo) {
772 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
773 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
774 log.info("De-isolating " + n.getNode());
775 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get("controller.port.akka"), n.getNode()));
776 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -s %s", n.getPort(), n.getNode()));
780 executeCommand("sudo /sbin/iptables -L");
783 private void executeCommand(String command) {
784 log.info("Executing command: " + command);
785 String[] cmd = command.split(" ");
787 Process p = Runtime.getRuntime().exec(cmd);
788 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
790 StringBuffer content = new StringBuffer();
791 while((inputLine = bufferedReader.readLine()) != null) {
792 content.append(inputLine);
794 bufferedReader.close();
795 log.info(content.toString());
796 } catch(IOException e) {
797 log.error("Error executing command", e);
801 private boolean crossSiteHealthRequest(String path) throws IOException {
802 String content = getRequestContent(path, HttpMethod.Post);
804 JSONObject responseJson = new JSONObject(content);
805 JSONObject responseValue = responseJson.getJSONObject("value");
806 return HEALTHY.equals(responseValue.getString("health"));
807 } catch(JSONException e) {
808 log.error("Error parsing JSON", e);
809 throw new IOException();
813 private String getAdminHealth() {
814 String protocol = "true".equals(properties.getProperty("adm.useSsl")) ? "https://" : "http://";
815 String port = "true".equals(properties.getProperty("adm.useSsl")) ? properties.getProperty("adm.port.ssl") : properties.getProperty("adm.port.http");
816 String path = protocol + properties.getProperty("adm.fqdn") + ":" + port + properties.getProperty("adm.healthcheck");
817 log.info("Requesting healthcheck from " + path);
819 int response = getRequestStatus(path, HttpMethod.Get);
820 log.info("Response: " + response);
824 } catch(IOException e) {
825 log.error("Problem getting ADM health.", e);
830 private String getDatabaseHealth() {
831 log.info("Determining database health...");
833 log.info("DBLib isActive(): " + dbLib.isActive());
834 log.info("DBLib isReadOnly(): " + dbLib.getConnection().isReadOnly());
835 log.info("DBLib isClosed(): " + dbLib.getConnection().isClosed());
836 if(!dbLib.isActive() || dbLib.getConnection().isClosed() || dbLib.getConnection().isReadOnly()) {
837 log.warn("Database is FAULTY");
840 log.info("Database is HEALTHY");
841 } catch(SQLException e) {
842 log.error("Database is FAULTY");
843 log.error("Error", e);
850 private String getRequestContent(String path, HttpMethod method) throws IOException {
851 return getRequestContent(path, method, null);
854 private String getRequestContent(String path, HttpMethod method, String input) throws IOException {
855 HttpURLConnection connection = getConnection(path);
856 connection.setRequestMethod(method.getMethod());
857 connection.setDoInput(true);
860 sendPayload(input, connection);
863 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
865 StringBuffer content = new StringBuffer();
866 while((inputLine = bufferedReader.readLine()) != null) {
867 content.append(inputLine);
869 bufferedReader.close();
870 connection.disconnect();
871 return content.toString();
874 private int getRequestStatus(String path, HttpMethod method) throws IOException {
875 return getRequestStatus(path, method, null);
878 private int getRequestStatus(String path, HttpMethod method, String input) throws IOException {
879 HttpURLConnection connection = getConnection(path);
880 connection.setRequestMethod(method.getMethod());
881 connection.setDoInput(true);
884 sendPayload(input, connection);
886 int response = connection.getResponseCode();
887 log.info("Received " + response + " response code from " + path);
888 connection.disconnect();
892 private void sendPayload(String input, HttpURLConnection connection) throws IOException {
893 byte[] out = input.getBytes(StandardCharsets.UTF_8);
894 int length = out.length;
896 connection.setFixedLengthStreamingMode(length);
897 connection.setRequestProperty("Content-Type", "application/json");
898 connection.setDoOutput(true);
899 connection.connect();
900 try(OutputStream os = connection.getOutputStream()) {
905 private HttpURLConnection getConnection(String host) throws IOException {
906 log.info("Getting connection to: " + host);
907 URL url = new URL(host);
908 String auth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(CREDENTIALS.getBytes());
909 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
910 connection.addRequestProperty("Authorization", auth);
911 connection.setRequestProperty("Connection", "keep-alive");
912 connection.setRequestProperty("Proxy-Connection", "keep-alive");
916 private enum IpTables {
921 private enum SiteConfiguration {
927 private enum HttpMethod {
931 private String method;
932 HttpMethod(String method) {
933 this.method = method;
935 public String getMethod() {