Assign image keyname and pubkey at vnf level
[ccsdk/apps.git] / sdnr / wireless-transport / code-Carbon-SR1 / apps / devicemanager / impl / src / main / java / org / opendaylight / mwtn / base / database / HtDatabaseNode.java
1 package org.opendaylight.mwtn.base.database;
2
3 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
4
5 import java.io.File;
6 import java.io.FileInputStream;
7 import java.io.FileOutputStream;
8 import java.io.IOException;
9 import java.net.DatagramSocket;
10 import java.net.ServerSocket;
11 import java.nio.charset.Charset;
12 import java.nio.charset.StandardCharsets;
13 import java.nio.file.Files;
14 import java.nio.file.Path;
15 import java.util.List;
16 import java.util.zip.ZipEntry;
17 import java.util.zip.ZipInputStream;
18
19 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
20 import org.elasticsearch.client.Client;
21 import org.elasticsearch.common.settings.Settings;
22 import org.elasticsearch.common.unit.TimeValue;
23 import org.elasticsearch.node.Node;
24 import org.opendaylight.mwtn.base.internalTypes.Resources;
25 import org.opendaylight.mwtn.config.impl.AkkaConfig;
26 import org.opendaylight.mwtn.config.impl.AkkaConfig.ClusterNodeInfo;
27 import org.opendaylight.mwtn.config.impl.EsConfig;
28 import org.opendaylight.mwtn.config.impl.GeoConfig;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 public class HtDatabaseNode implements AutoCloseable {
33
34
35
36         private static final Logger LOGGER = LoggerFactory.getLogger(HtDatabaseNode.class);
37         private static final String DBCONFIGFILENAME = "etc/elasticsearch.yml";
38         private static int MIN_PORT_NUMBER = 1024;
39         private static int MAX_PORT_NUMBER = 65535;
40         private static int ES_PORT = 9200;
41         private static int DELAYSECONDS = 30;
42
43         private static String pluginFolder="etc/elasticsearch-plugins";
44
45         private static HtDatabaseNode oneNode = null;
46         /**
47          *
48          */
49         private static Integer initializedTarget = 0;
50         private static Integer initializedReached = 0;
51
52         private final Node node;
53
54         private HtDatabaseNode() {
55                 LOGGER.debug("Start elasticsearch service");
56                 node = nodeBuilder().settings(Settings.builder().put("path.home", "etc").put("path.conf", "etc")).node();
57                 LOGGER.info("Starting Database service. Wait {} s", DELAYSECONDS);
58                 // Wait for green status but only wait for 2 seconds
59                 ClusterHealthResponse nodeStatus = node.client().admin().cluster().prepareHealth().setWaitForYellowStatus()
60                                 .setTimeout(TimeValue.timeValueSeconds(DELAYSECONDS)).get();
61
62                 LOGGER.debug("Elasticsearch service started with status {}", nodeStatus.toString());
63         }
64
65         /**
66          * Close node
67          */
68         @Override
69         public void close() {
70                 node.close();
71                 oneNode = null; //Release the one instance that was started !
72         }
73
74         public Boolean getInitialized() {
75                 synchronized (initializedReached) {
76                         return initializedTarget != 0 && initializedReached == initializedTarget;
77                 }
78         }
79
80         public void setInitializedReached() {
81                 synchronized (initializedReached) {
82                         HtDatabaseNode.initializedReached++;
83                 }
84         }
85
86         public void setInitializedTarget() {
87                 synchronized (initializedTarget) {
88                         HtDatabaseNode.initializedTarget++;
89                 }
90         }
91
92         public Client getClient() {
93                 return node.client();
94         }
95
96
97         /* ---------------------------------------
98          * Static functions below
99          */
100
101
102         private static void extractZip(String zipFile, String outputFolder) {
103
104                 byte[] buffer = new byte[1024];
105
106                 try {
107
108                         // create output directory is not exists
109                         File folder = new File(outputFolder);
110                         if (!folder.exists()) {
111                                 folder.mkdir();
112                         }
113
114                         // get the zip file content
115                         ZipInputStream zis = new ZipInputStream(new FileInputStream(zipFile));
116                         // get the zipped file list entry
117                         ZipEntry ze = zis.getNextEntry();
118
119                         while (ze != null) {
120
121                                 String fileName = ze.getName();
122
123                                 File newFile = new File(outputFolder + File.separator + fileName);
124                                 System.out.println("file unzip : " + newFile.getAbsoluteFile());
125                                 if(ze.isDirectory())
126                                 {
127                                         newFile.mkdir();
128                                 }
129                                 else
130                                 {
131
132                                         // create all non exists folders
133                                         // else you will hit FileNotFoundException for compressed folder
134                                         new File(newFile.getParent()).mkdirs();
135
136                                         FileOutputStream fos = new FileOutputStream(newFile);
137
138                                         int len;
139                                         while ((len = zis.read(buffer)) > 0) {
140                                                 fos.write(buffer, 0, len);
141                                         }
142
143                                         fos.close();
144                                 }
145                                 ze = zis.getNextEntry();
146                         }
147
148                         zis.closeEntry();
149                         zis.close();
150                 } catch (IOException ex) {
151                         LOGGER.warn("problem extracting " + zipFile + " to " + outputFolder);
152                 }
153         }
154
155         // Visibility package for test purpose
156         static void checkorcreateplugins(String pluginFolder) {
157                 File f = new File(pluginFolder);
158                 String tmpFilename = pluginFolder + "/tmp.zip";
159                 if (!f.exists())
160                         f.mkdir();
161
162                 f = new File(pluginFolder + "/head");
163                 File tmpFile = new File(tmpFilename);
164                 if (!f.exists()) {
165                         LOGGER.debug("extracting head plugin");
166                         if (Resources.extractFileTo("/elasticsearch/plugins/head.zip", tmpFile)) {
167                                 extractZip(tmpFile.getAbsolutePath(), pluginFolder);
168                         } else
169                                 LOGGER.debug("problem extracting plugin res");
170                 }
171                 f = new File(pluginFolder + "/delete-by-query");
172                 if (!f.exists()) {
173                         LOGGER.debug("extracting head delete-by-query plugin");
174                         if (Resources.extractFileTo("/elasticsearch/plugins/delete-by-query.zip", tmpFile)) {
175                                 extractZip(tmpFile.getAbsolutePath(), pluginFolder);
176                         } else
177                                 LOGGER.debug("problem extracting plugin res");
178                 }
179                 if(tmpFile.exists())
180                         tmpFile.delete();
181
182         }
183
184         /**
185          * Checks to see if a specific port is available.
186          *
187          * @param port
188          *            the port to check for availability
189          */
190         private static boolean isPortAvailable(int port) {
191                 if (port < MIN_PORT_NUMBER || port > MAX_PORT_NUMBER) {
192                         throw new IllegalArgumentException("Invalid start port: " + port);
193                 }
194
195                 ServerSocket ss = null;
196                 DatagramSocket ds = null;
197                 try {
198                         ss = new ServerSocket(port);
199                         ss.setReuseAddress(true);
200                         ds = new DatagramSocket(port);
201                         ds.setReuseAddress(true);
202                         return true;
203                 } catch (IOException e) {
204                 } finally {
205                         if (ds != null) {
206                                 ds.close();
207                         }
208
209                         if (ss != null) {
210                                 try {
211                                         ss.close();
212                                 } catch (IOException e) {
213                                         /* should not be thrown */
214                                 }
215                         }
216                 }
217
218                 return false;
219         }
220
221         private static void checkorcreateConfigFile(EsConfig config, AkkaConfig akkaConfig,GeoConfig geoConfig) {
222                 File f = new File(DBCONFIGFILENAME);
223                 if (!f.exists()) {
224                         LOGGER.debug("no " + DBCONFIGFILENAME + " found - extracting from resources");
225                         if (Resources.extractFileTo("/elasticsearch/elasticsearch.yml", f)) {
226                                 // replace template values
227                                 LOGGER.debug("replace template values with config:" + config);
228                                 Charset charset = StandardCharsets.UTF_8;
229                                 try {
230                                         Path p = f.toPath();
231                                         String hostName = "0.0.0.0"; //Default as initialisation value
232                                         if(akkaConfig!=null && akkaConfig.isCluster())
233                                         {
234                                                 LOGGER.debug("cluster configuration found");
235                                                 hostName=akkaConfig.getClusterConfig().getHostName(hostName);
236                                                 String clusterDBName=akkaConfig.getClusterConfig().getDBClusterName(null);
237                                                 String nodeName=String.format("node%d.%s",akkaConfig.getClusterConfig().getRoleMemberIndex(),clusterDBName);
238                                                 if(clusterDBName!=null)
239                                                 {
240                                                         config.setCluster(clusterDBName);
241                                                         config.setNode(nodeName);
242                                                         LOGGER.info("set db name to "+clusterDBName+" nodename="+nodeName );
243                                                 }
244                                                 else
245                                                         LOGGER.warn("unable to set correct db clustername");
246                                         }
247                                         String content = new String(Files.readAllBytes(p), charset);
248                                         content = content.replaceAll("\\$clustername", config.getCluster()).replaceAll("\\$nodename",
249                                                         config.getNode()).replaceAll("\\$hostname", hostName);
250
251                                         //add cluster configuration
252                                         if(akkaConfig!=null && akkaConfig.isCluster())
253                                         {
254                                                 List<ClusterNodeInfo> seedNodes=akkaConfig.getClusterConfig().getSeedNodes();
255                                                 String nodesJSONString="[\""+seedNodes.get(0).getRemoteAddress()+"\"";
256                                                 for(int i=1;i<seedNodes.size();i++)
257                                                         nodesJSONString+=",\""+seedNodes.get(i).getRemoteAddress()+"\"";
258                                                 nodesJSONString+="]";
259                                                 content+=System.lineSeparator()+String.format("discovery.zen.ping.unicast.hosts: %s",nodesJSONString);
260
261                                                 if(geoConfig!=null)
262                                                 {
263                                                         LOGGER.debug("adding zone configuration");
264                                                         content+=System.lineSeparator()+String.format("cluster.routing.allocation.awareness.force.zone.values: zone1,zone2");
265                                                         content+=System.lineSeparator()+String.format("cluster.routing.allocation.awareness.attributes: zone");
266                                                         if(geoConfig.isPrimary(akkaConfig.getClusterConfig().getRoleMember()))
267                                                         {
268                                                                 content+=System.lineSeparator()+String.format("node.zone: zone1");
269                                                                 LOGGER.debug("setting zone to zone1");
270                                                         }
271                                                         else
272                                                         {
273                                                                 content+=System.lineSeparator()+String.format("node.zone: zone2");
274                                                                 LOGGER.debug("setting zone to zone2");
275                                                         }
276                                                 }
277                                         }
278                                         Files.write(p, content.getBytes(charset));
279                                 } catch (IOException e) {
280                                         LOGGER.warn("problem replacing values in file: " + e.getMessage());
281
282                                 }
283                         } else {
284                                 LOGGER.warn("problem writing database.yml to etc folder from res");
285                         }
286                 }
287         }
288
289         /**
290          * Start as singleton
291          * @return the node
292          */
293         public static HtDatabaseNode start(EsConfig config) throws IllegalStateException {
294                 return start(config,null,null);
295         }
296
297         public static HtDatabaseNode start(EsConfig config, AkkaConfig akkaConfig,GeoConfig geoConfig) {
298                 if (isPortAvailable(ES_PORT)) {
299                         LOGGER.info("ES Port not in use. Start internal ES.");
300                         if (oneNode == null) {
301                                 checkorcreateplugins(pluginFolder);
302                                 checkorcreateConfigFile(config,akkaConfig,geoConfig);
303                                 oneNode = new HtDatabaseNode();
304                         } else
305                                 throw new IllegalStateException("Database is already started, but can only be started once. Stop here.");
306                 } else {
307                         LOGGER.info("ES Port in use. External ES used.");
308                 }
309
310                 return oneNode;
311         }
312
313 }