[CCSDK-1985]GR Toolkit Refactor
[ccsdk/sli/plugins.git] / grToolkit / provider / src / main / java / org / onap / ccsdk / sli / plugins / grtoolkit / GrToolkitProvider.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2018 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.plugins.grtoolkit;
23
24 import java.io.BufferedReader;
25 import java.io.File;
26 import java.io.FileInputStream;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.lang.reflect.Constructor;
31 import java.lang.reflect.InvocationTargetException;
32 import java.util.ArrayList;
33 import java.util.Collection;
34 import java.util.Comparator;
35 import java.util.HashMap;
36 import java.util.Map;
37 import java.util.Properties;
38 import java.util.List;
39 import java.util.concurrent.ExecutorService;
40 import java.util.concurrent.Executors;
41 import javax.annotation.Nonnull;
42
43 import com.google.common.util.concurrent.Futures;
44 import com.google.common.util.concurrent.ListenableFuture;
45
46 import org.apache.commons.lang.StringUtils;
47
48 import org.onap.ccsdk.sli.core.dblib.DbLibService;
49 import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionManager;
50 import org.onap.ccsdk.sli.plugins.grtoolkit.connection.ConnectionResponse;
51 import org.onap.ccsdk.sli.plugins.grtoolkit.data.AdminHealth;
52 import org.onap.ccsdk.sli.plugins.grtoolkit.data.ClusterActor;
53 import org.onap.ccsdk.sli.plugins.grtoolkit.data.DatabaseHealth;
54 import org.onap.ccsdk.sli.plugins.grtoolkit.data.FailoverStatus;
55 import org.onap.ccsdk.sli.plugins.grtoolkit.data.Health;
56 import org.onap.ccsdk.sli.plugins.grtoolkit.data.MemberBuilder;
57 import org.onap.ccsdk.sli.plugins.grtoolkit.data.PropertyKeys;
58 import org.onap.ccsdk.sli.plugins.grtoolkit.data.SiteHealth;
59 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.HealthResolver;
60 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SingleNodeHealthResolver;
61 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.SixNodeHealthResolver;
62 import org.onap.ccsdk.sli.plugins.grtoolkit.resolver.ThreeNodeHealthResolver;
63
64 import org.json.JSONArray;
65 import org.json.JSONObject;
66
67 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
68 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
69 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
70 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
71 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
72 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
73 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
74 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
75 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
76 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
77 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
78 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
79 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
80 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
81 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
82 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
83 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
84 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
85 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
86 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
87 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
88 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
89 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
90 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
91 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
92 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
93 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
94 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
95 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
96 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
97 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
98 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
99 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
100 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
101 import org.opendaylight.yangtools.yang.common.RpcResult;
102 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
103
104 import org.slf4j.Logger;
105 import org.slf4j.LoggerFactory;
106
107 /**
108  * API implementation of the {@code GrToolkitService} interface generated from
109  * the gr-toolkit.yang model. The RPCs contained within this class are meant to
110  * run in an architecture agnostic fashion, where the response is repeatable
111  * and predictable across any given node configuration. To facilitate this,
112  * health checking and failover logic has been abstracted into the
113  * {@code HealthResolver} classes.
114  * <p>
115  * Anyone who wishes to write a custom resolver for use with GR Toolkit should
116  * extend the {@code HealthResolver} class. The currently provided resolvers
117  * are useful references for further implementation.
118  *
119  * @author Anthony Haddox
120  * @see GrToolkitService
121  * @see HealthResolver
122  * @see SingleNodeHealthResolver
123  * @see ThreeNodeHealthResolver
124  * @see SixNodeHealthResolver
125  */
126 public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
127     private static final String APP_NAME = "gr-toolkit";
128     private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
129     private String akkaConfig;
130     private String httpProtocol;
131     private String siteIdentifier = System.getenv("SITE_NAME");
132     private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
133     private final ExecutorService executor;
134     protected DataBroker dataBroker;
135     protected NotificationPublishService notificationService;
136     protected RpcProviderRegistry rpcRegistry;
137     protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
138     protected DbLibService dbLib;
139     private String member;
140     private ClusterActor self;
141     private HashMap<String, ClusterActor> memberMap;
142     private Properties properties;
143     private DistributedDataStoreInterface configDatastore;
144     private HealthResolver resolver;
145
146     /**
147      * Constructs the provider for the GR Toolkit API. Dependencies are
148      * injected using the GrToolkit.xml blueprint.
149      *
150      * @param dataBroker The Data Broker
151      * @param notificationProviderService The Notification Service
152      * @param rpcProviderRegistry The RPC Registry
153      * @param configDatastore The Configuration Data Store provided by the controller
154      * @param dbLibService Reference to the controller provided DbLibService
155      */
156     public GrToolkitProvider(DataBroker dataBroker,
157                              NotificationPublishService notificationProviderService,
158                              RpcProviderRegistry rpcProviderRegistry,
159                              DistributedDataStoreInterface configDatastore,
160                              DbLibService dbLibService) {
161         log.info("Creating provider for {}", APP_NAME);
162         this.executor = Executors.newFixedThreadPool(1);
163         this.dataBroker = dataBroker;
164         this.notificationService = notificationProviderService;
165         this.rpcRegistry = rpcProviderRegistry;
166         this.configDatastore = configDatastore;
167         this.dbLib = dbLibService;
168         initialize();
169     }
170
171     /**
172      * Initializes some structures necessary to hold health check information
173      * and perform failovers.
174      */
175     private void initialize() {
176         log.info("Initializing provider for {}", APP_NAME);
177         createContainers();
178         setProperties();
179         defineMembers();
180         rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
181         log.info("Initialization complete for {}", APP_NAME);
182     }
183
184     /**
185      * Creates the {@code Properties} object with the contents of
186      * gr-toolkit.properties, found at the {@code SDNC_CONFIG_DIR} directory,
187      * which should be set as an environment variable. If the properties file
188      * is not found, GR Toolkit will not function.
189      */
190     private void setProperties() {
191         log.info("Loading properties from {}", PROPERTIES_FILE);
192         properties = new Properties();
193         File propertiesFile = new File(PROPERTIES_FILE);
194         if(!propertiesFile.exists()) {
195             log.warn("setProperties(): Properties file not found.");
196         } else {
197             try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
198                 properties.load(fileInputStream);
199                 if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
200                     properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
201                 }
202                 httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
203                 akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
204                 if(StringUtils.isEmpty(siteIdentifier)) {
205                     siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
206                 }
207                 log.info("setProperties(): Loaded properties.");
208             } catch(IOException e) {
209                 log.error("setProperties(): Error loading properties.", e);
210             }
211         }
212     }
213
214     /**
215      * Parses the akka.conf file used by the controller to define an akka
216      * cluster. This method requires the <i>seed-nodes</i> definition to exist
217      * on a single line.
218      */
219     private void defineMembers() {
220         member = configDatastore.getActorContext().getCurrentMemberName().getName();
221         log.info("defineMembers(): Cluster member: {}", member);
222
223         log.info("defineMembers(): Parsing akka.conf for cluster memberMap...");
224         try {
225             File akkaConfigFile = new File(this.akkaConfig);
226             try(FileReader fileReader = new FileReader(akkaConfigFile);
227                 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
228                 String line;
229                 while((line = bufferedReader.readLine()) != null) {
230                     if(line.contains("seed-nodes =")) {
231                         parseSeedNodes(line);
232                         break;
233                     }
234                 }
235             }
236         } catch(IOException e) {
237             log.error("defineMembers(): Couldn't load akka", e);
238         } catch(NullPointerException e) {
239             log.error("defineMembers(): akkaConfig is null. Check properties file and restart {} bundle.", APP_NAME);
240             log.error("defineMembers(): NullPointerException", e);
241         }
242         log.info("self:\n{}", self);
243     }
244
245     /**
246      * Sets up the {@code InstanceIdentifier}s for Data Store transactions.
247      */
248     private void createContainers() {
249         // Replace with MD-SAL write for FailoverStatus
250     }
251
252     /**
253      * Shuts down the {@code ExecutorService} and closes the RPC Provider Registry.
254      */
255     @Override
256     public void close() throws Exception {
257         log.info("Closing provider for {}", APP_NAME);
258         executor.shutdown();
259         rpcRegistration.close();
260         log.info("close(): Successfully closed provider for {}", APP_NAME);
261     }
262
263     /**
264      * Listens for changes to the Data tree.
265      *
266      * @param changes Data tree changes.
267      */
268     @Override
269     public void onDataTreeChanged(@Nonnull Collection changes) {
270         log.info("onDataTreeChanged(): No changes.");
271     }
272
273     /**
274      * Makes a call to {@code resolver.getClusterHealth()} to determine the
275      * health of the akka clustered controllers.
276      *
277      * @param input request body adhering to the model for
278      *        {@code ClusterHealthInput}
279      * @return response adhering to the model for {@code ClusterHealthOutput}
280      * @see HealthResolver
281      * @see ClusterHealthInput
282      * @see ClusterHealthOutput
283      */
284     @Override
285     public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
286         log.info("{}:cluster-health invoked.", APP_NAME);
287         resolver.getClusterHealth();
288         return buildClusterHealthOutput();
289     }
290
291     /**
292      * Makes a call to {@code resolver.getSiteHealth()} to determine the health
293      * of all of the application components of a site. In a multi-site config,
294      * this will gather the health of all sites.
295      *
296      * @param input request body adhering to the model for
297      *        {@code SiteHealthInput}
298      * @return response adhering to the model for {@code SiteHealthOutput}
299      * @see HealthResolver
300      * @see SiteHealthInput
301      * @see SiteHealthOutput
302      */
303     @Override
304     public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
305         log.info("{}:site-health invoked.", APP_NAME);
306         List<SiteHealth> sites = resolver.getSiteHealth();
307         return buildSiteHealthOutput(sites);
308     }
309
310     /**
311      * Makes a call to {@code resolver.getDatabaseHealth()} to determine the
312      * health of the database(s) used by the controller.
313      *
314      * @param input request body adhering to the model for
315      *        {@code DatabaseHealthInput}
316      * @return response adhering to the model for {@code DatabaseHealthOutput}
317      * @see HealthResolver
318      * @see DatabaseHealthInput
319      * @see DatabaseHealthOutput
320      */
321     @Override
322     public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
323         log.info("{}:database-health invoked.", APP_NAME);
324         DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
325         DatabaseHealth health = resolver.getDatabaseHealth();
326         outputBuilder.setStatus(health.getHealth().equals(Health.HEALTHY) ? "200" : "500");
327         outputBuilder.setHealth(health.getHealth().toString());
328         outputBuilder.setServedBy(member);
329         log.info("databaseHealth(): Health: {}", health.getHealth());
330         return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
331     }
332
333     /**
334      * Makes a call to {@code resolver.getAdminHealth()} to determine the
335      * health of the administrative portal(s) used by the controller.
336      *
337      * @param input request body adhering to the model for
338      *        {@code AdminHealthInput}
339      * @return response adhering to the model for {@code AdminHealthOutput}
340      * @see HealthResolver
341      * @see AdminHealthInput
342      * @see AdminHealthOutput
343      */
344     @Override
345     public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
346         log.info("{}:admin-health invoked.", APP_NAME);
347         AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
348         AdminHealth adminHealth = resolver.getAdminHealth();
349         outputBuilder.setStatus(Integer.toString(adminHealth.getStatusCode()));
350         outputBuilder.setHealth(adminHealth.getHealth().toString());
351         outputBuilder.setServedBy(member);
352         log.info("adminHealth(): Status: {} | Health: {}", adminHealth.getStatusCode(), adminHealth.getHealth());
353         return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
354     }
355
356     /**
357      * Places IP Tables rules in place to drop akka communications traffic with
358      * one or mode nodes. This method does not not perform any checks to see if
359      * rules currently exist, and assumes success.
360      *
361      * @param input request body adhering to the model for
362      *        {@code HaltAkkaTrafficInput}
363      * @return response adhering to the model for {@code HaltAkkaTrafficOutput}
364      * @see HaltAkkaTrafficInput
365      * @see HaltAkkaTrafficOutput
366      */
367     @Override
368     public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
369         log.info("{}:halt-akka-traffic invoked.", APP_NAME);
370         HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
371         outputBuilder.setStatus("200");
372         modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
373         outputBuilder.setServedBy(member);
374
375         return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
376     }
377
378     /**
379      * Removes IP Tables rules in place to permit akka communications traffic
380      * with one or mode nodes. This method does not not perform any checks to
381      * see if rules currently exist, and assumes success.
382      *
383      * @param input request body adhering to the model for
384      *        {@code ResumeAkkaTrafficInput}
385      * @return response adhering to the model for {@code ResumeAkkaTrafficOutput}
386      * @see ResumeAkkaTrafficInput
387      * @see ResumeAkkaTrafficOutput
388      */
389     @Override
390     public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
391         log.info("{}:resume-akka-traffic invoked.", APP_NAME);
392         ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
393         outputBuilder.setStatus("200");
394         modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
395         outputBuilder.setServedBy(member);
396
397         return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
398     }
399
400     /**
401      * Returns a canned response containing the identifier for this
402      * controller's site.
403      *
404      * @param input request body adhering to the model for
405      *        {@code SiteIdentifierInput}
406      * @return response adhering to the model for {@code SiteIdentifierOutput}
407      * @see SiteIdentifierInput
408      * @see SiteIdentifierOutput
409      */
410     @Override
411     public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
412         log.info("{}:site-identifier invoked.", APP_NAME);
413         SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
414         outputBuilder.setStatus("200");
415         outputBuilder.setId(siteIdentifier);
416         outputBuilder.setServedBy(member);
417         return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
418     }
419
420     /**
421      * Makes a call to {@code resolver.tryFailover()} to try a failover defined
422      * by the active {@code HealthResolver}.
423      *
424      * @param input request body adhering to the model for
425      *        {@code FailoverInput}
426      * @return response adhering to the model for {@code FailoverOutput}
427      * @see HealthResolver
428      * @see FailoverInput
429      * @see FailoverOutput
430      */
431     @Override
432     public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
433         log.info("{}:failover invoked.", APP_NAME);
434         FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
435         FailoverStatus failoverStatus = resolver.tryFailover(input);
436         outputBuilder.setServedBy(member);
437         outputBuilder.setMessage(failoverStatus.getMessage());
438         outputBuilder.setStatus(Integer.toString(failoverStatus.getStatusCode()));
439         log.info("{}:{}.", APP_NAME, failoverStatus.getMessage());
440         return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
441     }
442
443     /**
444      * Performs an akka traffic isolation of the active site from the standby
445      * site in an Active/Standby architecture. Invokes the
446      * {@code halt-akka-traffic} RPC against the standby site nodes using the
447      * information of the active site nodes.
448      *
449      * @param activeSite list of nodes in the active site
450      * @param standbySite list of nodes in the standby site
451      * @param port http or https port of the controller
452      * @deprecated No longer used since the refactor to use the HealthResolver
453      *             pattern. Retained so the logic can be replicated later.
454      */
455     private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
456         log.info("isolateSiteFromCluster(): Halting Akka traffic...");
457         for(ClusterActor actor : standbySite) {
458             try {
459                 log.info("Halting Akka traffic for: {}", actor.getNode());
460                 // Build JSON with activeSite actor Node and actor AkkaPort
461                 JSONObject akkaInput = new JSONObject();
462                 JSONObject inputBlock = new JSONObject();
463                 JSONArray votingStateArray = new JSONArray();
464                 JSONObject nodeInfo;
465                 for(ClusterActor node : activeSite) {
466                     nodeInfo = new JSONObject();
467                     nodeInfo.put("node", node.getNode());
468                     nodeInfo.put("port", node.getAkkaPort());
469                     votingStateArray.put(nodeInfo);
470                 }
471                 inputBlock.put("node-info", votingStateArray);
472                 akkaInput.put("input", inputBlock);
473                 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", ConnectionManager.HttpMethod.POST, akkaInput.toString(), "");
474             } catch(IOException e) {
475                 log.error("isolateSiteFromCluster(): Could not halt Akka traffic for: " + actor.getNode(), e);
476             }
477         }
478     }
479
480     /**
481      * Invokes the down unreachable action through the Jolokia mbean API.
482      *
483      * @param activeSite list of nodes in the active site
484      * @param standbySite list of nodes in the standby site
485      * @param port http or https port of the controller
486      * @deprecated No longer used since the refactor to use the HealthResolver
487      *             pattern. Retained so the logic can be replicated later.
488      */
489     private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
490         log.info("downUnreachableNodes(): Setting site unreachable...");
491         JSONObject jolokiaInput = new JSONObject();
492         jolokiaInput.put("type", "EXEC");
493         jolokiaInput.put("mbean", "akka:type=Cluster");
494         jolokiaInput.put("operation", "down");
495         JSONArray arguments = new JSONArray();
496         for(ClusterActor actor : activeSite) {
497             // Build Jolokia input
498             // May need to change from akka port to actor.getAkkaPort()
499             arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
500         }
501         jolokiaInput.put("arguments", arguments);
502         log.debug("downUnreachableNodes(): {}", jolokiaInput);
503         try {
504             log.info("downUnreachableNodes(): Setting nodes unreachable");
505             ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", ConnectionManager.HttpMethod.POST, jolokiaInput.toString(), "");
506         } catch(IOException e) {
507             log.error("downUnreachableNodes(): Error setting nodes unreachable", e);
508         }
509     }
510
511     /**
512      * Triggers a data backup and export sequence of MD-SAL data. Invokes the
513      * {@code data-export-import:schedule-export} RPC to schedule a data export
514      * and subsequently the {@code daexim-offsite-backup:backup-data} RPC
515      * against the active site to export and backup the data. Assumes the
516      * controllers have the org.onap.ccsdk.sli.northbound.daeximoffsitebackup
517      * bundle installed.
518      *
519      * @param activeSite list of nodes in the active site
520      * @param port http or https port of the controller
521      * @deprecated No longer used since the refactor to use the HealthResolver
522      *             pattern. Retained so the logic can be replicated later.
523      */
524     private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
525         log.info("backupMdSal(): Backing up data...");
526         try {
527             log.info("backupMdSal(): Scheduling backup for: {}", activeSite.get(0).getNode());
528             ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", ConnectionManager.HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }", "");
529         } catch(IOException e) {
530             log.error("backupMdSal(): Error backing up MD-SAL", e);
531         }
532         for(ClusterActor actor : activeSite) {
533             try {
534                 // Move data offsite
535                 log.info("backupMdSal(): Backing up data for: {}", actor.getNode());
536                 ConnectionResponse response = ConnectionManager.getConnectionResponse(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", ConnectionManager.HttpMethod.POST, null, "");
537             } catch(IOException e) {
538                 log.error("backupMdSal(): Error backing up data.", e);
539             }
540         }
541     }
542
543     /**
544      * Builds a response object for {@code clusterHealth()}. Sorts and iterates
545      * over the contents of the {@code memberMap}, which contains the health
546      * information of the cluster, and adds them to the {@code outputBuilder}.
547      * If the ClusterActor is healthy, according to
548      * {@code resolver.isControllerHealthy()}, the {@code ClusterHealthOutput}
549      * status has a {@code 0} appended, otherwise a {@code 1} is appended. A
550      * status of all zeroes denotes a healthy cluster. This status should be
551      * easily decoded by tools which use the output.
552      *
553      * @return future containing a completed {@code ClusterHealthOutput}
554      * @see ClusterActor
555      * @see ClusterHealthOutput
556      * @see HealthResolver
557      */
558     @SuppressWarnings("unchecked")
559     private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput() {
560         ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
561         outputBuilder.setServedBy(member);
562         List memberList = new ArrayList<Member>();
563         StringBuilder stat = new StringBuilder();
564         memberMap.values()
565                 .stream()
566                 .sorted(Comparator.comparingInt(member -> Integer.parseInt(member.getMember().split("-")[1])))
567                 .forEach(member -> {
568                     memberList.add(new MemberBuilder(member).build());
569                     // 0 is a healthy controller, 1 is unhealthy.
570                     // The list is sorted so users can decode to find unhealthy nodes
571                     // This will also let them figure out health on a per-site basis
572                     // Depending on any tools they use with this API
573                     if(resolver.isControllerHealthy(member)) {
574                         stat.append("0");
575                     } else {
576                         stat.append("1");
577                     }
578                 });
579         outputBuilder.setStatus(stat.toString());
580         outputBuilder.setMembers(memberList);
581         RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
582         return Futures.immediateFuture(rpcResult);
583     }
584
585     /**
586      * Builds a response object for {@code siteHealth()}. Iterates over a list
587      * of {@code SiteHealth} objects and populates the {@code SiteHealthOutput}
588      * with the information.
589      *
590      * @param sites list of sites
591      * @return future containing a completed {@code SiteHealthOutput}
592      * @see SiteHealth
593      * @see HealthResolver
594      */
595     @SuppressWarnings("unchecked")
596     private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(List<SiteHealth> sites) {
597         SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
598         SitesBuilder siteBuilder = new SitesBuilder();
599         outputBuilder.setStatus("200");
600         outputBuilder.setSites((List) new ArrayList<Site>());
601
602         for(SiteHealth site : sites) {
603             siteBuilder.setHealth(site.getHealth().toString());
604             siteBuilder.setRole(site.getRole());
605             siteBuilder.setId(site.getId());
606             outputBuilder.getSites().add(siteBuilder.build());
607             log.info("buildSiteHealthOutput(): Health for {}: {}", site.getId(), site.getHealth().getHealth());
608         }
609
610         outputBuilder.setServedBy(member);
611         RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
612         return Futures.immediateFuture(rpcResult);
613     }
614
615     /**
616      * Parses a line containing the akka networking information of the akka
617      * controller cluster. Assumes entries of the format:
618      * <p>
619      * akka.tcp://opendaylight-cluster-data@<FQDN>:<AKKA_PORT>
620      * <p>
621      * The information is stored in a {@code ClusterActor} object, and then
622      * added to the memberMap HashMap, with the {@code FQDN} as the key. The
623      * final step is a call to {@code createHealthResolver} to create the
624      * health resolver for the provider.
625      *
626      * @param line the line containing all of the seed nodes
627      * @see ClusterActor
628      * @see HealthResolver
629      */
630     private void parseSeedNodes(String line) {
631         memberMap = new HashMap<>();
632         line = line.substring(line.indexOf("[\""), line.indexOf(']'));
633         String[] splits = line.split(",");
634
635         for(int ndx = 0; ndx < splits.length; ndx++) {
636             String nodeName = splits[ndx];
637             int delimLocation = nodeName.indexOf('@');
638             String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
639             splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
640             log.info("parseSeedNodes(): Adding node: {}:{}", splits[ndx], port);
641             ClusterActor clusterActor = new ClusterActor();
642             clusterActor.setNode(splits[ndx]);
643             clusterActor.setAkkaPort(port);
644             clusterActor.setMember("member-" + (ndx + 1));
645             if(member.equals(clusterActor.getMember())) {
646                 self = clusterActor;
647             }
648             memberMap.put(clusterActor.getNode(), clusterActor);
649             log.info("parseSeedNodes(): {}", clusterActor);
650         }
651
652         createHealthResolver();
653     }
654
655     /**
656      * Creates the specific health resolver requested by the user, as specified
657      * in the gr-toolkit.properties file. If a resolver is not specified, or
658      * there is an issue creating the resolver, it will use a fallback resolver
659      * based on how many nodes are added to the memberMap HashMap.
660      *
661      * @see HealthResolver
662      * @see SingleNodeHealthResolver
663      * @see ThreeNodeHealthResolver
664      * @see SixNodeHealthResolver
665      */
666     private void createHealthResolver() {
667         log.info("createHealthResolver(): Creating health resolver...");
668         try {
669             Class resolverClass = null;
670             String userDefinedResolver = properties.getProperty(PropertyKeys.RESOLVER);
671             if(StringUtils.isEmpty(userDefinedResolver)) {
672                 throw new InstantiationException();
673             }
674             resolverClass = Class.forName(userDefinedResolver);
675             Class[] types = { Map.class , properties.getClass(), DbLibService.class };
676             Constructor<HealthResolver> constructor = resolverClass.getConstructor(types);
677             Object[] parameters = { memberMap, properties, dbLib };
678             resolver = constructor.newInstance(parameters);
679             log.info("createHealthResolver(): Created resolver from name {}", resolver.toString());
680         } catch(ClassNotFoundException | InstantiationException | InvocationTargetException | NoSuchMethodException | IllegalAccessException e) {
681             log.warn("createHealthResolver(): Could not create user defined resolver", e);
682             if(memberMap.size() == 1) {
683                 log.info("createHealthResolver(): FALLBACK: Initializing SingleNodeHealthResolver...");
684                 resolver  = new SingleNodeHealthResolver(memberMap, properties, dbLib);
685             } else if(memberMap.size() == 3) {
686                 log.info("createHealthResolver(): FALLBACK: Initializing ThreeNodeHealthResolver...");
687                 resolver  = new ThreeNodeHealthResolver(memberMap, properties, dbLib);
688             } else if(memberMap.size() == 6) {
689                 log.info("createHealthResolver(): FALLBACK: Initializing SixNodeHealthResolver...");
690                 resolver  = new SixNodeHealthResolver(memberMap, properties, dbLib);
691             }
692         }
693     }
694
695     /**
696      * Adds or drops IPTables rules to block or resume akka traffic for a node
697      * in the akka cluster. Assumes that the user or group that the controller
698      * is run as has the ability to run sudo /sbin/iptables without requiring a
699      * password. This method will run indefinitely if that assumption is not
700      * correct. This method does not check to see if any rules around the node
701      * are preexisting, so multiple uses will result in multiple additions and
702      * removals from IPTables.
703      *
704      * @param task the operation to be performed against IPTables
705      * @param nodeInfo array containing the nodes to be added or dropped from
706      *                 IPTables
707      */
708     private void modifyIpTables(IpTables task, Object[] nodeInfo) {
709         log.info("modifyIpTables(): Modifying IPTables rules...");
710         if(task == IpTables.ADD) {
711             for(Object node : nodeInfo) {
712                 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
713                         (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
714                 log.info("modifyIpTables(): Isolating {}", n.getNode());
715                 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
716                 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
717             }
718         } else if(task == IpTables.DELETE) {
719             for(Object node : nodeInfo) {
720                 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
721                         (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
722                 log.info("modifyIpTables(): De-isolating {}", n.getNode());
723                 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
724                 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -d %s", n.getPort(), n.getNode()));
725             }
726         }
727         if(nodeInfo.length > 0) {
728             executeCommand("sudo /sbin/iptables -L");
729         }
730     }
731
732     /**
733      * Opens a shell session and executes a command.
734      *
735      * @param command the shell command to execute
736      */
737     private void executeCommand(String command) {
738         log.info("executeCommand(): Executing command: {}", command);
739         String[] cmd = command.split(" ");
740         try {
741             Process p = Runtime.getRuntime().exec(cmd);
742             BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
743             String inputLine;
744             StringBuilder content = new StringBuilder();
745             while((inputLine = bufferedReader.readLine()) != null) {
746                 content.append(inputLine);
747             }
748             bufferedReader.close();
749             log.info("executeCommand(): {}", content);
750         } catch(IOException e) {
751             log.error("executeCommand(): Error executing command", e);
752         }
753     }
754
755     /**
756      * The IPTables operations this module can perform.
757      */
758     enum IpTables {
759         ADD,
760         DELETE
761     }
762 }