1 /*******************************************************************************
 
   2  * ============LICENSE_START========================================================================
 
   3  * ONAP : ccsdk feature sdnr wt
 
   4  * =================================================================================================
 
   5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
 
   6  * =================================================================================================
 
   7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 
   8  * in compliance with the License. You may obtain a copy of the License at
 
  10  * http://www.apache.org/licenses/LICENSE-2.0
 
  12  * Unless required by applicable law or agreed to in writing, software distributed under the License
 
  13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 
  14  * or implied. See the License for the specific language governing permissions and limitations under
 
  16  * ============LICENSE_END==========================================================================
 
  17  ******************************************************************************/
 
  18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database;
 
  20 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
 
  22 import java.io.IOException;
 
  23 import java.net.DatagramSocket;
 
  24 import java.net.ServerSocket;
 
  25 import java.nio.charset.Charset;
 
  26 import java.nio.charset.StandardCharsets;
 
  27 import java.nio.file.Files;
 
  28 import java.nio.file.Path;
 
  29 import java.util.List;
 
  30 import org.apache.lucene.util.Version;
 
  31 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 
  32 import org.elasticsearch.client.Client;
 
  33 import org.elasticsearch.common.settings.Settings;
 
  34 import org.elasticsearch.common.unit.TimeValue;
 
  35 import org.elasticsearch.node.Node;
 
  36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.internalTypes.Resources;
 
  37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
 
  38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
 
  39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
 
  40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.util.ClusterNodeInfo;
 
  41 import org.slf4j.Logger;
 
  42 import org.slf4j.LoggerFactory;
 
  44 public class HtDatabaseNode implements AutoCloseable {
 
  46     private static final Logger LOGGER = LoggerFactory.getLogger(HtDatabaseNode.class);
 
  47     private static final String DBCONFIGFILENAME = "etc/elasticsearch.yml";
 
  48     private static final String RESFOLDER_PLUGIN = "elasticsearch/plugins";
 
  49     private static final String RESFOLDER_PLUGINHEAD = RESFOLDER_PLUGIN + "/head";
 
  50     private static final String RESFOLDER_PLUGINDELETE = RESFOLDER_PLUGIN + "/delete-by-query";
 
  51     private static int MIN_PORT_NUMBER = 1024;
 
  52     private static int MAX_PORT_NUMBER = 65535;
 
  53     private static int ES_PORT = 9200;
 
  54     private static int DELAYSECONDS = 120;
 
  55     private static String PLUGINFOLDER = "etc/elasticsearch-plugins";
 
  57     private static HtDatabaseNode oneNode = null;
 
  58     private static Object initializationLock = new Object();
 
  59     private static Integer initializedTarget = 0;
 
  60     private static Integer initializedReached = 0;
 
  62     private final Node node;
 
  64     private HtDatabaseNode() {
 
  65         LOGGER.debug("Start elasticsearch service");
 
  67         LOGGER.debug("Lucine version: " + Version.LATEST);
 
  69         node = nodeBuilder().settings(Settings.builder().put("path.home", "etc").put("path.conf", "etc")).node();
 
  70         LOGGER.info("Starting Database service. Wait {} s", DELAYSECONDS);
 
  71         // Wait for orange status for single node without redundancy
 
  72         ClusterHealthResponse nodeStatus = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus()
 
  73                 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS)).get();
 
  75         LOGGER.debug("Elasticsearch service started with status {}", nodeStatus.toString());
 
  84         oneNode = null; // Release the one instance that was started !
 
  88      * Provide indication if all Index initializations are done.
 
  90      * @return true if all index initializations are ready, false if not
 
  92     public Boolean getInitialized() {
 
  93         synchronized (initializationLock) {
 
  94             return initializedTarget != 0 && initializedReached == initializedTarget;
 
  98     public void setInitializedReached() {
 
  99         synchronized (initializationLock) {
 
 100             HtDatabaseNode.initializedReached++;
 
 104     public void setInitializedTarget() {
 
 105         synchronized (initializationLock) {
 
 106             HtDatabaseNode.initializedTarget++;
 
 110     public Client getClient() {
 
 111         return node.client();
 
 116      * --------------------------------------- Static functions below
 
 120     // Visibility package for test purpose
 
 121     static void checkorcreateplugins(String pluginFolder) {
 
 122         File f = new File(pluginFolder);
 
 126         if (!Resources.copyFolderInto(RESFOLDER_PLUGINHEAD, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
 
 127             throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINHEAD + " folder src "
 
 128                     + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
 
 130        //Normal JAR loaded by classloader as part of the bundle
 
 131        if (!Resources.copyFolderInto(RESFOLDER_PLUGINDELETE, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
 
 132             throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINDELETE + " folder src "
 
 133                     + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
 
 138      * Checks to see if a specific port is available.
 
 140      * @param port the port to check for availability
 
 142     private static boolean isPortAvailable(int port) {
 
 143         if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
 
 144             throw new IllegalArgumentException("Invalid start port: " + port);
 
 147         ServerSocket ss = null;
 
 148         DatagramSocket ds = null;
 
 150             ss = new ServerSocket(port);
 
 151             ss.setReuseAddress(true);
 
 152             ds = new DatagramSocket(port);
 
 153             ds.setReuseAddress(true);
 
 155         } catch (IOException e) {
 
 164                 } catch (IOException e) {
 
 165                     /* should not be thrown */
 
 173     private static void checkorcreateConfigFile(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
 
 174         File f = new File(DBCONFIGFILENAME);
 
 176             LOGGER.debug("no " + DBCONFIGFILENAME + " found - extracting from resources");
 
 177             if (Resources.extractFileTo("elasticsearch/elasticsearch.yml", f)) {
 
 178                 // replace template values
 
 179                 LOGGER.debug("replace template values with config:" + config);
 
 180                 Charset charset = StandardCharsets.UTF_8;
 
 183                     String hostName = "0.0.0.0"; // Default as initialisation value
 
 184                     if (akkaConfig != null && akkaConfig.isCluster()) {
 
 185                         LOGGER.debug("cluster configuration found");
 
 186                         hostName = akkaConfig.getClusterConfig().getHostName(hostName);
 
 187                         String clusterDBName = akkaConfig.getClusterConfig().getDBClusterName(null);
 
 188                         String nodeName = String.format("node%d.%s", akkaConfig.getClusterConfig().getRoleMemberIndex(),
 
 190                         if (clusterDBName != null) {
 
 191                             config.setCluster(clusterDBName);
 
 192                             config.setNode(nodeName);
 
 194                             LOGGER.info("set db name to " + clusterDBName + " nodename=" + nodeName);
 
 196                             LOGGER.warn("unable to set correct db clustername");
 
 199                     String content = new String(Files.readAllBytes(p), charset);
 
 200                     content = content.replaceAll("\\$clustername", config.getCluster())
 
 201                             .replaceAll("\\$nodename", config.getNode()).replaceAll("\\$hostname", hostName);
 
 203                     // add cluster configuration
 
 204                     if (akkaConfig != null && akkaConfig.isCluster()) {
 
 205                         List<ClusterNodeInfo> seedNodes = akkaConfig.getClusterConfig().getSeedNodes();
 
 206                         String nodesJSONString = "[\"" + seedNodes.get(0).getRemoteAddress() + "\"";
 
 207                         for (int i = 1; i < seedNodes.size(); i++) {
 
 208                             nodesJSONString += ",\"" + seedNodes.get(i).getRemoteAddress() + "\"";
 
 210                         nodesJSONString += "]";
 
 211                         content += System.lineSeparator()
 
 212                                 + String.format("discovery.zen.ping.unicast.hosts: %s", nodesJSONString);
 
 214                         if (geoConfig != null) {
 
 215                             LOGGER.debug("adding zone configuration");
 
 216                             content += System.lineSeparator() + String
 
 217                                     .format("cluster.routing.allocation.awareness.force.zone.values: zone1,zone2");
 
 218                             content += System.lineSeparator()
 
 219                                     + String.format("cluster.routing.allocation.awareness.attributes: zone");
 
 220                             if (geoConfig.isPrimary(akkaConfig.getClusterConfig().getRoleMember())) {
 
 221                                 content += System.lineSeparator() + String.format("node.zone: zone1");
 
 222                                 LOGGER.debug("setting zone to zone1");
 
 224                                 content += System.lineSeparator() + String.format("node.zone: zone2");
 
 225                                 LOGGER.debug("setting zone to zone2");
 
 229                     Files.write(p, content.getBytes(charset));
 
 230                 } catch (IOException e) {
 
 231                     LOGGER.warn("problem replacing values in file: " + e.getMessage());
 
 235                 LOGGER.warn("problem writing database.yml to etc folder from res");
 
 244      * @param akkaConfig data
 
 245      * @param geoConfig data
 
 246      * @return the node or null if external node used
 
 248     public static HtDatabaseNode start(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
 
 249         if (isPortAvailable(ES_PORT)) {
 
 250             LOGGER.info("ES Port not in use. Start internal ES.");
 
 251             if (oneNode == null) {
 
 252                 checkorcreateplugins(PLUGINFOLDER);
 
 253                 checkorcreateConfigFile(config, akkaConfig, geoConfig);
 
 254                 oneNode = new HtDatabaseNode();
 
 256                 throw new IllegalStateException(
 
 257                         "Database is already started, but can only be started once. Stop here.");
 
 260             LOGGER.info("ES Port in use. External ES used.");