1 /*******************************************************************************
2 * ============LICENSE_START========================================================================
3 * ONAP : ccsdk feature sdnr wt
4 * =================================================================================================
5 * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6 * =================================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8 * in compliance with the License. You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software distributed under the License
13 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14 * or implied. See the License for the specific language governing permissions and limitations under
16 * ============LICENSE_END==========================================================================
17 ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database;
20 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
22 import java.io.IOException;
23 import java.net.DatagramSocket;
24 import java.net.ServerSocket;
25 import java.nio.charset.Charset;
26 import java.nio.charset.StandardCharsets;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.util.List;
30 import org.apache.lucene.util.Version;
31 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
32 import org.elasticsearch.client.Client;
33 import org.elasticsearch.common.settings.Settings;
34 import org.elasticsearch.common.unit.TimeValue;
35 import org.elasticsearch.node.Node;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.internalTypes.Resources;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.util.ClusterNodeInfo;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
44 public class HtDatabaseNode implements AutoCloseable {
46 private static final Logger LOGGER = LoggerFactory.getLogger(HtDatabaseNode.class);
47 private static final String DBCONFIGFILENAME = "etc/elasticsearch.yml";
48 private static final String RESFOLDER_PLUGIN = "elasticsearch/plugins";
49 private static final String RESFOLDER_PLUGINHEAD = RESFOLDER_PLUGIN + "/head";
50 private static final String RESFOLDER_PLUGINDELETE = RESFOLDER_PLUGIN + "/delete-by-query";
51 private static int MIN_PORT_NUMBER = 1024;
52 private static int MAX_PORT_NUMBER = 65535;
53 private static int ES_PORT = 9200;
54 private static int DELAYSECONDS = 120;
55 private static String PLUGINFOLDER = "etc/elasticsearch-plugins";
57 private static HtDatabaseNode oneNode = null;
58 private static Object initializationLock = new Object();
59 private static Integer initializedTarget = 0;
60 private static Integer initializedReached = 0;
62 private final Node node;
64 private HtDatabaseNode() {
65 LOGGER.debug("Start elasticsearch service");
67 LOGGER.debug("Lucine version: " + Version.LATEST);
69 node = nodeBuilder().settings(Settings.builder().put("path.home", "etc").put("path.conf", "etc")).node();
70 LOGGER.info("Starting Database service. Wait {} s", DELAYSECONDS);
71 // Wait for orange status for single node without redundancy
72 ClusterHealthResponse nodeStatus = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus()
73 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS)).get();
75 LOGGER.debug("Elasticsearch service started with status {}", nodeStatus.toString());
84 oneNode = null; // Release the one instance that was started !
88 * Provide indication if all Index initializations are done.
90 * @return true if all index initializations are ready, false if not
92 public Boolean getInitialized() {
93 synchronized (initializationLock) {
94 return initializedTarget != 0 && initializedReached == initializedTarget;
98 public void setInitializedReached() {
99 synchronized (initializationLock) {
100 HtDatabaseNode.initializedReached++;
104 public void setInitializedTarget() {
105 synchronized (initializationLock) {
106 HtDatabaseNode.initializedTarget++;
110 public Client getClient() {
111 return node.client();
116 * --------------------------------------- Static functions below
// Copies the bundled plugin folders RESFOLDER_PLUGINHEAD ("elasticsearch/plugins/head") and
// RESFOLDER_PLUGINDELETE ("elasticsearch/plugins/delete-by-query") via Resources.copyFolderInto
// and throws IllegalArgumentException as soon as one of the copies reports failure.
// NOTE(review): both copy calls pass the static PLUGINFOLDER instead of the pluginFolder
// parameter, so a caller-supplied folder is only used for the File created below - looks
// unintended; confirm against callers and tests before changing.
120 // Visibility package for test purpose
121 static void checkorcreateplugins(String pluginFolder) {
122 File f = new File(pluginFolder);
// NOTE(review): the statements between the File creation and the first copy are not visible
// in this listing - presumably the folder is created when missing; verify in the full file.
// Copy the "head" plugin; a false result aborts with an exception.
126 if (!Resources.copyFolderInto(RESFOLDER_PLUGINHEAD, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
127 throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINHEAD + " folder src "
128 + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
130 //Normal JAR loaded by classloader as part of the bundle
// Copy the "delete-by-query" plugin with the same failure handling as above.
131 if (!Resources.copyFolderInto(RESFOLDER_PLUGINDELETE, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
132 throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINDELETE + " folder src "
133 + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
138 * Checks to see if a specific port is available.
140 * @param port the port to check for availability
142 private static boolean isPortAvailable(int port) {
143 if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
144 throw new IllegalArgumentException("Invalid start port: " + port);
147 ServerSocket ss = null;
148 DatagramSocket ds = null;
150 ss = new ServerSocket(port);
151 ss.setReuseAddress(true);
152 ds = new DatagramSocket(port);
153 ds.setReuseAddress(true);
155 } catch (IOException e) {
164 } catch (IOException e) {
165 /* should not be thrown */
// Creates the Elasticsearch configuration file DBCONFIGFILENAME from the bundled template
// "elasticsearch/elasticsearch.yml": the template is extracted, the placeholders $clustername,
// $nodename and $hostname are substituted, and for cluster setups the unicast host list and
// (with a geoConfig) zone awareness settings are appended before the file is written back.
// NOTE(review): several short lines (braces, try, else and a few statements) are not visible in
// this listing; the notes below mark the places where that matters.
173 private static void checkorcreateConfigFile(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
174 File f = new File(DBCONFIGFILENAME);
// Presumably reached only when the file does not exist yet (the guarding condition is not
// visible here) - verify in the full file.
176 LOGGER.debug("no " + DBCONFIGFILENAME + " found - extracting from resources");
177 if (Resources.extractFileTo("elasticsearch/elasticsearch.yml", f)) {
178 // replace template values
179 LOGGER.debug("replace template values with config:" + config);
180 Charset charset = StandardCharsets.UTF_8;
// hostName stays "0.0.0.0" unless a cluster configuration supplies another value.
183 String hostName = "0.0.0.0"; // Default as initialisation value
184 if (akkaConfig != null && akkaConfig.isCluster()) {
185 LOGGER.debug("cluster configuration found");
186 hostName = akkaConfig.getClusterConfig().getHostName(hostName);
187 String clusterDBName = akkaConfig.getClusterConfig().getDBClusterName(null);
// Node name pattern is "node<index>.<string>"; the second format argument is on a line that is
// not visible in this listing.
188 String nodeName = String.format("node%d.%s", akkaConfig.getClusterConfig().getRoleMemberIndex(),
// Cluster and node name are only taken over into config when a DB cluster name is available.
190 if (clusterDBName != null) {
191 config.setCluster(clusterDBName);
192 config.setNode(nodeName);
194 LOGGER.info("set db name to " + clusterDBName + " nodename=" + nodeName);
196 LOGGER.warn("unable to set correct db clustername");
// Read the extracted template and substitute the placeholders.
// NOTE(review): String.replaceAll interprets '$' and '\' in the replacement text; if cluster,
// node or host names can contain them, Matcher.quoteReplacement would be needed - confirm.
199 String content = new String(Files.readAllBytes(p), charset);
200 content = content.replaceAll("\\$clustername", config.getCluster())
201 .replaceAll("\\$nodename", config.getNode()).replaceAll("\\$hostname", hostName);
203 // add cluster configuration
204 if (akkaConfig != null && akkaConfig.isCluster()) {
// Build a JSON style array of the seed nodes' remote addresses.
// NOTE(review): seedNodes.get(0) throws for an empty list - confirm the list is never empty.
205 List<ClusterNodeInfo> seedNodes = akkaConfig.getClusterConfig().getSeedNodes();
206 String nodesJSONString = "[\"" + seedNodes.get(0).getRemoteAddress() + "\"";
207 for (int i = 1; i < seedNodes.size(); i++) {
208 nodesJSONString += ",\"" + seedNodes.get(i).getRemoteAddress() + "\"";
210 nodesJSONString += "]";
211 content += System.lineSeparator()
212 + String.format("discovery.zen.ping.unicast.hosts: %s", nodesJSONString);
// With a geoConfig the zone awareness settings are appended and this node is assigned to
// zone1 when geoConfig.isPrimary(...) is true, otherwise to zone2.
// NOTE(review): the isPrimary call dereferences akkaConfig; whether this block is nested inside
// the cluster check above is not visible here - verify null safety in the full file.
214 if (geoConfig != null) {
215 LOGGER.debug("adding zone configuration");
216 content += System.lineSeparator() + String
217 .format("cluster.routing.allocation.awareness.force.zone.values: zone1,zone2");
218 content += System.lineSeparator()
219 + String.format("cluster.routing.allocation.awareness.attributes: zone");
220 if (geoConfig.isPrimary(akkaConfig.getClusterConfig().getRoleMember())) {
221 content += System.lineSeparator() + String.format("node.zone: zone1");
222 LOGGER.debug("setting zone to zone1");
224 content += System.lineSeparator() + String.format("node.zone: zone2");
225 LOGGER.debug("setting zone to zone2");
// Write the completed configuration back to the same file.
229 Files.write(p, content.getBytes(charset));
230 } catch (IOException e) {
// Only the exception message is logged; the stack trace is dropped.
231 LOGGER.warn("problem replacing values in file: " + e.getMessage());
// NOTE(review): the message names "database.yml" although the file handled above is
// DBCONFIGFILENAME ("etc/elasticsearch.yml").
235 LOGGER.warn("problem writing database.yml to etc folder from res");
244 * @param akkaConfig data
245 * @param geoConfig data
246 * @return the node or null if external node used
248 public static HtDatabaseNode start(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
249 if (isPortAvailable(ES_PORT)) {
250 LOGGER.info("ES Port not in use. Start internal ES.");
251 if (oneNode == null) {
252 checkorcreateplugins(PLUGINFOLDER);
253 checkorcreateConfigFile(config, akkaConfig, geoConfig);
254 oneNode = new HtDatabaseNode();
256 throw new IllegalStateException(
257 "Database is already started, but can only be started once. Stop here.");
260 LOGGER.info("ES Port in use. External ES used.");