8da2f09ae064c8bf02ba4dc7b4cce03c07a10f44
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.plugins.grtoolkit;
23
24 import com.google.common.util.concurrent.Futures;
25 import com.google.common.util.concurrent.ListenableFuture;
26 import java.io.BufferedReader;
27 import java.io.File;
28 import java.io.FileInputStream;
29 import java.io.FileReader;
30 import java.io.IOException;
31 import java.io.InputStreamReader;
32 import java.lang.reflect.Constructor;
33 import java.lang.reflect.InvocationTargetException;
34 import java.util.ArrayList;
35 import java.util.Collection;
36 import java.util.Comparator;
37 import java.util.HashMap;
38 import java.util.List;
39 import java.util.Map;
40 import java.util.Properties;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import javax.annotation.Nonnull;
44 import org.apache.commons.lang.StringUtils;
45 import org.json.JSONArray;
46 import org.json.JSONObject;
47 import org.onap.ccsdk.sli.core.dblib.DbLibService;
48 import org.onap.ccsdk.sli.core.utils.common.EnvProperties;
49 import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionManager;
50 import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionResponse;
51 import org.onap.ccsdk.sli.plugins.grtoolkit.data.AdminHealth;
52 import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
53 import org.onap.ccsdk.sli.plugins.grtoolkit.data.DatabaseHealth;
54 import org.onap.ccsdk.sli.plugins.grtoolkit.data.FailoverStatus;
55 import org.onap.ccsdk.sli.plugins.grtoolkit.data.Health;
56 import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
57 import org.onap.ccsdk.sli.plugins.grtoolkit.data.PropertyKeys;
58 import org.onap.ccsdk.sli.plugins.grtoolkit.data.SiteHealth;
59 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.HealthResolver;
60 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SingleNodeHealthResolver;
61 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SixNodeHealthResolver;
62 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.ThreeNodeHealthResolver;
63 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
64 import org.opendaylight.mdsal.binding.api.DataBroker;
65 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
66 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
67 import org.opendaylight.mdsal.binding.api.RpcProviderService;
68 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
69 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
70 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
71 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
72 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
73 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
74 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
75 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
76 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
77 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
78 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
79 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
80 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
81 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
82 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
83 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
84 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
85 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
86 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
87 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
88 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
89 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
90 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
91 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
92 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
93 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
94 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
95 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
96 import org.opendaylight.yangtools.concepts.ObjectRegistration;
97 import org.opendaylight.yangtools.yang.common.RpcResult;
98 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
99 import org.slf4j.Logger;
100 import org.slf4j.LoggerFactory;
101
102 /**
103  * API implementation of the {@code GrToolkitService} interface generated from
104  * the gr-toolkit.yang model. The RPCs contained within this class are meant to
105  * run in an architecture agnostic fashion, where the response is repeatable
106  * and predictable across any given node configuration. To facilitate this,
107  * health checking and failover logic has been abstracted into the
108  * {@code HealthResolver} classes.
109  * <p>
110  * Anyone who wishes to write a custom resolver for use with GR Toolkit should
111  * extend the {@code HealthResolver} class. The currently provided resolvers
112  * are useful references for further implementation.
113  *
114  * @author Anthony Haddox
115  * @see GrToolkitService
116  * @see HealthResolver
117  * @see SingleNodeHealthResolver
118  * @see ThreeNodeHealthResolver
119  * @see SixNodeHealthResolver
120  */
121 public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
122     private static final String APP_NAME = "gr-toolkit";
123     private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
124     private String akkaConfig;
125     private String httpProtocol;
126     private String siteIdentifier = System.getenv("SITE_NAME");
127     private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
128     private final ExecutorService executor;
129     protected DataBroker dataBroker;
130     protected NotificationPublishService notificationService;
131     protected RpcProviderService rpcRegistry;
132     protected ObjectRegistration<GrToolkitService> rpcRegistration;
133     protected DbLibService dbLib;
134     private String member;
135     private ClusterActor self;
136     private HashMap<String, ClusterActor> memberMap;
137     private Properties properties;
138     private DistributedDataStoreInterface configDatastore;
139     private HealthResolver resolver;
140
141     /**
142      * Constructs the provider for the GR Toolkit API. Dependencies are
143      * injected using the GrToolkit.xml blueprint.
144      *
145      * @param dataBroker The Data Broker
146      * @param notificationProviderService The Notification Service
147      * @param rpcProviderRegistry The RPC Registry
148      * @param configDatastore The Configuration Data Store provided by the controller
149      * @param dbLibService Reference to the controller provided DbLibService
150      */
151     public GrToolkitProvider(DataBroker dataBroker,
152                              NotificationPublishService notificationProviderService,
153                              RpcProviderService rpcProviderRegistry,
154                              DistributedDataStoreInterface configDatastore,
155                              DbLibService dbLibService) {
156         log.info("Creating provider for {}", APP_NAME);
157         this.executor = Executors.newFixedThreadPool(1);
158         this.dataBroker = dataBroker;
159         this.notificationService = notificationProviderService;
160         this.rpcRegistry = rpcProviderRegistry;
161         this.configDatastore = configDatastore;
162         this.dbLib = dbLibService;
163         initialize();
164     }
165
166     /**
167      * Initializes some structures necessary to hold health check information
168      * and perform failovers.
169      */
170     private void initialize() {
171         log.info("Initializing provider for {}", APP_NAME);
172         createContainers();
173         setProperties();
174         defineMembers();
175         rpcRegistration = rpcRegistry.registerRpcImplementation(GrToolkitService.class, this);
176         log.info("Initialization complete for {}", APP_NAME);
177     }
178
179     /**
180      * Creates the {@code Properties} object with the contents of
181      * gr-toolkit.properties, found at the {@code SDNC_CONFIG_DIR} directory,
182      * which should be set as an environment variable. If the properties file
183      * is not found, GR Toolkit will not function.
184      */
185     private void setProperties() {
186         log.info("Loading properties from {}", PROPERTIES_FILE);
187         properties = new EnvProperties();
188         File propertiesFile = new File(PROPERTIES_FILE);
189         if(!propertiesFile.exists()) {
190             log.warn("setProperties(): Properties file not found.");
191         } else {
192             try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
193                 properties.load(fileInputStream);
194                 if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
195                     properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
196                 }
197                 httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
198                 akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
199                 if(StringUtils.isEmpty(siteIdentifier)) {
200                     siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
201                 }
202                 log.info("setProperties(): Loaded properties.");
203             } catch(IOException e) {
204                 log.error("setProperties(): Error loading properties.", e);
205             }
206         }
207     }
208
209     /**
210      * Parses the akka.conf file used by the controller to define an akka
211      * cluster. This method requires the <i>seed-nodes</i> definition to exist
212      * on a single line.
213      */
214     private void defineMembers() {
215         member = configDatastore.getActorUtils().getCurrentMemberName().getName();
216         log.info("defineMembers(): Cluster member: {}", member);
217
218         log.info("defineMembers(): Parsing akka.conf for cluster memberMap...");
219         try {
220             File akkaConfigFile = new File(this.akkaConfig);
221             try(FileReader fileReader = new FileReader(akkaConfigFile);
222                 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
223                 String line;
224                 while((line = bufferedReader.readLine()) != null) {
225                     if(line.contains("seed-nodes =")) {
226                         parseSeedNodes(line);
227                         break;
228                     }
229                 }
230             }
231         } catch(IOException e) {
232             log.error("defineMembers(): Couldn't load akka", e);
233         } catch(NullPointerException e) {
234             log.error("defineMembers(): akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
235             log.error("defineMembers(): NullPointerException", e);
236         }
237         log.info("self:\n{}", self);
238     }
239
240     /**
241      * Sets up the {@code InstanceIdentifier}s for Data Store transactions.
242      */
243     private void createContainers() {
244         // Replace with MD-SAL write for FailoverStatus
245     }
246
247     /**
248      * Shuts down the {@code ExecutorService} and closes the RPC Provider Registry.
249      */
250     @Override
251     public void close() throws Exception {
252         log.info("Closing provider for {}", APP_NAME);
253         executor.shutdown();
254         rpcRegistration.close();
255         log.info("close(): Successfully closed provider for {}", APP_NAME);
256     }
257
258     /**
259      * Listens for changes to the Data tree.
260      *
261      * @param changes Data tree changes.
262      */
263     @Override
264     public void onDataTreeChanged(@Nonnull Collection changes) {
265         log.info("onDataTreeChanged(): No changes.");
266     }
267
268     /**
269      * Makes a call to {@code resolver.getClusterHealth()} to determine the
270      * health of the akka clustered controllers.
271      *
272      * @param input request body adhering to the model for
273      *        {@code ClusterHealthInput}
274      * @return response adhering to the model for {@code ClusterHealthOutput}
275      * @see HealthResolver
276      * @see ClusterHealthInput
277      * @see ClusterHealthOutput
278      */
279     @Override
280     public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
281         log.info("{}:cluster-health invoked.", APP_NAME);
282         resolver.getClusterHealth();
283         return buildClusterHealthOutput();
284     }
285
286     /**
287      * Makes a call to {@code resolver.getSiteHealth()} to determine the health
288      * of all of the application components of a site. In a multi-site config,
289      * this will gather the health of all sites.
290      *
291      * @param input request body adhering to the model for
292      *        {@code SiteHealthInput}
293      * @return response adhering to the model for {@code SiteHealthOutput}
294      * @see HealthResolver
295      * @see SiteHealthInput
296      * @see SiteHealthOutput
297      */
298     @Override
299     public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
300         log.info("{}:site-health invoked.", APP_NAME);
301         List<SiteHealth> sites = resolver.getSiteHealth();
302         return buildSiteHealthOutput(sites);
303     }
304
305     /**
306      * Makes a call to {@code resolver.getDatabaseHealth()} to determine the
307      * health of the database(s) used by the controller.
308      *
309      * @param input request body adhering to the model for
310      *        {@code DatabaseHealthInput}
311      * @return response adhering to the model for {@code DatabaseHealthOutput}
312      * @see HealthResolver
313      * @see DatabaseHealthInput
314      * @see DatabaseHealthOutput
315      */
316     @Override
317     public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
318         log.info("{}:database-health invoked.", APP_NAME);
319         DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
320         DatabaseHealth health = resolver.getDatabaseHealth();
321         outputBuilder.setStatus(health.getHealth().equals(Health.HEALTHY) ? "200" : "500");
322         outputBuilder.setHealth(health.getHealth().toString());
323         outputBuilder.setServedBy(member);
324         log.info("databaseHealth(): Health: {}", health.getHealth());
325         return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
326     }
327
328     /**
329      * Makes a call to {@code resolver.getAdminHealth()} to determine the
330      * health of the administrative portal(s) used by the controller.
331      *
332      * @param input request body adhering to the model for
333      *        {@code AdminHealthInput}
334      * @return response adhering to the model for {@code AdminHealthOutput}
335      * @see HealthResolver
336      * @see AdminHealthInput
337      * @see AdminHealthOutput
338      */
339     @Override
340     public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
341         log.info("{}:admin-health invoked.", APP_NAME);
342         AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
343         AdminHealth adminHealth = resolver.getAdminHealth();
344         outputBuilder.setStatus(Integer.toString(adminHealth.getStatusCode()));
345         outputBuilder.setHealth(adminHealth.getHealth().toString());
346         outputBuilder.setServedBy(member);
347         log.info("adminHealth(): Status: {} | Health: {}", adminHealth.getStatusCode(), adminHealth.getHealth());
348         return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
349     }
350
351     /**
352      * Places IP Tables rules in place to drop akka communications traffic with
353      * one or mode nodes. This method does not not perform any checks to see if
354      * rules currently exist, and assumes success.
355      *
356      * @param input request body adhering to the model for
357      *        {@code HaltAkkaTrafficInput}
358      * @return response adhering to the model for {@code HaltAkkaTrafficOutput}
359      * @see HaltAkkaTrafficInput
360      * @see HaltAkkaTrafficOutput
361      */
362     @Override
363     public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
364         log.info("{}:halt-akka-traffic invoked.", APP_NAME);
365         HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
366         outputBuilder.setStatus("200");
367         modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
368         outputBuilder.setServedBy(member);
369
370         return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
371     }
372
373     /**
374      * Removes IP Tables rules in place to permit akka communications traffic
375      * with one or mode nodes. This method does not not perform any checks to
376      * see if rules currently exist, and assumes success.
377      *
378      * @param input request body adhering to the model for
379      *        {@code ResumeAkkaTrafficInput}
380      * @return response adhering to the model for {@code ResumeAkkaTrafficOutput}
381      * @see ResumeAkkaTrafficInput
382      * @see ResumeAkkaTrafficOutput
383      */
384     @Override
385     public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
386         log.info("{}:resume-akka-traffic invoked.", APP_NAME);
387         ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
388         outputBuilder.setStatus("200");
389         modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
390         outputBuilder.setServedBy(member);
391
392         return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
393     }
394
395     /**
396      * Returns a canned response containing the identifier for this
397      * controller's site.
398      *
399      * @param input request body adhering to the model for
400      *        {@code SiteIdentifierInput}
401      * @return response adhering to the model for {@code SiteIdentifierOutput}
402      * @see SiteIdentifierInput
403      * @see SiteIdentifierOutput
404      */
405     @Override
406     public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
407         log.info("{}:site-identifier invoked.", APP_NAME);
408         SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
409         outputBuilder.setStatus("200");
410         outputBuilder.setId(siteIdentifier);
411         outputBuilder.setServedBy(member);
412         return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
413     }
414
415     /**
416      * Makes a call to {@code resolver.tryFailover()} to try a failover defined
417      * by the active {@code HealthResolver}.
418      *
419      * @param input request body adhering to the model for
420      *        {@code FailoverInput}
421      * @return response adhering to the model for {@code FailoverOutput}
422      * @see HealthResolver
423      * @see FailoverInput
424      * @see FailoverOutput
425      */
426     @Override
427     public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
428         log.info("{}:failover invoked.", APP_NAME);
429         FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
430         FailoverStatus failoverStatus = resolver.tryFailover(input);
431         outputBuilder.setServedBy(member);
432         outputBuilder.setMessage(failoverStatus.getMessage());
433         outputBuilder.setStatus(Integer.toString(failoverStatus.getStatusCode()));
434         log.info("{}:{}.", APP_NAME, failoverStatus.getMessage());
435         return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
436     }
437
438     /**
439      * Performs an akka traffic isolation of the active site from the standby
440      * site in an Active/Standby architecture. Invokes the
441      * {@code halt-akka-traffic} RPC against the standby site nodes using the
442      * information of the active site nodes.
443      *
444      * @param activeSite list of nodes in the active site
445      * @param standbySite list of nodes in the standby site
446      * @param port http or https port of the controller
447      * @deprecated No longer used since the refactor to use the HealthResolver
448      *             pattern. Retained so the logic can be replicated later.
449      */
450     @Deprecated
451     private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
452         log.info("isolateSiteFromCluster(): Halting Akka traffic...");
453         for(ClusterActor actor : standbySite) {
454             try {
455                 log.info("Halting Akka traffic for: {}", actor.getNode());
456                 // Build JSON with activeSite actor Node and actor AkkaPort
457                 JSONObject akkaInput = new JSONObject();
458                 JSONObject inputBlock = new JSONObject();
459                 JSONArray votingStateArray = new JSONArray();
460                 JSONObject nodeInfo;
461                 for(ClusterActor node : activeSite) {
462                     nodeInfo = new JSONObject();
463                     nodeInfo.put("node", node.getNode());
464                     nodeInfo.put("port", node.getAkkaPort());
465                     votingStateArray.put(nodeInfo);
466                 }
467                 inputBlock.put("node-info", votingStateArray);
468                 akkaInput.put("input", inputBlock);
469                 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", ConnectionManager.HttpMethod.POST, akkaInput.toString(), "");
470             } catch(IOException e) {
471                 log.error("isolateSiteFromCluster(): Could not halt Akka traffic for: " + actor.getNode(), e);
472             }
473         }
474     }
475
476     /**
477      * Invokes the down unreachable action through the Jolokia mbean API.
478      *
479      * @param activeSite list of nodes in the active site
480      * @param standbySite list of nodes in the standby site
481      * @param port http or https port of the controller
482      * @deprecated No longer used since the refactor to use the HealthResolver
483      *             pattern. Retained so the logic can be replicated later.
484      */
485     @Deprecated
486     private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
487         log.info("downUnreachableNodes(): Setting site unreachable...");
488         JSONObject jolokiaInput = new JSONObject();
489         jolokiaInput.put("type", "EXEC");
490         jolokiaInput.put("mbean", "akka:type=Cluster");
491         jolokiaInput.put("operation", "down");
492         JSONArray arguments = new JSONArray();
493         for(ClusterActor actor : activeSite) {
494             // Build Jolokia input
495             // May need to change from akka port to actor.getAkkaPort()
496             arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
497         }
498         jolokiaInput.put("arguments", arguments);
499         log.debug("downUnreachableNodes(): {}", jolokiaInput);
500         try {
501             log.info("downUnreachableNodes(): Setting nodes unreachable");
502             ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", ConnectionManager.HttpMethod.POST, jolokiaInput.toString(), "");
503         } catch(IOException e) {
504             log.error("downUnreachableNodes(): Error setting nodes unreachable", e);
505         }
506     }
507
508     /**
509      * Triggers a data backup and export sequence of MD-SAL data. Invokes the
510      * {@code data-export-import:schedule-export} RPC to schedule a data export
511      * and subsequently the {@code daexim-offsite-backup:backup-data} RPC
512      * against the active site to export and backup the data. Assumes the
513      * controllers have the org.onap.ccsdk.sli.northbound.daeximoffsitebackup
514      * bundle installed.
515      *
516      * @param activeSite list of nodes in the active site
517      * @param port http or https port of the controller
518      * @deprecated No longer used since the refactor to use the HealthResolver
519      *             pattern. Retained so the logic can be replicated later.
520      */
521     @Deprecated
522     private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
523         log.info("backupMdSal(): Backing up data...");
524         try {
525             log.info("backupMdSal(): Scheduling backup for: {}", activeSite.get(0).getNode());
526             ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", ConnectionManager.HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }", "");
527         } catch(IOException e) {
528             log.error("backupMdSal(): Error backing up MD-SAL", e);
529         }
530         for(ClusterActor actor : activeSite) {
531             try {
532                 // Move data offsite
533                 log.info("backupMdSal(): Backing up data for: {}", actor.getNode());
534                 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", ConnectionManager.HttpMethod.POST, null, "");
535             } catch(IOException e) {
536                 log.error("backupMdSal(): Error backing up data.", e);
537             }
538         }
539     }
540
541     /**
542      * Builds a response object for {@code clusterHealth()}. Sorts and iterates
543      * over the contents of the {@code memberMap}, which contains the health
544      * information of the cluster, and adds them to the {@code outputBuilder}.
545      * If the ClusterActor is healthy, according to
546      * {@code resolver.isControllerHealthy()}, the {@code ClusterHealthOutput}
547      * status has a {@code 0} appended, otherwise a {@code 1} is appended. A
548      * status of all zeroes denotes a healthy cluster. This status should be
549      * easily decoded by tools which use the output.
550      *
551      * @return future containing a completed {@code ClusterHealthOutput}
552      * @see ClusterActor
553      * @see ClusterHealthOutput
554      * @see HealthResolver
555      */
556     @SuppressWarnings("unchecked")
557     private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput() {
558         ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
559         outputBuilder.setServedBy(member);
560         List memberList = new ArrayList<Member>();
561         StringBuilder stat = new StringBuilder();
562         memberMap.values()
563                 .stream()
564                 .sorted(Comparator.comparingInt(member -> Integer.parseInt(member.getMember().split("-")[1])))
565                 .forEach(member -> {
566                     memberList.add(new MemberBuilder(member).build());
567                     // 0 is a healthy controller, 1 is unhealthy.
568                     // The list is sorted so users can decode to find unhealthy nodes
569                     // This will also let them figure out health on a per-site basis
570                     // Depending on any tools they use with this API
571                     if(resolver.isControllerHealthy(member)) {
572                         stat.append("0");
573                     } else {
574                         stat.append("1");
575                     }
576                 });
577         outputBuilder.setStatus(stat.toString());
578         outputBuilder.setMembers(memberList);
579         RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
580         return Futures.immediateFuture(rpcResult);
581     }
582
583     /**
584      * Builds a response object for {@code siteHealth()}. Iterates over a list
585      * of {@code SiteHealth} objects and populates the {@code SiteHealthOutput}
586      * with the information.
587      *
588      * @param sites list of sites
589      * @return future containing a completed {@code SiteHealthOutput}
590      * @see SiteHealth
591      * @see HealthResolver
592      */
593     @SuppressWarnings("unchecked")
594     private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(List<SiteHealth> sites) {
595         SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
596         SitesBuilder siteBuilder = new SitesBuilder();
597         outputBuilder.setStatus("200");
598         outputBuilder.setSites((List) new ArrayList<Site>());
599
600         for(SiteHealth site : sites) {
601             siteBuilder.setHealth(site.getHealth().toString());
602             siteBuilder.setRole(site.getRole());
603             siteBuilder.setId(site.getId());
604             outputBuilder.getSites().add(siteBuilder.build());
605             log.info("buildSiteHealthOutput(): Health for {}: {}", site.getId(), site.getHealth().getHealth());
606         }
607
608         outputBuilder.setServedBy(member);
609         RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
610         return Futures.immediateFuture(rpcResult);
611     }
612
613     /**
614      * Parses a line containing the akka networking information of the akka
615      * controller cluster. Assumes entries of the format:
616      * <p>
617      * akka.tcp://opendaylight-cluster-data@<FQDN>:<AKKA_PORT>
618      * <p>
619      * The information is stored in a {@code ClusterActor} object, and then
620      * added to the memberMap HashMap, with the {@code FQDN} as the key. The
621      * final step is a call to {@code createHealthResolver} to create the
622      * health resolver for the provider.
623      *
624      * @param line the line containing all of the seed nodes
625      * @see ClusterActor
626      * @see HealthResolver
627      */
628     private void parseSeedNodes(String line) {
629         memberMap = new HashMap<>();
630         line = line.substring(line.indexOf("[\""), line.indexOf(']'));
631         String[] splits = line.split(",");
632
633         for(int ndx = 0; ndx < splits.length; ndx++) {
634             String nodeName = splits[ndx];
635             int delimLocation = nodeName.indexOf('@');
636             String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
637             splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
638             log.info("parseSeedNodes(): Adding node: {}:{}", splits[ndx], port);
639             ClusterActor clusterActor = new ClusterActor();
640             clusterActor.setNode(splits[ndx]);
641             clusterActor.setAkkaPort(port);
642             clusterActor.setMember("member-" + (ndx + 1));
643             if(member.equals(clusterActor.getMember())) {
644                 self = clusterActor;
645             }
646             memberMap.put(clusterActor.getNode(), clusterActor);
647             log.info("parseSeedNodes(): {}", clusterActor);
648         }
649
650         createHealthResolver();
651     }
652
653     /**
654      * Creates the specific health resolver requested by the user, as specified
655      * in the gr-toolkit.properties file. If a resolver is not specified, or
656      * there is an issue creating the resolver, it will use a fallback resolver
657      * based on how many nodes are added to the memberMap HashMap.
658      *
659      * @see HealthResolver
660      * @see SingleNodeHealthResolver
661      * @see ThreeNodeHealthResolver
662      * @see SixNodeHealthResolver
663      */
664     private void createHealthResolver() {
665         log.info("createHealthResolver(): Creating health resolver...");
666         try {
667             Class resolverClass = null;
668             String userDefinedResolver = properties.getProperty(PropertyKeys.RESOLVER);
669             if(StringUtils.isEmpty(userDefinedResolver)) {
670                 throw new InstantiationException();
671             }
672             resolverClass = Class.forName(userDefinedResolver);
673             Class[] types = { Map.class , properties.getClass(), DbLibService.class };
674             Constructor<HealthResolver> constructor = resolverClass.getConstructor(types);
675             Object[] parameters = { memberMap, properties, dbLib };
676             resolver = constructor.newInstance(parameters);
677             log.info("createHealthResolver(): Created resolver from name {}", resolver.toString());
678         } catch(ClassNotFoundException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
679             log.warn("createHealthResolver(): Could not create user defined resolver", e);
680             if(memberMap.size() == 1) {
681                 log.info("createHealthResolver(): FALLBACK: Initializing SingleNodeHealthResolver...");
682                 resolver  = new SingleNodeHealthResolver(memberMap, properties, dbLib);
683             } else if(memberMap.size() == 3) {
684                 log.info("createHealthResolver(): FALLBACK: Initializing ThreeNodeHealthResolver...");
685                 resolver  = new ThreeNodeHealthResolver(memberMap, properties, dbLib);
686             } else if(memberMap.size() == 6) {
687                 log.info("createHealthResolver(): FALLBACK: Initializing SixNodeHealthResolver...");
688                 resolver  = new SixNodeHealthResolver(memberMap, properties, dbLib);
689             }
690         }
691     }
692
693     /**
694      * Adds or drops IPTables rules to block or resume akka traffic for a node
695      * in the akka cluster. Assumes that the user or group that the controller
696      * is run as has the ability to run sudo /sbin/iptables without requiring a
697      * password. This method will run indefinitely if that assumption is not
698      * correct. This method does not check to see if any rules around the node
699      * are preexisting, so multiple uses will result in multiple additions and
700      * removals from IPTables.
701      *
702      * @param task the operation to be performed against IPTables
703      * @param nodeInfo array containing the nodes to be added or dropped from
704      *                 IPTables
705      */
706     private void modifyIpTables(IpTables task, Object[] nodeInfo) {
707         log.info("modifyIpTables(): Modifying IPTables rules...");
708         if(task == IpTables.ADD) {
709             for(Object node : nodeInfo) {
710                 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
711                         (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
712                 log.info("modifyIpTables(): Isolating {}", n.getNode());
713                 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
714                 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
715             }
716         } else if(task == IpTables.DELETE) {
717             for(Object node : nodeInfo) {
718                 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
719                         (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
720                 log.info("modifyIpTables(): De-isolating {}", n.getNode());
721                 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
722                 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
723             }
724         }
725         if(nodeInfo.length > 0) {
726             executeCommand("sudo /sbin/iptables -L");
727         }
728     }
729
730     /**
731      * Opens a shell session and executes a command.
732      *
733      * @param command the shell command to execute
734      */
735     private void executeCommand(String command) {
736         log.info("executeCommand(): Executing command: {}", command);
737         String[] cmd = command.split(" ");
738         try {
739             Process p = Runtime.getRuntime().exec(cmd);
740             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
741             String inputLine;
742             StringBuilder content = new StringBuilder();
743             while((inputLine = bufferedReader.readLine()) != null) {
744                 content.append(inputLine);
745             }
746             bufferedReader.close();
747             log.info("executeCommand(): {}", content);
748         } catch(IOException e) {
749             log.error("executeCommand(): Error executing command", e);
750         }
751     }
752
753     /**
754      * The IPTables operations this module can perform.
755      */
756     enum IpTables {
757         ADD,
758         DELETE
759     }
760 }