/*-
 * ============LICENSE_START=======================================================
 * ONAP : CCSDK
 * ================================================================================
 * Copyright (C) 2018 AT&T Intellectual Property. All rights reserved.
 * ================================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=========================================================
 */
22 package org.onap.ccsdk.sli.plugins.grtoolkit;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.annotation.Nonnull;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.commons.lang.StringUtils;

import org.onap.ccsdk.sli.core.dblib.DbLibService;
import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionManager;
import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionResponse;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.AdminHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.DatabaseHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.FailoverStatus;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.Health;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.PropertyKeys;
import org.onap.ccsdk.sli.plugins.grtoolkit.data.SiteHealth;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.HealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SingleNodeHealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SixNodeHealthResolver;
import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.ThreeNodeHealthResolver;

import org.json.JSONArray;
import org.json.JSONObject;

import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
import org.opendaylight.yangtools.yang.common.RpcResult;
import org.opendaylight.yangtools.yang.common.RpcResultBuilder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * API implementation of the {@code GrToolkitService} interface generated from
 * the gr-toolkit.yang model. The RPCs contained within this class are meant to
 * run in an architecture agnostic fashion, where the response is repeatable
 * and predictable across any given node configuration. To facilitate this,
 * health checking and failover logic has been abstracted into the
 * {@code HealthResolver} classes.
 * <p>
 * Anyone who wishes to write a custom resolver for use with GR Toolkit should
 * extend the {@code HealthResolver} class. The currently provided resolvers
 * are useful references for further implementation.
 *
 * @author Anthony Haddox
 * @see GrToolkitService
 * @see HealthResolver
 * @see SingleNodeHealthResolver
 * @see ThreeNodeHealthResolver
 * @see SixNodeHealthResolver
 */
