1 package org.opendaylight.mwtn.base.database;
3 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
6 import java.io.FileInputStream;
7 import java.io.FileOutputStream;
8 import java.io.IOException;
9 import java.net.DatagramSocket;
10 import java.net.ServerSocket;
11 import java.nio.charset.Charset;
12 import java.nio.charset.StandardCharsets;
13 import java.nio.file.Files;
14 import java.nio.file.Path;
15 import java.util.List;
16 import java.util.zip.ZipEntry;
17 import java.util.zip.ZipInputStream;
19 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
20 import org.elasticsearch.client.Client;
21 import org.elasticsearch.common.settings.Settings;
22 import org.elasticsearch.common.unit.TimeValue;
23 import org.elasticsearch.node.Node;
24 import org.opendaylight.mwtn.base.internalTypes.Resources;
25 import org.opendaylight.mwtn.config.impl.AkkaConfig;
26 import org.opendaylight.mwtn.config.impl.AkkaConfig.ClusterNodeInfo;
27 import org.opendaylight.mwtn.config.impl.EsConfig;
28 import org.opendaylight.mwtn.config.impl.GeoConfig;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 public class HtDatabaseNode implements AutoCloseable {
36 private static final Logger LOGGER = LoggerFactory.getLogger(HtDatabaseNode.class);
37 private static final String DBCONFIGFILENAME = "etc/elasticsearch.yml";
38 private static int MIN_PORT_NUMBER = 1024;
39 private static int MAX_PORT_NUMBER = 65535;
40 private static int ES_PORT = 9200;
41 private static int DELAYSECONDS = 30;
43 private static String pluginFolder="etc/elasticsearch-plugins";
45 private static HtDatabaseNode oneNode = null;
49 private static Integer initializedTarget = 0;
50 private static Integer initializedReached = 0;
52 private final Node node;
54 private HtDatabaseNode() {
55 LOGGER.debug("Start elasticsearch service");
56 node = nodeBuilder().settings(Settings.builder().put("path.home", "etc").put("path.conf", "etc")).node();
57 LOGGER.info("Starting Database service. Wait {} s", DELAYSECONDS);
58 // Wait for green status but only wait for 2 seconds
59 ClusterHealthResponse nodeStatus = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus()
60 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS)).get();
62 LOGGER.debug("Elasticsearch service started with status {}", nodeStatus.toString());
71 oneNode = null; //Release the one instance that was started !
74 public Boolean getInitialized() {
75 synchronized (initializedReached) {
76 return initializedTarget != 0 && initializedReached == initializedTarget;
80 public void setInitializedReached() {
81 synchronized (initializedReached) {
82 HtDatabaseNode.initializedReached++;
86 public void setInitializedTarget() {
87 synchronized (initializedTarget) {
88 HtDatabaseNode.initializedTarget++;
92 public Client getClient() {
97 /* ---------------------------------------
98 * Static functions below
102 private static void extractZip(String zipFile, String outputFolder) {
104 byte[] buffer = new byte[1024];
108 // create output directory is not exists
109 File folder = new File(outputFolder);
110 if (!folder.exists()) {
114 // get the zip file content
115 ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile));
116 // get the zipped file list entry
117 ZipEntry ze = zis.getNextEntry();
121 String fileName = ze.getName();
123 File newFile = new File(outputFolder + File.separator + fileName);
124 System.out.println("file unzip : " + newFile.getAbsoluteFile());
132 // create all non exists folders
133 // else you will hit FileNotFoundException for compressed folder
134 new File(newFile.getParent()).mkdirs();
136 FileOutputStream fos = new FileOutputStream(newFile);
139 while ((len = zis.read(buffer)) > 0) {
140 fos.write(buffer, 0, len);
145 ze = zis.getNextEntry();
150 } catch (IOException ex) {
151 LOGGER.warn("problem extracting " + zipFile + " to " + outputFolder);
// Prepares the elasticsearch plugins below the given plugin folder.
// For the "head" and the "delete-by-query" plugin the bundled zip resource is
// copied to <pluginFolder>/tmp.zip with Resources.extractFileTo and, if that call
// reports true, unpacked into pluginFolder with extractZip.
// pluginFolder: folder that receives the plugin sub-folders.
// NOTE(review): the conditions guarding the steps and the end of the method are not
// visible here - presumably a plugin is only extracted if its sub-folder is missing; confirm.
155 // Visibility package for test purpose
156 static void checkorcreateplugins(String pluginFolder) {
157 File f = new File(pluginFolder);
// the same temporary zip file is used for both plugins
158 String tmpFilename = pluginFolder + "/tmp.zip";
// --- plugin "head"
162 f = new File(pluginFolder + "/head");
163 File tmpFile = new File(tmpFilename);
165 LOGGER.debug("extracting head plugin");
166 if (Resources.extractFileTo("/elasticsearch/plugins/head.zip", tmpFile)) {
167 extractZip(tmpFile.getAbsolutePath(), pluginFolder);
// NOTE(review): presumably the branch for a failed resource copy; it is only logged at debug level
169 LOGGER.debug("problem extracting plugin res");
// --- plugin "delete-by-query"
171 f = new File(pluginFolder + "/delete-by-query");
173 LOGGER.debug("extracting head delete-by-query plugin");
174 if (Resources.extractFileTo("/elasticsearch/plugins/delete-by-query.zip", tmpFile)) {
175 extractZip(tmpFile.getAbsolutePath(), pluginFolder);
// NOTE(review): presumably the branch for a failed resource copy; it is only logged at debug level
177 LOGGER.debug("problem extracting plugin res");
185 * Checks to see if a specific port is available.
188 * the port to check for availability
190 private static boolean isPortAvailable(int port) {
191 if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
192 throw new IllegalArgumentException("Invalid start port: " + port);
195 ServerSocket ss = null;
196 DatagramSocket ds = null;
198 ss = new ServerSocket(port);
199 ss.setReuseAddress(true);
200 ds = new DatagramSocket(port);
201 ds.setReuseAddress(true);
203 } catch (IOException e) {
212 } catch (IOException e) {
213 /* should not be thrown */
// Creates the elasticsearch configuration file DBCONFIGFILENAME from the bundled
// template "/elasticsearch/elasticsearch.yml" and fills in the template values.
// The placeholders $clustername, $nodename and $hostname are replaced, and for a
// cluster setup the unicast host list and the zone settings are appended.
// config:     provides cluster and node name; both are overwritten for a cluster setup
//             if a db cluster name is configured.
// akkaConfig: cluster information; may be null (checked before cluster specific use).
// geoConfig:  used to decide between zone1 and zone2.
// NOTE(review): several guarding lines are not visible here - presumably the file is only
// created if it does not exist yet and the zone section is guarded as well; confirm.
221 private static void checkorcreateConfigFile(EsConfig config, AkkaConfig akkaConfig,GeoConfig geoConfig) {
222 File f = new File(DBCONFIGFILENAME);
224 LOGGER.debug("no " + DBCONFIGFILENAME + " found - extracting from resources");
225 if (Resources.extractFileTo("/elasticsearch/elasticsearch.yml", f)) {
226 // replace template values
227 LOGGER.debug("replace template values with config:" + config);
// the file is read and written back with the same charset
228 Charset charset = StandardCharsets.UTF_8;
231 String hostName = "0.0.0.0"; //Default as initialisation value
232 if(akkaConfig!=null && akkaConfig.isCluster())
234 LOGGER.debug("cluster configuration found");
// the current default is handed over as argument to getHostName
235 hostName=akkaConfig.getClusterConfig().getHostName(hostName);
236 String clusterDBName=akkaConfig.getClusterConfig().getDBClusterName(null);
// NOTE(review): nodeName is formatted before clusterDBName is checked for null
237 String nodeName=String.format("node%d.%s",akkaConfig.getClusterConfig().getRoleMemberIndex(),clusterDBName);
238 if(clusterDBName!=null)
240 config.setCluster(clusterDBName);
241 config.setNode(nodeName);
242 LOGGER.info("set db name to "+clusterDBName+" nodename="+nodeName );
245 LOGGER.warn("unable to set correct db clustername");
// NOTE(review): the declaration of p is not visible here - presumably the path of f; confirm.
247 String content = new String(Files.readAllBytes(p), charset);
// NOTE(review): replaceAll interprets '$' and '\' in the replacement text, so values
// containing these characters would not be inserted literally.
248 content = content.replaceAll("\\$clustername", config.getCluster()).replaceAll("\\$nodename",
249 config.getNode()).replaceAll("\\$hostname", hostName);
251 //add cluster configuration
252 if(akkaConfig!=null && akkaConfig.isCluster())
// builds a list of the form ["addr0","addr1",...] from the remote addresses of the seed nodes
// NOTE(review): get(0) is called without a visible check for an empty seed node list
254 List<ClusterNodeInfo> seedNodes=akkaConfig.getClusterConfig().getSeedNodes();
255 String nodesJSONString="[\""+seedNodes.get(0).getRemoteAddress()+"\"";
256 for(int i=1;i<seedNodes.size();i++)
257 nodesJSONString+=",\""+seedNodes.get(i).getRemoteAddress()+"\"";
258 nodesJSONString+="]";
259 content+=System.lineSeparator()+String.format("discovery.zen.ping.unicast.hosts: %s",nodesJSONString);
// zone settings: zone1 if geoConfig reports the role member as primary, zone2 otherwise
263 LOGGER.debug("adding zone configuration");
264 content+=System.lineSeparator()+String.format("cluster.routing.allocation.awareness.force.zone.values: zone1,zone2");
265 content+=System.lineSeparator()+String.format("cluster.routing.allocation.awareness.attributes: zone");
266 if(geoConfig.isPrimary(akkaConfig.getClusterConfig().getRoleMember()))
268 content+=System.lineSeparator()+String.format("node.zone: zone1");
269 LOGGER.debug("setting zone to zone1");
273 content+=System.lineSeparator()+String.format("node.zone: zone2");
274 LOGGER.debug("setting zone to zone2");
// write the completed content back to the configuration file
278 Files.write(p, content.getBytes(charset));
279 } catch (IOException e) {
// only the exception message is logged, not the stack trace
280 LOGGER.warn("problem replacing values in file: " + e.getMessage());
// NOTE(review): presumably the branch for a failed extraction of the template resource; confirm.
284 LOGGER.warn("problem writing database.yml to etc folder from res");
293 public static HtDatabaseNode start(EsConfig config) throws IllegalStateException {
294 return start(config,null,null);
297 public static HtDatabaseNode start(EsConfig config, AkkaConfig akkaConfig,GeoConfig geoConfig) {
298 if (isPortAvailable(ES_PORT)) {
299 LOGGER.info("ES Port not in use. Start internal ES.");
300 if (oneNode == null) {
301 checkorcreateplugins(pluginFolder);
302 checkorcreateConfigFile(config,akkaConfig,geoConfig);
303 oneNode = new HtDatabaseNode();
305 throw new IllegalStateException("Database is already started, but can only be started once. Stop here.");
307 LOGGER.info("ES Port in use. External ES used.");