2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2018 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.plugins;
24 import java.io.BufferedReader;
26 import java.io.FileInputStream;
27 import java.io.FileReader;
28 import java.io.IOException;
29 import java.io.InputStreamReader;
30 import java.io.OutputStream;
31 import java.net.HttpURLConnection;
33 import java.nio.charset.StandardCharsets;
34 import java.sql.SQLException;
35 import java.util.ArrayList;
36 import java.util.Collection;
37 import java.util.HashMap;
39 import java.util.Properties;
40 import java.util.List;
41 import java.util.concurrent.ExecutorService;
42 import java.util.concurrent.Executors;
43 import java.util.regex.Matcher;
44 import java.util.regex.Pattern;
45 import javax.annotation.Nonnull;
47 import com.google.common.util.concurrent.Futures;
48 import com.google.common.util.concurrent.ListenableFuture;
50 import org.onap.ccsdk.sli.core.dblib.DbLibService;
51 import org.onap.ccsdk.sli.plugins.data.ClusterActor;
52 import org.onap.ccsdk.sli.plugins.data.MemberBuilder;
54 import org.json.JSONArray;
55 import org.json.JSONException;
56 import org.json.JSONObject;
58 import org.opendaylight.controller.cluster.datastore.DistributedDataStoreInterface;
59 import org.opendaylight.controller.md.sal.binding.api.DataBroker;
60 import org.opendaylight.controller.md.sal.binding.api.DataTreeChangeListener;
61 import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
62 import org.opendaylight.controller.sal.binding.api.BindingAwareBroker;
63 import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
64 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthInput;
65 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutput;
66 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.AdminHealthOutputBuilder;
67 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthInput;
68 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutput;
69 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ClusterHealthOutputBuilder;
70 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthInput;
71 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutput;
72 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.DatabaseHealthOutputBuilder;
73 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverInput;
74 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutput;
75 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.FailoverOutputBuilder;
76 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.GrToolkitService;
77 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficInput;
78 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutput;
79 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.HaltAkkaTrafficOutputBuilder;
80 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Member;
81 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficInput;
82 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutput;
83 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.ResumeAkkaTrafficOutputBuilder;
84 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.Site;
85 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthInput;
86 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutput;
87 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteHealthOutputBuilder;
88 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierInput;
89 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutput;
90 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.SiteIdentifierOutputBuilder;
91 import org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.site.health.output.SitesBuilder;
92 import org.opendaylight.yangtools.yang.common.RpcResult;
93 import org.opendaylight.yangtools.yang.common.RpcResultBuilder;
95 import org.slf4j.Logger;
96 import org.slf4j.LoggerFactory;
98 public class GrToolkitProvider implements AutoCloseable, GrToolkitService, DataTreeChangeListener {
// Application name used in this provider's log messages.
99 private static final String APP_NAME = "gr-toolkit";
// Properties file location, built from the SDNC_CONFIG_DIR environment variable.
// If that variable is unset the concatenation yields "null/gr-toolkit.properties".
100 private static final String PROPERTIES_FILE = System.getenv("SDNC_CONFIG_DIR") + "/gr-toolkit.properties";
// Health labels written into the RPC outputs.
101 private static final String HEALTHY = "HEALTHY";
102 private static final String FAULTY = "FAULTY";
// JSON key under which the fetched member/shard responses carry their payload.
103 private static final String VALUE = "value";
// The following six values are assigned by setProperties() from the properties file.
// Path of the akka configuration file read by defineMembers().
104 private String akkaConfig;
// ":<port><mbean path>" suffixes appended to "<protocol><node>" when querying a member.
105 private String jolokiaClusterPath;
106 private String shardManagerPath;
// Used as a String.format template that receives a shard name.
107 private String shardPathTemplate;
108 private String credentials;
// Either "https://" or "http://".
109 private String httpProtocol;
// Taken from the SITE_NAME environment variable; setProperties() falls back to the
// properties file when this is null or empty.
110 private String siteIdentifier = System.getenv("SITE_NAME");
111 private final Logger log = LoggerFactory.getLogger(GrToolkitProvider.class);
// Single-thread pool created in the constructor; no use of it is visible in this part of the file.
112 private final ExecutorService executor;
// Collaborators handed in through the constructor.
113 protected DataBroker dataBroker;
114 protected NotificationPublishService notificationService;
115 protected RpcProviderRegistry rpcRegistry;
// Registration handle obtained in initialize() and closed in close().
116 protected BindingAwareBroker.RpcRegistration<GrToolkitService> rpcRegistration;
117 protected DbLibService dbLib;
// Name of the current cluster member as reported by the config datastore.
118 private String member;
// NOTE(review): presumably the ClusterActor describing this node; its assignment is not visible here.
119 private ClusterActor self;
// Cluster members keyed by node name, rebuilt by parseSeedNodes().
120 private HashMap<String, ClusterActor> memberMap;
// Deployment shape, derived in parseSeedNodes() from the number of seed nodes.
121 private SiteConfiguration siteConfiguration;
// Contents of PROPERTIES_FILE, loaded by setProperties().
122 private Properties properties;
123 private DistributedDataStoreInterface configDatastore;
/**
 * Creates the provider and stores the collaborators it is handed.
 * A single-thread executor is created here as well.
 * NOTE(review): the end of this constructor is not visible in this view; it presumably
 * triggers initialize() - confirm against the full file.
 *
 * @param dataBroker                  stored in {@code dataBroker}
 * @param notificationProviderService stored in {@code notificationService}
 * @param rpcProviderRegistry         registry later used to register this RPC implementation
 * @param configDatastore             config datastore, later queried for the current member name
 * @param dbLibService                stored in {@code dbLib}
 */
124 public GrToolkitProvider(DataBroker dataBroker,
125 NotificationPublishService notificationProviderService,
126 RpcProviderRegistry rpcProviderRegistry,
127 DistributedDataStoreInterface configDatastore,
128 DbLibService dbLibService) {
129 this.log.info("Creating provider for {}", APP_NAME);
130 this.executor = Executors.newFixedThreadPool(1);
131 this.dataBroker = dataBroker;
132 this.notificationService = notificationProviderService;
133 this.rpcRegistry = rpcProviderRegistry;
134 this.configDatastore = configDatastore;
135 this.dbLib = dbLibService;
/**
 * Registers this object as the {@code GrToolkitService} RPC implementation and keeps the
 * registration handle so close() can release it.
 * NOTE(review): the statements between the first log line and the registration are not
 * visible in this view; presumably they load the properties and define the cluster members.
 */
139 private void initialize() {
140 log.info("Initializing provider for {}", APP_NAME);
141 // Create the top level containers
146 rpcRegistration = rpcRegistry.addRpcImplementation(GrToolkitService.class, this);
147 log.info("Initialization complete for {}", APP_NAME);
/**
 * Loads PROPERTIES_FILE and derives the connection settings used by the rest of the class:
 * protocol, controller port, MBean paths, akka.conf location, site identifier and credentials.
 * A missing file is logged as a warning; an I/O failure while reading is logged as an error.
 */
150 private void setProperties() {
151 log.info("Loading properties from {}", PROPERTIES_FILE);
152 properties = new Properties();
153 File propertiesFile = new File(PROPERTIES_FILE);
154 if(!propertiesFile.exists()) {
155 log.warn("Properties file not found.");
158 try(FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
159 properties.load(fileInputStream);
// The site identifier is the only key that receives a default value.
160 if(!properties.containsKey(PropertyKeys.SITE_IDENTIFIER)) {
161 properties.put(PropertyKeys.SITE_IDENTIFIER, "Unknown Site");
// NOTE(review): every getProperty(...).trim() below throws NullPointerException when its
// key is absent from the file, because getProperty returns null for unknown keys.
// The SSL flag selects both the port and the protocol prefix.
163 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL).trim() : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP).trim();
164 httpProtocol = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL).trim()) ? "https://" : "http://";
165 akkaConfig = properties.getProperty(PropertyKeys.AKKA_CONF_LOCATION).trim();
// Each path is stored as ":<port><mbean path>" so callers only prepend protocol and node.
166 jolokiaClusterPath = ":" + port + properties.getProperty(PropertyKeys.MBEAN_CLUSTER).trim();
167 shardManagerPath = ":" + port + properties.getProperty(PropertyKeys.MBEAN_SHARD_MANAGER).trim();
168 shardPathTemplate = ":" + port + properties.getProperty(PropertyKeys.MBEAN_SHARD_CONFIG).trim();
// A value from the SITE_NAME environment variable takes precedence over the file.
169 if(siteIdentifier == null || siteIdentifier.isEmpty()) {
170 siteIdentifier = properties.getProperty(PropertyKeys.SITE_IDENTIFIER).trim();
172 credentials = properties.getProperty(PropertyKeys.CONTROLLER_CREDENTIALS).trim();
173 log.info("Loaded properties.");
174 } catch(IOException e) {
175 log.error("Error loading properties.", e);
/**
 * Determines the current member name from the config datastore and scans the akka
 * configuration file for the "seed-nodes =" line, which is handed to parseSeedNodes().
 * A read failure is logged and otherwise ignored.
 */
179 private void defineMembers() {
180 member = configDatastore.getActorContext().getCurrentMemberName().getName();
181 log.info("Cluster member: {}", member);
183 log.info("Parsing akka.conf for cluster memberMap...");
185 File akkaConfigFile = new File(this.akkaConfig);
// NOTE(review): FileReader decodes with the platform default charset on Java versions before 18.
186 try(FileReader fileReader = new FileReader(akkaConfigFile);
187 BufferedReader bufferedReader = new BufferedReader(fileReader)) {
189 while((line = bufferedReader.readLine()) != null) {
190 if(line.contains("seed-nodes =")) {
191 parseSeedNodes(line);
196 } catch(IOException e) {
197 log.error("Couldn't load akka", e);
199 log.info("self:\n{}", self);
/**
 * Placeholder for creating the top level containers; no statements are visible in its body.
 */
202 private void createContainers() {
203 // Replace with MD-SAL write for FailoverStatus
/**
 * Extension hook for subclasses; no statements are visible in its body.
 */
206 protected void initializeChild() {
207 // Override if you have custom initialization intelligence
/**
 * Releases the RPC registration obtained in initialize().
 * NOTE(review): one statement before the registration is closed is not visible in this
 * view; confirm whether the executor is shut down there.
 *
 * @throws Exception declared by the signature; the visible statements call only
 *                   {@code rpcRegistration.close()} and the logger
 */
211 public void close() throws Exception {
212 log.info("Closing provider for {}", APP_NAME);
214 rpcRegistration.close();
215 log.info("Successfully closed provider for {}", APP_NAME);
/**
 * Data tree change callback. The changes are not inspected; the call is only logged.
 *
 * @param changes the reported changes (raw collection, unused)
 */
219 public void onDataTreeChanged(@Nonnull Collection changes) {
220 log.info("onDataTreeChanged() called. but there is no change here");
224 public ListenableFuture<RpcResult<ClusterHealthOutput>> clusterHealth(ClusterHealthInput input) {
225 log.info("{}:cluster-health invoked.", APP_NAME);
226 getControllerHealth();
227 return buildClusterHealthOutput("200");
231 public ListenableFuture<RpcResult<SiteHealthOutput>> siteHealth(SiteHealthInput input) {
232 log.info("{}:site-health invoked.", APP_NAME);
233 getControllerHealth();
234 return buildSiteHealthOutput("200", getAdminHealth(), getDatabaseHealth());
238 public ListenableFuture<RpcResult<DatabaseHealthOutput>> databaseHealth(DatabaseHealthInput input) {
239 log.info("{}:database-health invoked.", APP_NAME);
240 DatabaseHealthOutputBuilder outputBuilder = new DatabaseHealthOutputBuilder();
241 outputBuilder.setStatus("200");
242 outputBuilder.setHealth(getDatabaseHealth());
244 return Futures.immediateFuture(RpcResultBuilder.<DatabaseHealthOutput>status(true).withResult(outputBuilder.build()).build());
248 public ListenableFuture<RpcResult<AdminHealthOutput>> adminHealth(AdminHealthInput input) {
249 log.info("{}:admin-health invoked.", APP_NAME);
250 AdminHealthOutputBuilder outputBuilder = new AdminHealthOutputBuilder();
251 outputBuilder.setStatus("200");
252 outputBuilder.setHealth(getAdminHealth());
254 return Futures.immediateFuture(RpcResultBuilder.<AdminHealthOutput>status(true).withResult(outputBuilder.build()).build());
258 public ListenableFuture<RpcResult<HaltAkkaTrafficOutput>> haltAkkaTraffic(HaltAkkaTrafficInput input) {
259 log.info("{}:halt-akka-traffic invoked.", APP_NAME);
260 HaltAkkaTrafficOutputBuilder outputBuilder = new HaltAkkaTrafficOutputBuilder();
261 outputBuilder.setStatus("200");
262 modifyIpTables(IpTables.ADD, input.getNodeInfo().toArray());
264 return Futures.immediateFuture(RpcResultBuilder.<HaltAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
268 public ListenableFuture<RpcResult<ResumeAkkaTrafficOutput>> resumeAkkaTraffic(ResumeAkkaTrafficInput input) {
269 log.info("{}:resume-akka-traffic invoked.", APP_NAME);
270 ResumeAkkaTrafficOutputBuilder outputBuilder = new ResumeAkkaTrafficOutputBuilder();
271 outputBuilder.setStatus("200");
272 modifyIpTables(IpTables.DELETE, input.getNodeInfo().toArray());
274 return Futures.immediateFuture(RpcResultBuilder.<ResumeAkkaTrafficOutput>status(true).withResult(outputBuilder.build()).build());
278 public ListenableFuture<RpcResult<SiteIdentifierOutput>> siteIdentifier(SiteIdentifierInput input) {
279 log.info("{}:site-identifier invoked.", APP_NAME);
280 SiteIdentifierOutputBuilder outputBuilder = new SiteIdentifierOutputBuilder();
281 outputBuilder.setStatus("200");
282 outputBuilder.setId(siteIdentifier);
284 return Futures.immediateFuture(RpcResultBuilder.<SiteIdentifierOutput>status(true).withResult(outputBuilder.build()).build());
/**
 * RPC entry point for {@code failover}.
 * Refuses with status "400" unless the site configuration is GEO. Otherwise it refreshes
 * the member state, sorts the members into an active and a standby list, optionally
 * schedules a backup, moves shard voting to the standby members and optionally isolates
 * the active site and downs its nodes. Success is reported as status "200".
 *
 * @param input RPC input; the backup-data, isolate and down-unreachable values are parsed
 *              with Boolean.parseBoolean, so anything other than "true" (ignoring case),
 *              including null, disables the step
 * @return an already completed future wrapping the failover output
 */
288 public ListenableFuture<RpcResult<FailoverOutput>> failover(FailoverInput input) {
289 log.info("{}:failover invoked.", APP_NAME);
290 FailoverOutputBuilder outputBuilder = new FailoverOutputBuilder();
291 if(siteConfiguration != SiteConfiguration.GEO) {
292 log.info("Cannot failover non-GEO site.");
293 outputBuilder.setMessage("Failover aborted. This is not a GEO configuration.");
294 outputBuilder.setStatus("400");
295 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
297 ArrayList<ClusterActor> activeSite = new ArrayList<>();
298 ArrayList<ClusterActor> standbySite = new ArrayList<>();
300 log.info("Performing preliminary cluster health check...");
301 // Necessary to populate all member info. Health is not used for judgement calls.
302 getControllerHealth();
304 log.info("Determining active site...");
// Members that currently vote form the active site.
// NOTE(review): the branch that fills standbySite is presumably the else of the voting check - confirm.
305 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
306 String key = entry.getKey();
307 ClusterActor clusterActor = entry.getValue();
308 if(clusterActor.isVoting()) {
309 activeSite.add(clusterActor);
310 log.debug("Active Site member: {}", key);
313 standbySite.add(clusterActor);
314 log.debug("Standby Site member: {}", key);
// NOTE(review): unlike setProperties(), the SSL flag and the ports are read without trim(),
// so a value with surrounding whitespace selects a different port here than there.
318 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL)) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL) : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP);
320 if(Boolean.parseBoolean(input.getBackupData())) {
321 backupMdSal(activeSite, port);
// changeClusterVoting() fills in message and status itself when it fails.
324 if(!changeClusterVoting(outputBuilder, activeSite, standbySite, port))
325 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
327 if(Boolean.parseBoolean(input.getIsolate())) {
328 isolateSiteFromCluster(activeSite, standbySite, port);
330 if(Boolean.parseBoolean(input.getDownUnreachable())) {
331 downUnreachableNodes(activeSite, standbySite, port);
335 log.info("{}:failover complete.", APP_NAME);
337 outputBuilder.setMessage("Failover complete.");
338 outputBuilder.setStatus("200");
339 return Futures.immediateFuture(RpcResultBuilder.<FailoverOutput>status(true).withResult(outputBuilder.build()).build());
342 private void isolateSiteFromCluster(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
343 log.info("Halting Akka traffic...");
344 for(ClusterActor actor : standbySite) {
346 log.info("Halting Akka traffic for: {}", actor.getNode());
347 // Build JSON with activeSite actor Node and actor AkkaPort
348 JSONObject akkaInput = new JSONObject();
349 JSONObject inputBlock = new JSONObject();
350 JSONArray votingStateArray = new JSONArray();
352 for(ClusterActor node : activeSite) {
353 nodeInfo = new JSONObject();
354 nodeInfo.put("node", node.getNode());
355 nodeInfo.put("port", node.getAkkaPort());
356 votingStateArray.put(nodeInfo);
358 inputBlock.put("node-info", votingStateArray);
359 akkaInput.put("input", inputBlock);
360 getRequestContent(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:halt-akka-traffic", HttpMethod.POST, akkaInput.toString());
361 } catch(IOException e) {
362 log.error("Could not halt Akka traffic for: " + actor.getNode(), e);
367 private void downUnreachableNodes(ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
368 log.info("Setting site unreachable...");
369 JSONObject jolokiaInput = new JSONObject();
370 jolokiaInput.put("type", "EXEC");
371 jolokiaInput.put("mbean", "akka:type=Cluster");
372 jolokiaInput.put("operation", "down");
373 JSONArray arguments = new JSONArray();
374 for(ClusterActor actor : activeSite) {
375 // Build Jolokia input
376 // May need to change from akka port to actor.getAkkaPort()
377 arguments.put("akka.tcp://opendaylight-cluster-data@" + actor.getNode() + ":" + properties.getProperty(PropertyKeys.CONTROLLER_PORT_AKKA));
379 jolokiaInput.put("arguments", arguments);
380 log.debug("{}", jolokiaInput);
382 log.info("Setting nodes unreachable");
383 getRequestContent(httpProtocol + standbySite.get(0).getNode() + ":" + port + "/jolokia", HttpMethod.POST, jolokiaInput.toString());
384 } catch(IOException e) {
385 log.error("Error setting nodes unreachable", e);
389 private boolean changeClusterVoting(FailoverOutputBuilder outputBuilder, ArrayList<ClusterActor> activeSite, ArrayList<ClusterActor> standbySite, String port) {
390 log.info("Changing voting for all shards to standby site...");
392 JSONObject votingInput = new JSONObject();
393 JSONObject inputBlock = new JSONObject();
394 JSONArray votingStateArray = new JSONArray();
395 JSONObject memberVotingState;
396 for(ClusterActor actor : activeSite) {
397 memberVotingState = new JSONObject();
398 memberVotingState.put("member-name", actor.getMember());
399 memberVotingState.put("voting", false);
400 votingStateArray.put(memberVotingState);
402 for(ClusterActor actor : standbySite) {
403 memberVotingState = new JSONObject();
404 memberVotingState.put("member-name", actor.getMember());
405 memberVotingState.put("voting", true);
406 votingStateArray.put(memberVotingState);
408 inputBlock.put("member-voting-state", votingStateArray);
409 votingInput.put("input", inputBlock);
410 log.debug("{}", votingInput);
411 // Change voting all shards
412 getRequestContent(httpProtocol + self.getNode() + ":" + port + "/restconf/operations/cluster-admin:change-member-voting-states-for-all-shards", HttpMethod.POST, votingInput.toString());
413 } catch(IOException e) {
414 log.error("Changing voting", e);
415 outputBuilder.setMessage("Failover aborted. Failed to change voting.");
416 outputBuilder.setStatus("500");
422 private void backupMdSal(ArrayList<ClusterActor> activeSite, String port) {
423 log.info("Backing up data...");
425 log.info("Scheduling backup for: {}", activeSite.get(0).getNode());
426 getRequestContent(httpProtocol + activeSite.get(0).getNode() + ":" + port + "/restconf/operations/data-export-import:schedule-export", HttpMethod.POST, "{ \"input\": { \"run-at\": \"30\" } }");
427 } catch(IOException e) {
428 log.error("Error backing up MD-SAL", e);
430 for(ClusterActor actor : activeSite) {
433 log.info("Backing up data for: {}", actor.getNode());
434 getRequestContent(httpProtocol + actor.getNode() + ":" + port + "/restconf/operations/daexim-offsite-backup:backup-data", HttpMethod.POST);
435 } catch(IOException e) {
436 log.error("Error backing up data.", e);
441 private ListenableFuture<RpcResult<ClusterHealthOutput>> buildClusterHealthOutput(String statusCode) {
442 ClusterHealthOutputBuilder outputBuilder = new ClusterHealthOutputBuilder();
443 outputBuilder.setStatus(statusCode);
444 outputBuilder.setMembers((List) new ArrayList<Member>());
448 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
449 ClusterActor clusterActor = entry.getValue();
450 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
451 if(ClusterActor.SITE_1.equals(clusterActor.getSite()))
453 else if(ClusterActor.SITE_2.equals(clusterActor.getSite()))
456 outputBuilder.getMembers().add(new MemberBuilder(clusterActor).build());
458 if(siteConfiguration == SiteConfiguration.SOLO) {
459 outputBuilder.setSite1Health(HEALTHY);
461 else if(site1Health > 1) {
462 outputBuilder.setSite1Health(HEALTHY);
465 outputBuilder.setSite1Health(FAULTY);
468 if(siteConfiguration == SiteConfiguration.GEO && site2Health > 1) {
469 outputBuilder.setSite2Health(HEALTHY);
471 else if(siteConfiguration == SiteConfiguration.GEO) {
472 outputBuilder.setSite2Health(FAULTY);
475 RpcResult<ClusterHealthOutput> rpcResult = RpcResultBuilder.<ClusterHealthOutput>status(true).withResult(outputBuilder.build()).build();
476 return Futures.immediateFuture(rpcResult);
479 private ListenableFuture<RpcResult<SiteHealthOutput>> buildSiteHealthOutput(String statusCode, String adminHealth, String databaseHealth) {
480 SiteHealthOutputBuilder outputBuilder = new SiteHealthOutputBuilder();
481 outputBuilder.setStatus(statusCode);
482 outputBuilder.setSites((List) new ArrayList<Site>());
484 if(siteConfiguration != SiteConfiguration.GEO) {
486 SitesBuilder builder = new SitesBuilder();
487 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
488 ClusterActor clusterActor = entry.getValue();
489 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
493 if(siteConfiguration != SiteConfiguration.SOLO) {
494 builder.setHealth(HEALTHY);
495 builder.setRole("ACTIVE");
496 builder.setId(siteIdentifier);
499 builder = getSitesBuilder(healthyODLs, true, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
501 outputBuilder.getSites().add(builder.build());
504 int site1HealthyODLs = 0;
505 int site2HealthyODLs = 0;
506 boolean site1Voting = false;
507 boolean site2Voting = false;
508 boolean performedCrossSiteHealthCheck = false;
509 boolean crossSiteAdminHealthy = false;
510 boolean crossSiteDbHealthy = false;
511 String crossSiteIdentifier = "UNKNOWN_SITE";
512 String port = "true".equals(properties.getProperty(PropertyKeys.CONTROLLER_USE_SSL)) ? properties.getProperty(PropertyKeys.CONTROLLER_PORT_SSL) : properties.getProperty(PropertyKeys.CONTROLLER_PORT_HTTP);
514 // Make calls over to site 2 healthchecks
515 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
516 ClusterActor clusterActor = entry.getValue();
517 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
518 if(ClusterActor.SITE_1.equals(clusterActor.getSite())) {
520 if(clusterActor.isVoting()) {
526 if(clusterActor.isVoting()) {
529 if(!performedCrossSiteHealthCheck) {
531 String content = getRequestContent(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.POST);
532 crossSiteIdentifier = new JSONObject(content).getJSONObject("output").getString("id");
533 crossSiteDbHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
534 crossSiteAdminHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
535 performedCrossSiteHealthCheck = true;
536 } catch(Exception e) {
537 log.info("Cannot get site identifier from {}", clusterActor.getNode());
538 log.error("Site Health Error", e);
544 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
545 outputBuilder.getSites().add(builder.build());
546 builder = getSitesBuilder(site2HealthyODLs, site2Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
547 outputBuilder.getSites().add(builder.build());
550 // Make calls over to site 1 healthchecks
551 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
552 ClusterActor clusterActor = entry.getValue();
553 if(clusterActor.isUp() && !clusterActor.isUnreachable()) {
554 if(ClusterActor.SITE_1.equals(clusterActor.getSite())) {
556 if(clusterActor.isVoting()) {
559 if(!performedCrossSiteHealthCheck) {
561 String content = getRequestContent(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:site-identifier", HttpMethod.POST);
562 crossSiteIdentifier = new JSONObject(content).getJSONObject("output").getString("id");
563 crossSiteDbHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:database-health");
564 crossSiteAdminHealthy = crossSiteHealthRequest(httpProtocol + clusterActor.getNode() + ":" + port + "/restconf/operations/gr-toolkit:admin-health");
565 performedCrossSiteHealthCheck = true;
566 } catch(Exception e) {
567 log.info("Cannot get site identifier from {}", clusterActor.getNode());
568 log.error("Site Health Error", e);
574 if(clusterActor.isVoting()) {
581 SitesBuilder builder = getSitesBuilder(site1HealthyODLs, site1Voting, crossSiteAdminHealthy, crossSiteDbHealthy, crossSiteIdentifier);
582 outputBuilder.getSites().add(builder.build());
583 builder = getSitesBuilder(site2HealthyODLs, site2Voting, HEALTHY.equals(adminHealth), HEALTHY.equals(databaseHealth), siteIdentifier);
584 outputBuilder.getSites().add(builder.build());
588 RpcResult<SiteHealthOutput> rpcResult = RpcResultBuilder.<SiteHealthOutput>status(true).withResult(outputBuilder.build()).build();
589 return Futures.immediateFuture(rpcResult);
592 private SitesBuilder getSitesBuilder(int siteHealthyODLs, boolean siteVoting, boolean adminHealthy, boolean dbHealthy, String siteIdentifier) {
593 SitesBuilder builder = new SitesBuilder();
594 if(siteHealthyODLs > 1) {
595 builder.setHealth(HEALTHY);
598 log.warn("{} Healthy ODLs: {}", siteIdentifier, siteHealthyODLs);
599 builder.setHealth(FAULTY);
602 log.warn("{} Admin Health: {}", siteIdentifier, FAULTY);
603 builder.setHealth(FAULTY);
606 log.warn("{} Database Health: {}", siteIdentifier, FAULTY);
607 builder.setHealth(FAULTY);
610 builder.setRole("ACTIVE");
613 builder.setRole("STANDBY");
615 builder.setId(siteIdentifier);
619 private boolean isSite1() {
620 int memberNumber = Integer.parseInt(member.split("-")[1]);
621 boolean isSite1 = memberNumber < 4;
622 log.info("isSite1(): {}", isSite1);
/**
 * Rebuilds memberMap from the seed-nodes line of the akka configuration and derives the
 * site configuration from the number of entries (1 = SOLO, 3 = SINGLE, 6 = GEO).
 * No branch for any other member count is visible here.
 *
 * @param line the configuration line containing the seed-node list
 */
626 private void parseSeedNodes(String line) {
627 memberMap = new HashMap<>();
// Keep only the text from the opening ["  up to (not including) the closing bracket.
// NOTE(review): indexOf returns -1 when a delimiter is missing, in which case substring throws.
628 line = line.substring(line.indexOf("[\""), line.indexOf(']'));
629 String[] splits = line.split(",");
631 for(int ndx = 0; ndx < splits.length; ndx++) {
632 String nodeName = splits[ndx];
633 int delimLocation = nodeName.indexOf('@');
// Port: from just after the ':' that follows '@' up to the next double quote located
// after the entry's first ':'.
634 String port = nodeName.substring(splits[ndx].indexOf(':', delimLocation) + 1, splits[ndx].indexOf('"', splits[ndx].indexOf(':')));
// Host: the text between '@' and the ':' that follows it; it replaces the raw entry.
635 splits[ndx] = nodeName.substring(delimLocation + 1, splits[ndx].indexOf(':', delimLocation));
636 log.info("Adding node: {}:{}", splits[ndx], port);
637 ClusterActor clusterActor = new ClusterActor();
638 clusterActor.setNode(splits[ndx]);
639 clusterActor.setAkkaPort(port);
// Member names are assigned by position in the seed-node list, starting at member-1.
640 clusterActor.setMember("member-" + (ndx + 1));
// NOTE(review): the condition choosing between SITE_1 and SITE_2 is not visible in this view.
642 clusterActor.setSite(ClusterActor.SITE_1);
645 clusterActor.setSite(ClusterActor.SITE_2);
// The entry whose member name equals the current member is treated specially.
// NOTE(review): presumably it is remembered as self - the statement is not visible here.
648 if(member.equals(clusterActor.getMember())) {
651 memberMap.put(clusterActor.getNode(), clusterActor);
652 log.info("{}", clusterActor);
655 if(memberMap.size() == 1) {
656 log.info("1 member found. This is a solo environment.");
657 siteConfiguration = SiteConfiguration.SOLO;
659 else if(memberMap.size() == 3) {
660 log.info("This is a single site.");
661 siteConfiguration = SiteConfiguration.SINGLE;
663 else if(memberMap.size() == 6) {
664 log.info("This is a georedundant site.");
665 siteConfiguration = SiteConfiguration.GEO;
669 private void getMemberStatus(ClusterActor clusterActor) throws IOException {
670 log.info("Getting member status for {}", clusterActor.getNode());
671 String content = getRequestContent(httpProtocol + clusterActor.getNode() + jolokiaClusterPath, HttpMethod.GET);
673 JSONObject responseJson = new JSONObject(content);
674 JSONObject responseValue = responseJson.getJSONObject(VALUE);
675 clusterActor.setUp("Up".equals(responseValue.getString("MemberStatus")));
676 clusterActor.setUnreachable(false);
677 } catch(JSONException e) {
678 log.error("Error parsing response from {}", clusterActor.getNode(), e);
679 clusterActor.setUp(false);
680 clusterActor.setUnreachable(true);
684 private void getShardStatus(ClusterActor clusterActor) throws IOException {
685 log.info("Getting shard status for {}", clusterActor.getNode());
686 String content = getRequestContent(httpProtocol + clusterActor.getNode() + shardManagerPath, HttpMethod.GET);
688 JSONObject responseValue = new JSONObject(content).getJSONObject(VALUE);
689 JSONArray shardList = responseValue.getJSONArray("LocalShards");
691 String pattern = "-config$";
692 Pattern r = Pattern.compile(pattern);
694 for(int ndx = 0; ndx < shardList.length(); ndx++) {
695 String configShardName = shardList.getString(ndx);
696 m = r.matcher(configShardName);
697 String operationalShardName = m.replaceFirst("-operational");
698 String shardConfigPath = String.format(shardPathTemplate, configShardName);
699 String shardOperationalPath = String.format(shardPathTemplate, operationalShardName).replace("Config", "Operational");
700 extractShardInfo(clusterActor, configShardName, shardConfigPath);
701 extractShardInfo(clusterActor, operationalShardName, shardOperationalPath);
703 } catch(JSONException e) {
704 log.error("Error parsing response from " + clusterActor.getNode(), e);
708 private void extractShardInfo(ClusterActor clusterActor, String shardName, String shardPath) throws IOException {
709 log.info("Extracting shard info for {}", shardName);
710 log.debug("Pulling config info for {} from: {}", shardName, shardPath);
711 String content = getRequestContent(httpProtocol + clusterActor.getNode() + shardPath, HttpMethod.GET);
712 log.debug("Response: {}", content);
715 JSONObject shardValue = new JSONObject(content).getJSONObject(VALUE);
716 clusterActor.setVoting(shardValue.getBoolean("Voting"));
717 if(shardValue.getString("PeerAddresses").length() > 0) {
718 clusterActor.getReplicaShards().add(shardName);
719 if(shardValue.getString("Leader").startsWith(clusterActor.getMember())) {
720 clusterActor.getShardLeader().add(shardName);
724 clusterActor.getNonReplicaShards().add(shardName);
726 JSONArray followerInfo = shardValue.getJSONArray("FollowerInfo");
727 for(int followerNdx = 0; followerNdx < followerInfo.length(); followerNdx++) {
728 int commitIndex = shardValue.getInt("CommitIndex");
729 int matchIndex = followerInfo.getJSONObject(followerNdx).getInt("matchIndex");
730 if(commitIndex != -1 && matchIndex != -1) {
731 int commitsBehind = commitIndex - matchIndex;
732 clusterActor.getCommits().put(followerInfo.getJSONObject(followerNdx).getString("id"), commitsBehind);
735 } catch(JSONException e) {
736 log.error("Error parsing response from " + clusterActor.getNode(), e);
740 private void getControllerHealth() {
741 for(Map.Entry<String, ClusterActor> entry : memberMap.entrySet()) {
742 ClusterActor clusterActor = entry.getValue();
743 String key = entry.getKey();
745 // First flush out the old values
746 clusterActor.flush();
747 log.info("Gathering info for {}", clusterActor.getNode());
748 getMemberStatus(clusterActor);
749 getShardStatus(clusterActor);
750 log.info("MemberInfo:\n{}", clusterActor);
751 } catch(IOException e) {
752 log.error("Connection Error", e);
753 memberMap.get(key).setUnreachable(true);
754 memberMap.get(key).setUp(false);
755 log.info("MemberInfo:\n{}", memberMap.get(key));
760 private void modifyIpTables(IpTables task, Object[] nodeInfo) {
761 log.info("Modifying IPTables rules...");
762 if(task == IpTables.ADD) {
763 for(Object node : nodeInfo) {
764 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo n =
765 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.halt.akka.traffic.input.NodeInfo) node;
766 log.info("Isolating {}", n.getNode());
767 executeCommand(String.format("sudo /sbin/iptables -A INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
768 executeCommand(String.format("sudo /sbin/iptables -A OUTPUT -p tcp --destination-port %s -j DROP -s %s", n.getPort(), n.getNode()));
771 } else if(task == IpTables.DELETE) {
772 for(Object node : nodeInfo) {
773 org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo n =
774 (org.opendaylight.yang.gen.v1.org.onap.ccsdk.sli.plugins.gr.toolkit.rev180926.resume.akka.traffic.input.NodeInfo) node;
775 log.info("De-isolating {}", n.getNode());
776 executeCommand(String.format("sudo /sbin/iptables -D INPUT -p tcp --destination-port %s -j DROP -s %s", properties.get(PropertyKeys.CONTROLLER_PORT_AKKA), n.getNode()));
777 executeCommand(String.format("sudo /sbin/iptables -D OUTPUT -p tcp --destination-port %s -j DROP -s %s", n.getPort(), n.getNode()));
781 executeCommand("sudo /sbin/iptables -L");
784 private void executeCommand(String command) {
785 log.info("Executing command: {}", command);
786 String[] cmd = command.split(" ");
788 Process p = Runtime.getRuntime().exec(cmd);
789 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(p.getInputStream()));
791 StringBuilder content = new StringBuilder();
792 while((inputLine = bufferedReader.readLine()) != null) {
793 content.append(inputLine);
795 bufferedReader.close();
796 log.info("{}", content);
797 } catch(IOException e) {
798 log.error("Error executing command", e);
802 private boolean crossSiteHealthRequest(String path) throws IOException {
803 String content = getRequestContent(path, HttpMethod.POST);
805 JSONObject responseJson = new JSONObject(content);
806 JSONObject responseValue = responseJson.getJSONObject(VALUE);
807 return HEALTHY.equals(responseValue.getString("health"));
808 } catch(JSONException e) {
809 log.error("Error parsing JSON", e);
810 throw new IOException();
/**
 * Requests the ADM healthcheck URL and reports the outcome as a String.
 *
 * <p>The URL is built from the adm.* properties: scheme and port depend on the
 * {@code ADM_USE_SSL} flag, followed by {@code ADM_FQDN} and {@code ADM_HEALTHCHECK}.
 * The status code of a GET on that URL is obtained through {@code getRequestStatus}.
 * An {@link IOException} from the request is caught and logged here.
 *
 * NOTE(review): the statements that turn the status code (and the IOException case) into
 * the returned value are not visible in this excerpt - confirm them against the full file.
 */
814 private String getAdminHealth() {
// Scheme and port are both selected by the same flag; only the literal "true" enables SSL.
815 String protocol = "true".equals(properties.getProperty(PropertyKeys.ADM_USE_SSL)) ? "https://" : "http://";
816 String port = "true".equals(properties.getProperty(PropertyKeys.ADM_USE_SSL)) ? properties.getProperty(PropertyKeys.ADM_PORT_SSL) : properties.getProperty(PropertyKeys.ADM_PORT_HTTP);
// No separator is inserted between port and ADM_HEALTHCHECK, so the property value must supply any leading "/".
817 String path = protocol + properties.getProperty(PropertyKeys.ADM_FQDN) + ":" + port + properties.getProperty(PropertyKeys.ADM_HEALTHCHECK);
818 log.info("Requesting healthcheck from {}", path);
// Only the HTTP status code is requested; the response body is not read here.
820 int response = getRequestStatus(path, HttpMethod.GET);
821 log.info("Response: {}", response);
825 } catch(IOException e) {
826 log.error("Problem getting ADM health.", e);
831 private String getDatabaseHealth() {
832 log.info("Determining database health...");
834 log.info("DBLib isActive(): {}", dbLib.isActive());
835 log.info("DBLib isReadOnly(): {}", dbLib.getConnection().isReadOnly());
836 log.info("DBLib isClosed(): {}", dbLib.getConnection().isClosed());
837 if(!dbLib.isActive() || dbLib.getConnection().isClosed() || dbLib.getConnection().isReadOnly()) {
838 log.warn("Database is FAULTY");
841 log.info("Database is HEALTHY");
842 } catch(SQLException e) {
843 log.error("Database is FAULTY");
844 log.error("Error", e);
851 private String getRequestContent(String path, HttpMethod method) throws IOException {
852 return getRequestContent(path, method, null);
855 private String getRequestContent(String path, HttpMethod method, String input) throws IOException {
856 HttpURLConnection connection = getConnection(path);
857 connection.setRequestMethod(method.getMethod());
858 connection.setDoInput(true);
861 sendPayload(input, connection);
864 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
866 StringBuilder content = new StringBuilder();
867 while((inputLine = bufferedReader.readLine()) != null) {
868 content.append(inputLine);
870 bufferedReader.close();
871 connection.disconnect();
872 return content.toString();
875 private int getRequestStatus(String path, HttpMethod method) throws IOException {
876 return getRequestStatus(path, method, null);
879 private int getRequestStatus(String path, HttpMethod method, String input) throws IOException {
880 HttpURLConnection connection = getConnection(path);
881 connection.setRequestMethod(method.getMethod());
882 connection.setDoInput(true);
885 sendPayload(input, connection);
887 int response = connection.getResponseCode();
888 log.info("Received {} response code from {}", response, path);
889 connection.disconnect();
893 private void sendPayload(String input, HttpURLConnection connection) throws IOException {
894 byte[] out = input.getBytes(StandardCharsets.UTF_8);
895 int length = out.length;
897 connection.setFixedLengthStreamingMode(length);
898 connection.setRequestProperty("Content-Type", "application/json");
899 connection.setDoOutput(true);
900 connection.connect();
901 try(OutputStream os = connection.getOutputStream()) {
906 private HttpURLConnection getConnection(String host) throws IOException {
907 log.info("Getting connection to: {}", host);
908 URL url = new URL(host);
909 String auth = "Basic " + javax.xml.bind.DatatypeConverter.printBase64Binary(credentials.getBytes());
910 HttpURLConnection connection = (HttpURLConnection) url.openConnection();
911 connection.addRequestProperty("Authorization", auth);
912 connection.setRequestProperty("Connection", "keep-alive");
913 connection.setRequestProperty("Proxy-Connection", "keep-alive");
/**
 * Task selector for modifyIpTables, which compares it against ADD (append DROP rules)
 * and DELETE (remove them).
 */
917 private enum IpTables {
/**
 * Site configuration kinds.
 * NOTE(review): the constants and their usages are outside this excerpt - confirm their
 * meaning against the full file before relying on this description.
 */
922 private enum SiteConfiguration {
/**
 * HTTP request methods used by the request helpers; GET and POST are the constants
 * referenced by the callers in this class.
 */
928 private enum HttpMethod {
// String form of the method; callers pass it to HttpURLConnection.setRequestMethod.
932 private String method;
933 HttpMethod(String method) {
934 this.method = method;
// Accessor for the method string supplied to the constructor.
936 public String getMethod() {
941 private class PropertyKeys {
942 static final String SITE_IDENTIFIER = "site.identifier";
943 static final String CONTROLLER_USE_SSL = "controller.useSsl";
944 static final String CONTROLLER_PORT_SSL = "controller.port.ssl";
945 static final String CONTROLLER_PORT_HTTP = "controller.port.http";
946 static final String CONTROLLER_PORT_AKKA = "controller.port.akka";
947 static final String CONTROLLER_CREDENTIALS = "controller.credentials";
948 static final String AKKA_CONF_LOCATION = "akka.conf.location";
949 static final String MBEAN_CLUSTER = "mbean.cluster";
950 static final String MBEAN_SHARD_MANAGER = "mbean.shardManager";
951 static final String MBEAN_SHARD_CONFIG = "mbean.shard.config";
952 static final String ADM_USE_SSL = "adm.useSsl";
953 static final String ADM_PORT_SSL = "adm.port.ssl";
954 static final String ADM_PORT_HTTP = "adm.port.http";
955 static final String ADM_FQDN = "adm.fqdn";
956 static final String ADM_HEALTHCHECK= "adm.healthcheck";