126 public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
127 private static final String APP_NAME = "gr-toolkit";
128 private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
129 private String akkaConfig;
130 private String httpProtocol;
131 private String siteIdentifier = System.getenv("SITE_NAME");
132 private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
133 private final ExecutorService executor;
134 protected DataBroker dataBroker;
135 protected NotificationPublishService notificationService;
136 protected RpcProviderRegistry rpcRegistry;
137 protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
138 protected DbLibService dbLib;
139 private String member;
140 private ClusterActor self;
141 private HashMap<String, ClusterActor> memberMap;
142 private Properties properties;
143 private DistributedDataStoreInterface configDatastore;
144 private HealthResolver resolver;
147 * Constructs the provider for the GR Toolkit API. Dependencies are
148 * injected using the GrToolkit.xml blueprint.
150 * @param dataBroker The Data Broker
151 * @param notificationProviderService The Notification Service
152 * @param rpcProviderRegistry The RPC Registry
153 * @param configDatastore The Configuration Data Store provided by the controller
154 * @param dbLibService Reference to the controller provided DbLibService
156 public GrToolkitProvider(DataBroker dataBroker,
157 NotificationPublishService notificationProviderService,
158 RpcProviderRegistry rpcProviderRegistry,
159 DistributedDataStoreInterface configDatastore,
160 DbLibService dbLibService) {
161 log.info("Creating provider for {}", APP_NAME);
162 this.executor = Executors.newFixedThreadPool(1);
163 this.dataBroker = dataBroker;
164 this.notificationService = notificationProviderService;
165 this.rpcRegistry = rpcProviderRegistry;
166 this.configDatastore = configDatastore;
167 this.dbLib = dbLibService;
172 * Initializes some structures necessary to hold health check information
173 * and perform failovers.
175 private void initialize() {
176 log.info("Initializing provider for {}", APP_NAME);
180 rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
181 log.info("Initialization complete for {}", APP_NAME);
185 * Creates the {@code Properties} object with the contents of
186 * gr-toolkit.properties, found at the {@code SDNC_CONFIG_DIR} directory,
187 * which should be set as an environment variable. If the properties file
188 * is not found, GR Toolkit will not function.
190 private void setProperties() {
191 log.info("Loading properties from {}", PROPERTIES_FILE);
192 properties = new Properties();
193 File propertiesFile = new File(PROPERTIES_FILE);
194 if(!propertiesFile.exists()) {
195 log.warn("setProperties(): Properties file not found.");
197 try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
198 properties.load(fileInputStream);
199 if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
200 properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
202 httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
203 akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
204 if(StringUtils.isEmpty(siteIdentifier)) {
205 siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
207 log.info("setProperties(): Loaded properties.");
208 } catch(IOException e) {
209 log.error("setProperties(): Error loading properties.", e);
215 * Parses the akka.conf file used by the controller to define an akka
216 * cluster. This method requires the <i>seed-nodes</i> definition to exist
219 private void defineMembers() {
220 member = configDatastore.getActorUtils().getCurrentMemberName().getName();
221 log.info("defineMembers(): Cluster member: {}", member);
223 log.info("defineMembers(): Parsing akka.conf for cluster memberMap...");
225 File akkaConfigFile = new File(this.akkaConfig);
226 try(FileReader fileReader = new FileReader(akkaConfigFile);
227 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
229 while((line = bufferedReader.readLine()) != null) {
230 if(line.contains("seed-nodes =")) {
231 parseSeedNodes(line);
236 } catch(IOException e) {
237 log.error("defineMembers(): Couldn't load akka", e);
238 } catch(NullPointerException e) {
239 log.error("defineMembers(): akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
240 log.error("defineMembers(): NullPointerException", e);
242 log.info("self:\n{}", self);
246 * Sets up the {@code InstanceIdentifier}s for Data Store transactions.
248 private void createContainers() {
249 // Replace with MD-SAL write for FailoverStatus
253 * Shuts down the {@code ExecutorService} and closes the RPC Provider Registry.
256 public void close() throws Exception {
257 log.info("Closing provider for {}", APP_NAME);
259 rpcRegistration.close();
260 log.info("close(): Successfully closed provider for {}", APP_NAME);
264 * Listens for changes to the Data tree.
266 * @param changes Data tree changes.
269 public void onDataTreeChanged(@Nonnull Collection changes) {
270 log.info("onDataTreeChanged(): No changes.");
274 * Makes a call to {@code resolver.getClusterHealth()} to determine the
275 * health of the akka clustered controllers.
277 * @param input request body adhering to the model for
278 * {@code ClusterHealthInput}
279 * @return response adhering to the model for {@code ClusterHealthOutput}
280 * @see HealthResolver
281 * @see ClusterHealthInput
282 * @see ClusterHealthOutput
285 public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
286 log.info("{}:cluster-health invoked.", APP_NAME);
287 resolver.getClusterHealth();
288 return buildClusterHealthOutput();
292 * Makes a call to {@code resolver.getSiteHealth()} to determine the health
293 * of all of the application components of a site. In a multi-site config,
294 * this will gather the health of all sites.
296 * @param input request body adhering to the model for
297 * {@code SiteHealthInput}
298 * @return response adhering to the model for {@code SiteHealthOutput}
299 * @see HealthResolver
300 * @see SiteHealthInput
301 * @see SiteHealthOutput
304 public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
305 log.info("{}:site-health invoked.", APP_NAME);
306 List<SiteHealth> sites = resolver.getSiteHealth();
307 return buildSiteHealthOutput(sites);
311 * Makes a call to {@code resolver.getDatabaseHealth()} to determine the
312 * health of the database(s) used by the controller.
314 * @param input request body adhering to the model for
315 * {@code DatabaseHealthInput}
316 * @return response adhering to the model for {@code DatabaseHealthOutput}
317 * @see HealthResolver
318 * @see DatabaseHealthInput
319 * @see DatabaseHealthOutput
322 public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
323 log.info("{}:database-health invoked.", APP_NAME);
324 DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
325 DatabaseHealth health = resolver.getDatabaseHealth();
326 outputBuilder.setStatus(health.getHealth().equals(Health.HEALTHY) ? "200" : "500");
327 outputBuilder.setHealth(health.getHealth().toString());
328 outputBuilder.setServedBy(member);
329 log.info("databaseHealth(): Health: {}", health.getHealth());
330 return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
334 * Makes a call to {@code resolver.getAdminHealth()} to determine the
335 * health of the administrative portal(s) used by the controller.
337 * @param input request body adhering to the model for
338 * {@code AdminHealthInput}
339 * @return response adhering to the model for {@code AdminHealthOutput}
340 * @see HealthResolver
341 * @see AdminHealthInput
342 * @see AdminHealthOutput
345 public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
346 log.info("{}:admin-health invoked.", APP_NAME);
347 AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
348 AdminHealth adminHealth = resolver.getAdminHealth();
349 outputBuilder.setStatus(Integer.toString(adminHealth.getStatusCode()));
350 outputBuilder.setHealth(adminHealth.getHealth().toString());
351 outputBuilder.setServedBy(member);
352 log.info("adminHealth(): Status: {} | Health: {}", adminHealth.getStatusCode(), adminHealth.getHealth());
353 return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
357 * Places IP Tables rules in place to drop akka communications traffic with
358 * one or mode nodes. This method does not not perform any checks to see if
359 * rules currently exist, and assumes success.
361 * @param input request body adhering to the model for
362 * {@code HaltAkkaTrafficInput}
363 * @return response adhering to the model for {@code HaltAkkaTrafficOutput}
364 * @see HaltAkkaTrafficInput
365 * @see HaltAkkaTrafficOutput
368 public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
369 log.info("{}:halt-akka-traffic invoked.", APP_NAME);
370 HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
371 outputBuilder.setStatus("200");
372 modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
373 outputBuilder.setServedBy(member);
375 return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
379 * Removes IP Tables rules in place to permit akka communications traffic
380 * with one or mode nodes. This method does not not perform any checks to
381 * see if rules currently exist, and assumes success.
383 * @param input request body adhering to the model for
384 * {@code ResumeAkkaTrafficInput}
385 * @return response adhering to the model for {@code ResumeAkkaTrafficOutput}
386 * @see ResumeAkkaTrafficInput
387 * @see ResumeAkkaTrafficOutput
390 public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
391 log.info("{}:resume-akka-traffic invoked.", APP_NAME);
392 ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
393 outputBuilder.setStatus("200");
394 modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
395 outputBuilder.setServedBy(member);
397 return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
401 * Returns a canned response containing the identifier for this
404 * @param input request body adhering to the model for
405 * {@code SiteIdentifierInput}
406 * @return response adhering to the model for {@code SiteIdentifierOutput}
407 * @see SiteIdentifierInput
408 * @see SiteIdentifierOutput
411 public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
412 log.info("{}:site-identifier invoked.", APP_NAME);
413 SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
414 outputBuilder.setStatus("200");
415 outputBuilder.setId(siteIdentifier);
416 outputBuilder.setServedBy(member);
417 return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
421 * Makes a call to {@code resolver.tryFailover()} to try a failover defined
422 * by the active {@code HealthResolver}.
424 * @param input request body adhering to the model for
425 * {@code FailoverInput}
426 * @return response adhering to the model for {@code FailoverOutput}
427 * @see HealthResolver
429 * @see FailoverOutput
432 public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
433 log.info("{}:failover invoked.", APP_NAME);
434 FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
435 FailoverStatus failoverStatus = resolver.tryFailover(input);
436 outputBuilder.setServedBy(member);
437 outputBuilder.setMessage(failoverStatus.getMessage());
438 outputBuilder.setStatus(Integer.toString(failoverStatus.getStatusCode()));
439 log.info("{}:{}.", APP_NAME, failoverStatus.getMessage());
440 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
444 * Performs an akka traffic isolation of the active site from the standby
445 * site in an Active/Standby architecture. Invokes the
446 * {@code halt-akka-traffic} RPC against the standby site nodes using the
447 * information of the active site nodes.
449 * @param activeSite list of nodes in the active site
450 * @param standbySite list of nodes in the standby site
451 * @param port http or https port of the controller
452 * @deprecated No longer used since the refactor to use the HealthResolver
453 * pattern. Retained so the logic can be replicated later.
456 private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
457 log.info("isolateSiteFromCluster(): Halting Akka traffic...");
458 for(ClusterActor actor : standbySite) {
460 log.info("Halting Akka traffic for: {}", actor.getNode());
461 // Build JSON with activeSite actor Node and actor AkkaPort
462 JSONObject akkaInput = new JSONObject();
463 JSONObject inputBlock = new JSONObject();
464 JSONArray votingStateArray = new JSONArray();
466 for(ClusterActor node : activeSite) {
467 nodeInfo = new JSONObject();
468 nodeInfo.put("node", node.getNode());
469 nodeInfo.put("port", node.getAkkaPort());
470 votingStateArray.put(nodeInfo);
472 inputBlock.put("node-info", votingStateArray);
473 akkaInput.put("input", inputBlock);
474 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", ConnectionManager.HttpMethod.POST, akkaInput.toString(), "");
475 } catch(IOException e) {
476 log.error("isolateSiteFromCluster(): Could not halt Akka traffic for: " + actor.getNode(), e);
482 * Invokes the down unreachable action through the Jolokia mbean API.
484 * @param activeSite list of nodes in the active site
485 * @param standbySite list of nodes in the standby site
486 * @param port http or https port of the controller
487 * @deprecated No longer used since the refactor to use the HealthResolver
488 * pattern. Retained so the logic can be replicated later.
491 private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
492 log.info("downUnreachableNodes(): Setting site unreachable...");
493 JSONObject jolokiaInput = new JSONObject();
494 jolokiaInput.put("type", "EXEC");
495 jolokiaInput.put("mbean", "akka:type=Cluster");
496 jolokiaInput.put("operation", "down");
497 JSONArray arguments = new JSONArray();
498 for(ClusterActor actor : activeSite) {
499 // Build Jolokia input
500 // May need to change from akka port to actor.getAkkaPort()
501 arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
503 jolokiaInput.put("arguments", arguments);
504 log.debug("downUnreachableNodes(): {}", jolokiaInput);
506 log.info("downUnreachableNodes(): Setting nodes unreachable");
507 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", ConnectionManager.HttpMethod.POST, jolokiaInput.toString(), "");
508 } catch(IOException e) {
509 log.error("downUnreachableNodes(): Error setting nodes unreachable", e);
514 * Triggers a data backup and export sequence of MD-SAL data. Invokes the
515 * {@code data-export-import:schedule-export} RPC to schedule a data export
516 * and subsequently the {@code daexim-offsite-backup:backup-data} RPC
517 * against the active site to export and backup the data. Assumes the
518 * controllers have the org.onap.ccsdk.sli.northbound.daeximoffsitebackup
521 * @param activeSite list of nodes in the active site
522 * @param port http or https port of the controller
523 * @deprecated No longer used since the refactor to use the HealthResolver
524 * pattern. Retained so the logic can be replicated later.
527 private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
528 log.info("backupMdSal(): Backing up data...");
530 log.info("backupMdSal(): Scheduling backup for: {}", activeSite.get(0).getNode());
531 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", ConnectionManager.HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }", "");
532 } catch(IOException e) {
533 log.error("backupMdSal(): Error backing up MD-SAL", e);
535 for(ClusterActor actor : activeSite) {
538 log.info("backupMdSal(): Backing up data for: {}", actor.getNode());
539 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", ConnectionManager.HttpMethod.POST, null, "");
540 } catch(IOException e) {
541 log.error("backupMdSal(): Error backing up data.", e);
547 * Builds a response object for {@code clusterHealth()}. Sorts and iterates
548 * over the contents of the {@code memberMap}, which contains the health
549 * information of the cluster, and adds them to the {@code outputBuilder}.
550 * If the ClusterActor is healthy, according to
551 * {@code resolver.isControllerHealthy()}, the {@code ClusterHealthOutput}
552 * status has a {@code 0} appended, otherwise a {@code 1} is appended. A
553 * status of all zeroes denotes a healthy cluster. This status should be
554 * easily decoded by tools which use the output.
556 * @return future containing a completed {@code ClusterHealthOutput}
558 * @see ClusterHealthOutput
559 * @see HealthResolver
561 @SuppressWarnings("unchecked")
562 private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput() {
563 ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
564 outputBuilder.setServedBy(member);
565 List memberList = new ArrayList<Member>();
566 StringBuilder stat = new StringBuilder();
569 .sorted(Comparator.comparingInt(member -> Integer.parseInt(member.getMember().split("-")[1])))
571 memberList.add(new MemberBuilder(member).build());
572 // 0 is a healthy controller, 1 is unhealthy.
573 // The list is sorted so users can decode to find unhealthy nodes
574 // This will also let them figure out health on a per-site basis
575 // Depending on any tools they use with this API
576 if(resolver.isControllerHealthy(member)) {
582 outputBuilder.setStatus(stat.toString());
583 outputBuilder.setMembers(memberList);
584 RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
585 return Futures.immediateFuture(rpcResult);
589 * Builds a response object for {@code siteHealth()}. Iterates over a list
590 * of {@code SiteHealth} objects and populates the {@code SiteHealthOutput}
591 * with the information.
593 * @param sites list of sites
594 * @return future containing a completed {@code SiteHealthOutput}
596 * @see HealthResolver
598 @SuppressWarnings("unchecked")
599 private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(List<SiteHealth> sites) {
600 SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
601 SitesBuilder siteBuilder = new SitesBuilder();
602 outputBuilder.setStatus("200");
603 outputBuilder.setSites((List) new ArrayList<Site>());
605 for(SiteHealth site : sites) {
606 siteBuilder.setHealth(site.getHealth().toString());
607 siteBuilder.setRole(site.getRole());
608 siteBuilder.setId(site.getId());
609 outputBuilder.getSites().add(siteBuilder.build());
610 log.info("buildSiteHealthOutput(): Health for {}: {}", site.getId(), site.getHealth().getHealth());
613 outputBuilder.setServedBy(member);
614 RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
615 return Futures.immediateFuture(rpcResult);
619 * Parses a line containing the akka networking information of the akka
620 * controller cluster. Assumes entries of the format:
622 * akka.tcp://opendaylight-cluster-data@<FQDN>:<AKKA_PORT>
624 * The information is stored in a {@code ClusterActor} object, and then
625 * added to the memberMap HashMap, with the {@code FQDN} as the key. The
626 * final step is a call to {@code createHealthResolver} to create the
627 * health resolver for the provider.
629 * @param line the line containing all of the seed nodes
631 * @see HealthResolver
633 private void parseSeedNodes(String line) {
634 memberMap = new HashMap<>();
635 line = line.substring(line.indexOf("[\""), line.indexOf(']'));
636 String[] splits = line.split(",");
638 for(int ndx = 0; ndx < splits.length; ndx++) {
639 String nodeName = splits[ndx];
640 int delimLocation = nodeName.indexOf('@');
641 String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
642 splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
643 log.info("parseSeedNodes(): Adding node: {}:{}", splits[ndx], port);
644 ClusterActor clusterActor = new ClusterActor();
645 clusterActor.setNode(splits[ndx]);
646 clusterActor.setAkkaPort(port);
647 clusterActor.setMember("member-" + (ndx + 1));
648 if(member.equals(clusterActor.getMember())) {
651 memberMap.put(clusterActor.getNode(), clusterActor);
652 log.info("parseSeedNodes(): {}", clusterActor);
655 createHealthResolver();
659 * Creates the specific health resolver requested by the user, as specified
660 * in the gr-toolkit.properties file. If a resolver is not specified, or
661 * there is an issue creating the resolver, it will use a fallback resolver
662 * based on how many nodes are added to the memberMap HashMap.
664 * @see HealthResolver
665 * @see SingleNodeHealthResolver
666 * @see ThreeNodeHealthResolver
667 * @see SixNodeHealthResolver
669 private void createHealthResolver() {
670 log.info("createHealthResolver(): Creating health resolver...");
672 Class resolverClass = null;
673 String userDefinedResolver = properties.getProperty(PropertyKeys.RESOLVER);
674 if(StringUtils.isEmpty(userDefinedResolver)) {
675 throw new InstantiationException();
677 resolverClass = Class.forName(userDefinedResolver);
678 Class[] types = { Map.class , properties.getClass(), DbLibService.class };
679 Constructor<HealthResolver> constructor = resolverClass.getConstructor(types);
680 Object[] parameters = { memberMap, properties, dbLib };
681 resolver = constructor.newInstance(parameters);
682 log.info("createHealthResolver(): Created resolver from name {}", resolver.toString());
683 } catch(ClassNotFoundException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
684 log.warn("createHealthResolver(): Could not create user defined resolver", e);
685 if(memberMap.size() == 1) {
686 log.info("createHealthResolver(): FALLBACK: Initializing SingleNodeHealthResolver...");
687 resolver = new SingleNodeHealthResolver(memberMap, properties, dbLib);
688 } else if(memberMap.size() == 3) {
689 log.info("createHealthResolver(): FALLBACK: Initializing ThreeNodeHealthResolver...");
690 resolver = new ThreeNodeHealthResolver(memberMap, properties, dbLib);
691 } else if(memberMap.size() == 6) {
692 log.info("createHealthResolver(): FALLBACK: Initializing SixNodeHealthResolver...");
693 resolver = new SixNodeHealthResolver(memberMap, properties, dbLib);
699 * Adds or drops IPTables rules to block or resume akka traffic for a node
700 * in the akka cluster. Assumes that the user or group that the controller
701 * is run as has the ability to run sudo /sbin/iptables without requiring a
702 * password. This method will run indefinitely if that assumption is not
703 * correct. This method does not check to see if any rules around the node
704 * are preexisting, so multiple uses will result in multiple additions and
705 * removals from IPTables.
707 * @param task the operation to be performed against IPTables
708 * @param nodeInfo array containing the nodes to be added or dropped from
711 private void modifyIpTables(IpTables task, Object[] nodeInfo) {
712 log.info("modifyIpTables(): Modifying IPTables rules...");
713 if(task == IpTables.ADD) {
714 for(Object node : nodeInfo) {
715 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
716 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
717 log.info("modifyIpTables(): Isolating {}", n.getNode());
718 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
719 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
721 } else if(task == IpTables.DELETE) {
722 for(Object node : nodeInfo) {
723 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
724 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
725 log.info("modifyIpTables(): De-isolating {}", n.getNode());
726 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
727 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
730 if(nodeInfo.length > 0) {
731 executeCommand("sudo /sbin/iptables -L");
736 * Opens a shell session and executes a command.
738 * @param command the shell command to execute
740 private void executeCommand(String command) {
741 log.info("executeCommand(): Executing command: {}", command);
742 String[] cmd = command.split(" ");
744 Process p = Runtime.getRuntime().exec(cmd);
745 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
747 StringBuilder content = new StringBuilder();
748 while((inputLine = bufferedReader.readLine()) != null) {
749 content.append(inputLine);
751 bufferedReader.close();
752 log.info("executeCommand(): {}", content);
753 } catch(IOException e) {
754 log.error("executeCommand(): Error executing command", e);
759 * The IPTables operations this module can perform.