93a5bbd408a3dfa5f7e6952f6af7cb7b1180b94b
[ccsdk/features.git] /
1 /*******************************************************************************
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  ******************************************************************************/
18 package org.onap.ccsdk.features.sdnr.wt.devicemanager.base.database;
19
20 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
21 import java.io.File;
22 import java.io.IOException;
23 import java.net.DatagramSocket;
24 import java.net.ServerSocket;
25 import java.nio.charset.Charset;
26 import java.nio.charset.StandardCharsets;
27 import java.nio.file.Files;
28 import java.nio.file.Path;
29 import java.util.List;
30 import org.apache.lucene.util.Version;
31 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
32 import org.elasticsearch.client.Client;
33 import org.elasticsearch.common.settings.Settings;
34 import org.elasticsearch.common.unit.TimeValue;
35 import org.elasticsearch.node.Node;
36 import org.onap.ccsdk.features.sdnr.wt.devicemanager.base.internalTypes.Resources;
37 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.AkkaConfig;
38 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.EsConfig;
39 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.impl.GeoConfig;
40 import org.onap.ccsdk.features.sdnr.wt.devicemanager.config.util.ClusterNodeInfo;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 public class HtDatabaseNode implements AutoCloseable {
45
46     private static final Logger LOGGER = LoggerFactory.getLogger(HtDatabaseNode.class);
47     private static final String DBCONFIGFILENAME = "etc/elasticsearch.yml";
48     private static final String RESFOLDER_PLUGIN = "elasticsearch/plugins";
49     private static final String RESFOLDER_PLUGINHEAD = RESFOLDER_PLUGIN + "/head";
50     private static final String RESFOLDER_PLUGINDELETE = RESFOLDER_PLUGIN + "/delete-by-query";
51     private static int MIN_PORT_NUMBER = 1024;
52     private static int MAX_PORT_NUMBER = 65535;
53     private static int ES_PORT = 9200;
54     private static int DELAYSECONDS = 120;
55     private static String PLUGINFOLDER = "etc/elasticsearch-plugins";
56
57     private static HtDatabaseNode oneNode = null;
58     private static Object initializationLock = new Object();
59     private static Integer initializedTarget = 0;
60     private static Integer initializedReached = 0;
61
62     private final Node node;
63
64     private HtDatabaseNode() {
65         LOGGER.debug("Start elasticsearch service");
66
67         LOGGER.debug("Lucine version: " + Version.LATEST);
68
69         node = nodeBuilder().settings(Settings.builder().put("path.home", "etc").put("path.conf", "etc")).node();
70         LOGGER.info("Starting Database service. Wait {} s", DELAYSECONDS);
71         // Wait for orange status for single node without redundancy
72         ClusterHealthResponse nodeStatus = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus()
73                 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS)).get();
74
75         LOGGER.debug("Elasticsearch service started with status {}", nodeStatus.toString());
76     }
77
78     /**
79      * Close node
80      */
81     @Override
82     public void close() {
83         node.close();
84         oneNode = null; // Release the one instance that was started !
85     }
86
87     /**
88      * Provide indication if all Index initializations are done.
89      *
90      * @return true if all index initializations are ready, false if not
91      */
92     public Boolean getInitialized() {
93         synchronized (initializationLock) {
94             return initializedTarget != 0 && initializedReached == initializedTarget;
95         }
96     }
97
98     public void setInitializedReached() {
99         synchronized (initializationLock) {
100             HtDatabaseNode.initializedReached++;
101         }
102     }
103
104     public void setInitializedTarget() {
105         synchronized (initializationLock) {
106             HtDatabaseNode.initializedTarget++;
107         }
108     }
109
110     public Client getClient() {
111         return node.client();
112     }
113
114
115     /*
116      * --------------------------------------- Static functions below
117      */
118
119
120     // Visibility package for test purpose
121     static void checkorcreateplugins(String pluginFolder) {
122         File f = new File(pluginFolder);
123         if (!f.exists()) {
124             f.mkdir();
125         }
126         if (!Resources.copyFolderInto(RESFOLDER_PLUGINHEAD, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
127             throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINHEAD + " folder src "
128                     + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
129         }
130        //Normal JAR loaded by classloader as part of the bundle
131        if (!Resources.copyFolderInto(RESFOLDER_PLUGINDELETE, PLUGINFOLDER, RESFOLDER_PLUGIN)) {
132             throw new IllegalArgumentException("Copy not successfull Name: " + RESFOLDER_PLUGINDELETE + " folder src "
133                     + PLUGINFOLDER + " folder dst " + RESFOLDER_PLUGIN);
134        }
135     }
136
137     /**
138      * Checks to see if a specific port is available.
139      *
140      * @param port the port to check for availability
141      */
142     private static boolean isPortAvailable(int port) {
143         if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
144             throw new IllegalArgumentException("Invalid start port: " + port);
145         }
146
147         ServerSocket ss = null;
148         DatagramSocket ds = null;
149         try {
150             ss = new ServerSocket(port);
151             ss.setReuseAddress(true);
152             ds = new DatagramSocket(port);
153             ds.setReuseAddress(true);
154             return true;
155         } catch (IOException e) {
156         } finally {
157             if (ds != null) {
158                 ds.close();
159             }
160
161             if (ss != null) {
162                 try {
163                     ss.close();
164                 } catch (IOException e) {
165                     /* should not be thrown */
166                 }
167             }
168         }
169
170         return false;
171     }
172
173     private static void checkorcreateConfigFile(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
174         File f = new File(DBCONFIGFILENAME);
175         if (!f.exists()) {
176             LOGGER.debug("no " + DBCONFIGFILENAME + " found - extracting from resources");
177             if (Resources.extractFileTo("elasticsearch/elasticsearch.yml", f)) {
178                 // replace template values
179                 LOGGER.debug("replace template values with config:" + config);
180                 Charset charset = StandardCharsets.UTF_8;
181                 try {
182                     Path p = f.toPath();
183                     String hostName = "0.0.0.0"; // Default as initialisation value
184                     if (akkaConfig != null && akkaConfig.isCluster()) {
185                         LOGGER.debug("cluster configuration found");
186                         hostName = akkaConfig.getClusterConfig().getHostName(hostName);
187                         String clusterDBName = akkaConfig.getClusterConfig().getDBClusterName(null);
188                         String nodeName = String.format("node%d.%s", akkaConfig.getClusterConfig().getRoleMemberIndex(),
189                                 clusterDBName);
190                         if (clusterDBName != null) {
191                             config.setCluster(clusterDBName);
192                             config.setNode(nodeName);
193                             config.save();
194                             LOGGER.info("set db name to " + clusterDBName + " nodename=" + nodeName);
195                         } else {
196                             LOGGER.warn("unable to set correct db clustername");
197                         }
198                     }
199                     String content = new String(Files.readAllBytes(p), charset);
200                     content = content.replaceAll("\\$clustername", config.getCluster())
201                             .replaceAll("\\$nodename", config.getNode()).replaceAll("\\$hostname", hostName);
202
203                     // add cluster configuration
204                     if (akkaConfig != null && akkaConfig.isCluster()) {
205                         List<ClusterNodeInfo> seedNodes = akkaConfig.getClusterConfig().getSeedNodes();
206                         String nodesJSONString = "[\"" + seedNodes.get(0).getRemoteAddress() + "\"";
207                         for (int i = 1; i < seedNodes.size(); i++) {
208                             nodesJSONString += ",\"" + seedNodes.get(i).getRemoteAddress() + "\"";
209                         }
210                         nodesJSONString += "]";
211                         content += System.lineSeparator()
212                                 + String.format("discovery.zen.ping.unicast.hosts: %s", nodesJSONString);
213
214                         if (geoConfig != null) {
215                             LOGGER.debug("adding zone configuration");
216                             content += System.lineSeparator() + String
217                                     .format("cluster.routing.allocation.awareness.force.zone.values: zone1,zone2");
218                             content += System.lineSeparator()
219                                     + String.format("cluster.routing.allocation.awareness.attributes: zone");
220                             if (geoConfig.isPrimary(akkaConfig.getClusterConfig().getRoleMember())) {
221                                 content += System.lineSeparator() + String.format("node.zone: zone1");
222                                 LOGGER.debug("setting zone to zone1");
223                             } else {
224                                 content += System.lineSeparator() + String.format("node.zone: zone2");
225                                 LOGGER.debug("setting zone to zone2");
226                             }
227                         }
228                     }
229                     Files.write(p, content.getBytes(charset));
230                 } catch (IOException e) {
231                     LOGGER.warn("problem replacing values in file: " + e.getMessage());
232
233                 }
234             } else {
235                 LOGGER.warn("problem writing database.yml to etc folder from res");
236             }
237         }
238     }
239
240     /**
241      * Start as singleton
242      *
243      * @param config data
244      * @param akkaConfig data
245      * @param geoConfig data
246      * @return the node or null if external node used
247      */
248     public static HtDatabaseNode start(EsConfig config, AkkaConfig akkaConfig, GeoConfig geoConfig) {
249         if (isPortAvailable(ES_PORT)) {
250             LOGGER.info("ES Port not in use. Start internal ES.");
251             if (oneNode == null) {
252                 checkorcreateplugins(PLUGINFOLDER);
253                 checkorcreateConfigFile(config, akkaConfig, geoConfig);
254                 oneNode = new HtDatabaseNode();
255             } else {
256                 throw new IllegalStateException(
257                         "Database is already started, but can only be started once. Stop here.");
258             }
259         } else {
260             LOGGER.info("ES Port in use. External ES used.");
261         }
262
263         return oneNode;
264     }
265
266 }