/*-
* ============LICENSE_START=======================================================
* openECOMP : SDN-C
* ================================================================================
* Copyright (C) 2018 AT&T Intellectual Property. All rights
* reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* ============LICENSE_END=========================================================
*/
package org.onap.ccsdk.sli.plugins.grtoolkit;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.commons.lang.StringUtils;
import org.onap.ccsdk.sli.core.dblib.DbLibService;
import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionManager;
import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionResponse;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.AdminHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.DatabaseHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.FailoverStatus;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.Health;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.PropertyKeys;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.SiteHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.HealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SingleNodeHealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SixNodeHealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.ThreeNodeHealthResolver;
import org.json.JSONArray;
import org.json.JSONObject;
import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* API implementation of the {@code GrToolkitService} interface generated from
* the gr-toolkit.yang model. The RPCs contained within this class are meant to
* run in an architecture agnostic fashion, where the response is repeatable
* and predictable across any given node configuration. To facilitate this,
* health checking and failover logic has been abstracted into the
* {@code HealthResolver} classes.
*
* Anyone who wishes to write a custom resolver for use with GR Toolkit should
* extend the {@code HealthResolver} class. The currently provided resolvers
* are useful references for further implementation.
*
* @author Anthony Haddox
* @see GrToolkitService
* @see HealthResolver
* @see SingleNodeHealthResolver
* @see ThreeNodeHealthResolver
* @see SixNodeHealthResolver
*/
public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
private static final String APP_NAME = "gr-toolkit";
private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
private String akkaConfig;
private String httpProtocol;
private String siteIdentifier = System.getenv("SITE_NAME");
private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
private final ExecutorService executor;
protected DataBroker dataBroker;
protected NotificationPublishService notificationService;
protected RpcProviderRegistry rpcRegistry;
protected BindingAwareBroker.RpcRegistration rpcRegistration;
protected DbLibService dbLib;
private String member;
private ClusterActor self;
private HashMap memberMap;
private Properties properties;
private DistributedDataStoreInterface configDatastore;
private HealthResolver resolver;
/**
* Constructs the provider for the GR Toolkit API. Dependencies are
* injected using the GrToolkit.xml blueprint.
*
* @param dataBroker The Data Broker
* @param notificationProviderService The Notification Service
* @param rpcProviderRegistry The RPC Registry
* @param configDatastore The Configuration Data Store provided by the controller
* @param dbLibService Reference to the controller provided DbLibService
*/
public GrToolkitProvider(DataBroker dataBroker,
NotificationPublishService notificationProviderService,
RpcProviderRegistry rpcProviderRegistry,
DistributedDataStoreInterface configDatastore,
DbLibService dbLibService) {
log.info("Creating provider for {}", APP_NAME);
this.executor = Executors.newFixedThreadPool(1);
this.dataBroker = dataBroker;
this.notificationService = notificationProviderService;
this.rpcRegistry = rpcProviderRegistry;
this.configDatastore = configDatastore;
this.dbLib = dbLibService;
initialize();
}
/**
* Initializes some structures necessary to hold health check information
* and perform failovers.
*/
private void initialize() {
log.info("Initializing provider for {}", APP_NAME);
createContainers();
setProperties();
defineMembers();
rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
log.info("Initialization complete for {}", APP_NAME);
}
/**
* Creates the {@code Properties} object with the contents of
* gr-toolkit.properties, found at the {@code SDNC_CONFIG_DIR} directory,
* which should be set as an environment variable. If the properties file
* is not found, GR Toolkit will not function.
*/
private void setProperties() {
log.info("Loading properties from {}", PROPERTIES_FILE);
properties = new Properties();
File propertiesFile = new File(PROPERTIES_FILE);
if(!propertiesFile.exists()) {
log.warn("setProperties(): Properties file not found.");
} else {
try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
properties.load(fileInputStream);
if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
}
httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
if(StringUtils.isEmpty(siteIdentifier)) {
siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
}
log.info("setProperties(): Loaded properties.");
} catch(IOException e) {
log.error("setProperties(): Error loading properties.", e);
}
}
}
/**
* Parses the akka.conf file used by the controller to define an akka
* cluster. This method requires the seed-nodes definition to exist
* on a single line.
*/
private void defineMembers() {
member = configDatastore.getActorUtils().getCurrentMemberName().getName();
log.info("defineMembers(): Cluster member: {}", member);
log.info("defineMembers(): Parsing akka.conf for cluster memberMap...");
try {
File akkaConfigFile = new File(this.akkaConfig);
try(FileReader fileReader = new FileReader(akkaConfigFile);
BufferedReader bufferedReader = new BufferedReader(fileReader)) {
String line;
while((line = bufferedReader.readLine()) != null) {
if(line.contains("seed-nodes =")) {
parseSeedNodes(line);
break;
}
}
}
} catch(IOException e) {
log.error("defineMembers(): Couldn't load akka", e);
} catch(NullPointerException e) {
log.error("defineMembers(): akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
log.error("defineMembers(): NullPointerException", e);
}
log.info("self:\n{}", self);
}
/**
* Sets up the {@code InstanceIdentifier}s for Data Store transactions.
*/
private void createContainers() {
// Replace with MD-SAL write for FailoverStatus
}
/**
* Shuts down the {@code ExecutorService} and closes the RPC Provider Registry.
*/
@Override
public void close() throws Exception {
log.info("Closing provider for {}", APP_NAME);
executor.shutdown();
rpcRegistration.close();
log.info("close(): Successfully closed provider for {}", APP_NAME);
}
/**
* Listens for changes to the Data tree.
*
* @param changes Data tree changes.
*/
@Override
public void onDataTreeChanged(@Nonnull Collection changes) {
log.info("onDataTreeChanged(): No changes.");
}
/**
* Makes a call to {@code resolver.getClusterHealth()} to determine the
* health of the akka clustered controllers.
*
* @param input request body adhering to the model for
* {@code ClusterHealthInput}
* @return response adhering to the model for {@code ClusterHealthOutput}
* @see HealthResolver
* @see ClusterHealthInput
* @see ClusterHealthOutput
*/
@Override
public ListenableFuture> clusterHealth(ClusterHealthInput input) {
log.info("{}:cluster-health invoked.", APP_NAME);
resolver.getClusterHealth();
return buildClusterHealthOutput();
}
/**
* Makes a call to {@code resolver.getSiteHealth()} to determine the health
* of all of the application components of a site. In a multi-site config,
* this will gather the health of all sites.
*
* @param input request body adhering to the model for
* {@code SiteHealthInput}
* @return response adhering to the model for {@code SiteHealthOutput}
* @see HealthResolver
* @see SiteHealthInput
* @see SiteHealthOutput
*/
@Override
public ListenableFuture> siteHealth(SiteHealthInput input) {
log.info("{}:site-health invoked.", APP_NAME);
List sites = resolver.getSiteHealth();
return buildSiteHealthOutput(sites);
}
/**
* Makes a call to {@code resolver.getDatabaseHealth()} to determine the
* health of the database(s) used by the controller.
*
* @param input request body adhering to the model for
* {@code DatabaseHealthInput}
* @return response adhering to the model for {@code DatabaseHealthOutput}
* @see HealthResolver
* @see DatabaseHealthInput
* @see DatabaseHealthOutput
*/
@Override
public ListenableFuture> databaseHealth(DatabaseHealthInput input) {
log.info("{}:database-health invoked.", APP_NAME);
DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
DatabaseHealth health = resolver.getDatabaseHealth();
outputBuilder.setStatus(health.getHealth().equals(Health.HEALTHY) ? "200" : "500");
outputBuilder.setHealth(health.getHealth().toString());
outputBuilder.setServedBy(member);
log.info("databaseHealth(): Health: {}", health.getHealth());
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Makes a call to {@code resolver.getAdminHealth()} to determine the
* health of the administrative portal(s) used by the controller.
*
* @param input request body adhering to the model for
* {@code AdminHealthInput}
* @return response adhering to the model for {@code AdminHealthOutput}
* @see HealthResolver
* @see AdminHealthInput
* @see AdminHealthOutput
*/
@Override
public ListenableFuture> adminHealth(AdminHealthInput input) {
log.info("{}:admin-health invoked.", APP_NAME);
AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
AdminHealth adminHealth = resolver.getAdminHealth();
outputBuilder.setStatus(Integer.toString(adminHealth.getStatusCode()));
outputBuilder.setHealth(adminHealth.getHealth().toString());
outputBuilder.setServedBy(member);
log.info("adminHealth(): Status: {} | Health: {}", adminHealth.getStatusCode(), adminHealth.getHealth());
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Places IP Tables rules in place to drop akka communications traffic with
* one or mode nodes. This method does not not perform any checks to see if
* rules currently exist, and assumes success.
*
* @param input request body adhering to the model for
* {@code HaltAkkaTrafficInput}
* @return response adhering to the model for {@code HaltAkkaTrafficOutput}
* @see HaltAkkaTrafficInput
* @see HaltAkkaTrafficOutput
*/
@Override
public ListenableFuture> haltAkkaTraffic(HaltAkkaTrafficInput input) {
log.info("{}:halt-akka-traffic invoked.", APP_NAME);
HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
outputBuilder.setStatus("200");
modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
outputBuilder.setServedBy(member);
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Removes IP Tables rules in place to permit akka communications traffic
* with one or mode nodes. This method does not not perform any checks to
* see if rules currently exist, and assumes success.
*
* @param input request body adhering to the model for
* {@code ResumeAkkaTrafficInput}
* @return response adhering to the model for {@code ResumeAkkaTrafficOutput}
* @see ResumeAkkaTrafficInput
* @see ResumeAkkaTrafficOutput
*/
@Override
public ListenableFuture> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
log.info("{}:resume-akka-traffic invoked.", APP_NAME);
ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
outputBuilder.setStatus("200");
modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
outputBuilder.setServedBy(member);
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Returns a canned response containing the identifier for this
* controller's site.
*
* @param input request body adhering to the model for
* {@code SiteIdentifierInput}
* @return response adhering to the model for {@code SiteIdentifierOutput}
* @see SiteIdentifierInput
* @see SiteIdentifierOutput
*/
@Override
public ListenableFuture> siteIdentifier(SiteIdentifierInput input) {
log.info("{}:site-identifier invoked.", APP_NAME);
SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
outputBuilder.setStatus("200");
outputBuilder.setId(siteIdentifier);
outputBuilder.setServedBy(member);
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Makes a call to {@code resolver.tryFailover()} to try a failover defined
* by the active {@code HealthResolver}.
*
* @param input request body adhering to the model for
* {@code FailoverInput}
* @return response adhering to the model for {@code FailoverOutput}
* @see HealthResolver
* @see FailoverInput
* @see FailoverOutput
*/
@Override
public ListenableFuture> failover(FailoverInput input) {
log.info("{}:failover invoked.", APP_NAME);
FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
FailoverStatus failoverStatus = resolver.tryFailover(input);
outputBuilder.setServedBy(member);
outputBuilder.setMessage(failoverStatus.getMessage());
outputBuilder.setStatus(Integer.toString(failoverStatus.getStatusCode()));
log.info("{}:{}.", APP_NAME, failoverStatus.getMessage());
return Futures.immediateFuture(RpcResultBuilder.status(true).withResult(outputBuilder.build()).build());
}
/**
* Performs an akka traffic isolation of the active site from the standby
* site in an Active/Standby architecture. Invokes the
* {@code halt-akka-traffic} RPC against the standby site nodes using the
* information of the active site nodes.
*
* @param activeSite list of nodes in the active site
* @param standbySite list of nodes in the standby site
* @param port http or https port of the controller
* @deprecated No longer used since the refactor to use the HealthResolver
* pattern. Retained so the logic can be replicated later.
*/
@Deprecated
private void isolateSiteFromCluster(ArrayList activeSite, ArrayList standbySite, String port) {
log.info("isolateSiteFromCluster(): Halting Akka traffic...");
for(ClusterActor actor : standbySite) {
try {
log.info("Halting Akka traffic for: {}", actor.getNode());
// Build JSON with activeSite actor Node and actor AkkaPort
JSONObject akkaInput = new JSONObject();
JSONObject inputBlock = new JSONObject();
JSONArray votingStateArray = new JSONArray();
JSONObject nodeInfo;
for(ClusterActor node : activeSite) {
nodeInfo = new JSONObject();
nodeInfo.put("node", node.getNode());
nodeInfo.put("port", node.getAkkaPort());
votingStateArray.put(nodeInfo);
}
inputBlock.put("node-info", votingStateArray);
akkaInput.put("input", inputBlock);
ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", ConnectionManager.HttpMethod.POST, akkaInput.toString(), "");
} catch(IOException e) {
log.error("isolateSiteFromCluster(): Could not halt Akka traffic for: " + actor.getNode(), e);
}
}
}
/**
* Invokes the down unreachable action through the Jolokia mbean API.
*
* @param activeSite list of nodes in the active site
* @param standbySite list of nodes in the standby site
* @param port http or https port of the controller
* @deprecated No longer used since the refactor to use the HealthResolver
* pattern. Retained so the logic can be replicated later.
*/
@Deprecated
private void downUnreachableNodes(ArrayList activeSite, ArrayList standbySite, String port) {
log.info("downUnreachableNodes(): Setting site unreachable...");
JSONObject jolokiaInput = new JSONObject();
jolokiaInput.put("type", "EXEC");
jolokiaInput.put("mbean", "akka:type=Cluster");
jolokiaInput.put("operation", "down");
JSONArray arguments = new JSONArray();
for(ClusterActor actor : activeSite) {
// Build Jolokia input
// May need to change from akka port to actor.getAkkaPort()
arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
}
jolokiaInput.put("arguments", arguments);
log.debug("downUnreachableNodes(): {}", jolokiaInput);
try {
log.info("downUnreachableNodes(): Setting nodes unreachable");
ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", ConnectionManager.HttpMethod.POST, jolokiaInput.toString(), "");
} catch(IOException e) {
log.error("downUnreachableNodes(): Error setting nodes unreachable", e);
}
}
/**
* Triggers a data backup and export sequence of MD-SAL data. Invokes the
* {@code data-export-import:schedule-export} RPC to schedule a data export
* and subsequently the {@code daexim-offsite-backup:backup-data} RPC
* against the active site to export and backup the data. Assumes the
* controllers have the org.onap.ccsdk.sli.northbound.daeximoffsitebackup
* bundle installed.
*
* @param activeSite list of nodes in the active site
* @param port http or https port of the controller
* @deprecated No longer used since the refactor to use the HealthResolver
* pattern. Retained so the logic can be replicated later.
*/
@Deprecated
private void backupMdSal(ArrayList activeSite, String port) {
log.info("backupMdSal(): Backing up data...");
try {
log.info("backupMdSal(): Scheduling backup for: {}", activeSite.get(0).getNode());
ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", ConnectionManager.HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }", "");
} catch(IOException e) {
log.error("backupMdSal(): Error backing up MD-SAL", e);
}
for(ClusterActor actor : activeSite) {
try {
// Move data offsite
log.info("backupMdSal(): Backing up data for: {}", actor.getNode());
ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", ConnectionManager.HttpMethod.POST, null, "");
} catch(IOException e) {
log.error("backupMdSal(): Error backing up data.", e);
}
}
}
/**
* Builds a response object for {@code clusterHealth()}. Sorts and iterates
* over the contents of the {@code memberMap}, which contains the health
* information of the cluster, and adds them to the {@code outputBuilder}.
* If the ClusterActor is healthy, according to
* {@code resolver.isControllerHealthy()}, the {@code ClusterHealthOutput}
* status has a {@code 0} appended, otherwise a {@code 1} is appended. A
* status of all zeroes denotes a healthy cluster. This status should be
* easily decoded by tools which use the output.
*
* @return future containing a completed {@code ClusterHealthOutput}
* @see ClusterActor
* @see ClusterHealthOutput
* @see HealthResolver
*/
@SuppressWarnings("unchecked")
private ListenableFuture> buildClusterHealthOutput() {
ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
outputBuilder.setServedBy(member);
List memberList = new ArrayList();
StringBuilder stat = new StringBuilder();
memberMap.values()
.stream()
.sorted(Comparator.comparingInt(member -> Integer.parseInt(member.getMember().split("-")[1])))
.forEach(member -> {
memberList.add(new MemberBuilder(member).build());
// 0 is a healthy controller, 1 is unhealthy.
// The list is sorted so users can decode to find unhealthy nodes
// This will also let them figure out health on a per-site basis
// Depending on any tools they use with this API
if(resolver.isControllerHealthy(member)) {
stat.append("0");
} else {
stat.append("1");
}
});
outputBuilder.setStatus(stat.toString());
outputBuilder.setMembers(memberList);
RpcResult rpcResult = RpcResultBuilder.status(true).withResult(outputBuilder.build()).build();
return Futures.immediateFuture(rpcResult);
}
/**
* Builds a response object for {@code siteHealth()}. Iterates over a list
* of {@code SiteHealth} objects and populates the {@code SiteHealthOutput}
* with the information.
*
* @param sites list of sites
* @return future containing a completed {@code SiteHealthOutput}
* @see SiteHealth
* @see HealthResolver
*/
@SuppressWarnings("unchecked")
private ListenableFuture> buildSiteHealthOutput(List sites) {
SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
SitesBuilder siteBuilder = new SitesBuilder();
outputBuilder.setStatus("200");
outputBuilder.setSites((List) new ArrayList());
for(SiteHealth site : sites) {
siteBuilder.setHealth(site.getHealth().toString());
siteBuilder.setRole(site.getRole());
siteBuilder.setId(site.getId());
outputBuilder.getSites().add(siteBuilder.build());
log.info("buildSiteHealthOutput(): Health for {}: {}", site.getId(), site.getHealth().getHealth());
}
outputBuilder.setServedBy(member);
RpcResult rpcResult = RpcResultBuilder.status(true).withResult(outputBuilder.build()).build();
return Futures.immediateFuture(rpcResult);
}
/**
* Parses a line containing the akka networking information of the akka
* controller cluster. Assumes entries of the format:
*
* akka.tcp://opendaylight-cluster-data@:
*
* The information is stored in a {@code ClusterActor} object, and then
* added to the memberMap HashMap, with the {@code FQDN} as the key. The
* final step is a call to {@code createHealthResolver} to create the
* health resolver for the provider.
*
* @param line the line containing all of the seed nodes
* @see ClusterActor
* @see HealthResolver
*/
private void parseSeedNodes(String line) {
memberMap = new HashMap<>();
line = line.substring(line.indexOf("[\""), line.indexOf(']'));
String[] splits = line.split(",");
for(int ndx = 0; ndx < splits.length; ndx++) {
String nodeName = splits[ndx];
int delimLocation = nodeName.indexOf('@');
String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
log.info("parseSeedNodes(): Adding node: {}:{}", splits[ndx], port);
ClusterActor clusterActor = new ClusterActor();
clusterActor.setNode(splits[ndx]);
clusterActor.setAkkaPort(port);
clusterActor.setMember("member-" + (ndx + 1));
if(member.equals(clusterActor.getMember())) {
self = clusterActor;
}
memberMap.put(clusterActor.getNode(), clusterActor);
log.info("parseSeedNodes(): {}", clusterActor);
}
createHealthResolver();
}
/**
* Creates the specific health resolver requested by the user, as specified
* in the gr-toolkit.properties file. If a resolver is not specified, or
* there is an issue creating the resolver, it will use a fallback resolver
* based on how many nodes are added to the memberMap HashMap.
*
* @see HealthResolver
* @see SingleNodeHealthResolver
* @see ThreeNodeHealthResolver
* @see SixNodeHealthResolver
*/
private void createHealthResolver() {
log.info("createHealthResolver(): Creating health resolver...");
try {
Class resolverClass = null;
String userDefinedResolver = properties.getProperty(PropertyKeys.RESOLVER);
if(StringUtils.isEmpty(userDefinedResolver)) {
throw new InstantiationException();
}
resolverClass = Class.forName(userDefinedResolver);
Class[] types = { Map.class , properties.getClass(), DbLibService.class };
Constructor constructor = resolverClass.getConstructor(types);
Object[] parameters = { memberMap, properties, dbLib };
resolver = constructor.newInstance(parameters);
log.info("createHealthResolver(): Created resolver from name {}", resolver.toString());
} catch(ClassNotFoundException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
log.warn("createHealthResolver(): Could not create user defined resolver", e);
if(memberMap.size() == 1) {
log.info("createHealthResolver(): FALLBACK: Initializing SingleNodeHealthResolver...");
resolver = new SingleNodeHealthResolver(memberMap, properties, dbLib);
} else if(memberMap.size() == 3) {
log.info("createHealthResolver(): FALLBACK: Initializing ThreeNodeHealthResolver...");
resolver = new ThreeNodeHealthResolver(memberMap, properties, dbLib);
} else if(memberMap.size() == 6) {
log.info("createHealthResolver(): FALLBACK: Initializing SixNodeHealthResolver...");
resolver = new SixNodeHealthResolver(memberMap, properties, dbLib);
}
}
}
/**
* Adds or drops IPTables rules to block or resume akka traffic for a node
* in the akka cluster. Assumes that the user or group that the controller
* is run as has the ability to run sudo /sbin/iptables without requiring a
* password. This method will run indefinitely if that assumption is not
* correct. This method does not check to see if any rules around the node
* are preexisting, so multiple uses will result in multiple additions and
* removals from IPTables.
*
* @param task the operation to be performed against IPTables
* @param nodeInfo array containing the nodes to be added or dropped from
* IPTables
*/
private void modifyIpTables(IpTables task, Object[] nodeInfo) {
log.info("modifyIpTables(): Modifying IPTables rules...");
if(task == IpTables.ADD) {
for(Object node : nodeInfo) {
org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
(org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
log.info("modifyIpTables(): Isolating {}", n.getNode());
executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
}
} else if(task == IpTables.DELETE) {
for(Object node : nodeInfo) {
org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
(org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
log.info("modifyIpTables(): De-isolating {}", n.getNode());
executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
}
}
if(nodeInfo.length > 0) {
executeCommand("sudo /sbin/iptables -L");
}
}
/**
* Opens a shell session and executes a command.
*
* @param command the shell command to execute
*/
private void executeCommand(String command) {
log.info("executeCommand(): Executing command: {}", command);
String[] cmd = command.split(" ");
try {
Process p = Runtime.getRuntime().exec(cmd);
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
String inputLine;
StringBuilder content = new StringBuilder();
while((inputLine = bufferedReader.readLine()) != null) {
content.append(inputLine);
}
bufferedReader.close();
log.info("executeCommand(): {}", content);
} catch(IOException e) {
log.error("executeCommand(): Error executing command", e);
}
}
/**
* The IPTables operations this module can perform.
*/
enum IpTables {
ADD,
DELETE
}
}