Merge "fix connection state machine"
[ccsdk/features.git] / sdnr / wt / data-provider / setup / src / main / java / org / onap / ccsdk / features / sdnr / wt / dataprovider / setup / DataMigrationProviderImpl.java
1 /*
2  * ============LICENSE_START=======================================================
3  * ONAP : ccsdk features
4  * ================================================================================
5  * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
6  * All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *     http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  */
22 package org.onap.ccsdk.features.sdnr.wt.dataprovider.setup;
23
24 import java.io.File;
25 import java.io.FileNotFoundException;
26 import java.io.IOException;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.Files;
29 import java.text.ParseException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
33 import java.util.Set;
34
35 import org.json.JSONObject;
36 import org.onap.ccsdk.features.sdnr.wt.common.database.HtDatabaseClient;
37 import org.onap.ccsdk.features.sdnr.wt.common.database.Portstatus;
38 import org.onap.ccsdk.features.sdnr.wt.common.database.SearchHit;
39 import org.onap.ccsdk.features.sdnr.wt.common.database.SearchResult;
40 import org.onap.ccsdk.features.sdnr.wt.common.database.config.HostInfo;
41 import org.onap.ccsdk.features.sdnr.wt.common.database.data.AliasesEntry;
42 import org.onap.ccsdk.features.sdnr.wt.common.database.data.AliasesEntryList;
43 import org.onap.ccsdk.features.sdnr.wt.common.database.data.EsVersion;
44 import org.onap.ccsdk.features.sdnr.wt.common.database.data.IndicesEntry;
45 import org.onap.ccsdk.features.sdnr.wt.common.database.data.IndicesEntryList;
46 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.CreateAliasRequest;
47 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.CreateIndexRequest;
48 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.DeleteAliasRequest;
49 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.DeleteIndexRequest;
50 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.AcknowledgedResponse;
51 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.GetInfoResponse;
52 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.ListAliasesResponse;
53 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.ListIndicesResponse;
54 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.ComponentData;
55 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.ComponentName;
56 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.DataMigrationReport;
57 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.DataContainer;
58 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.Release;
59 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.SearchHitConverter;
60 import org.slf4j.Logger;
61 import org.slf4j.LoggerFactory;
62
63 public class DataMigrationProviderImpl implements DataMigrationProviderService {
64
65         
66     private static final Logger LOG = LoggerFactory.getLogger(DataMigrationProviderImpl.class);
67     private final HtDatabaseClient dbClient;
68
69     public DataMigrationProviderImpl(HostInfo[] hosts, String username, String password, boolean trustAll, long timeoutms) {
70
71         if(timeoutms>0) {
72             Portstatus.waitSecondsTillAvailable(timeoutms/1000, hosts);
73         }
74         this.dbClient = new HtDatabaseClient(hosts, username, password, trustAll);
75     }
76
77     @Override
78     public DataMigrationReport importData(String filename, boolean dryrun) throws Exception {
79         return this.importData(filename, dryrun, Release.CURRENT_RELEASE);
80     }
81
82     public DataMigrationReport importData(String filename, boolean dryrun, Release forRelease) throws Exception {
83         DataMigrationReport report = new DataMigrationReport();
84         File file = new File(filename);
85         if (!file.exists()) {
86             if (dryrun) {
87                 report.error("file %s not found", filename);
88                 return report;
89             }
90             throw new FileNotFoundException(filename);
91         }
92         DataContainer container = null;
93         try {
94             container = DataContainer.load(file);
95         } catch (Exception e) {
96             if (dryrun) {
97                 report.error("problem loading file %s: %s", filename, e.getMessage());
98                 return report;
99             }
100             throw new Exception("problem loading file " + filename, e);
101         }
102         ReleaseInformation ri = ReleaseInformation.getInstance(forRelease);
103         SearchHitConverter converter;
104         Set<ComponentName> components = ri.getComponents();
105         //for all db components of dest architecture
106         for (ComponentName component : components) {
107             //convert to ComponentData for current release with existing ComponentData of the container
108             converter = SearchHitConverter.Factory.getInstance(container.getRelease(), forRelease, component);
109             if (converter == null) {
110                 continue;
111             }
112             ComponentData data = converter.convert(container);
113             if (data != null) {
114                 String indexName = ri.getAlias(component);
115                 String dataTypeName = ri.getDataType(component);
116                 if (dryrun) {
117                     report.log("write %d entries into %s/%s", data.size(), indexName, dataTypeName);
118                 } else {
119                     LOG.debug("write {} entries into {}/{}", data.size(), indexName, dataTypeName);
120                 }
121                 for (SearchHit item : data) {
122                     if (!dryrun) {
123                         String id = this.dbClient.doWriteRaw(indexName, dataTypeName, item.getId(),
124                                 item.getSourceAsString());
125                         if (!item.getId().equals(id)) {
126                             LOG.warn("entry for {} with original id {} was written with another id {}",
127                                     component.getValue(), item.getId(), id);
128                         }
129                     }
130                 }
131             } else {
132                 if (dryrun) {
133                     report.error("unable to convert data for " + component.getValue() + " from version "
134                             + container.getRelease().getValue() + " to " + forRelease.getValue() + "\n");
135                 } else {
136                     LOG.warn("unable to convert data for {} from version {} to {}", component.getValue(),
137                             container.getRelease().getValue(), forRelease.getValue());
138                 }
139             }
140         }
141         LOG.info("import of {} completed", filename);
142         if (dryrun) {
143             report.log("import of %s completed", filename);
144         }
145         report.setCompleted(true);
146         return report;
147     }
148
149
150     /**
151      * export data
152      * if file exists .1 (.n) will be created
153      *
154      */
155     @Override
156     public DataMigrationReport exportData(String filename) {
157         DataMigrationReport report = new DataMigrationReport();
158
159         DataContainer container = new DataContainer();
160
161         filename = this.checkFilenameForWrite(filename);
162         LOG.info("output will be written to {}", filename);
163         //autodetect version
164         Release dbRelease = this.autoDetectRelease();
165         if(dbRelease==null) {
166             report.error("unbable to detect db release. is database initialized?");
167             return report;
168         }
169         ReleaseInformation ri = ReleaseInformation.getInstance(dbRelease);
170         boolean componentsSucceeded = true;
171         for(ComponentName c: ri.getComponents()) {
172             ComponentData data = new ComponentData(c);
173             SearchResult<SearchHit> result = this.dbClient.doReadAllJsonData(ri.getAlias(c),ri.getDataType(c),false);
174             data.addAll(result.getHits());
175             container.addComponent(c, data );
176         }
177         try {
178             Files.write(new File(filename).toPath(), Arrays.asList(container.toJSON()), StandardCharsets.UTF_8);
179             report.setCompleted(componentsSucceeded);
180         } catch (IOException e) {
181             LOG.warn("problem writing data to {}: {}", filename, e);
182         }
183         return report;
184     }
185
186     private String checkFilenameForWrite(String filename) {
187         File f = new File(filename);
188         if (!f.exists()) {
189             return filename;
190         }
191         return this.checkFilenameForWrite(filename, 0);
192     }
193
194     private String checkFilenameForWrite(String filename, int apdx) {
195         File f = new File(String.format("$s.$d",filename,apdx));
196         if (!f.exists()) {
197             return filename;
198         }
199         return this.checkFilenameForWrite(filename, apdx + 1);
200     }
201
202     @Override
203     public Release getCurrentVersion() {
204         return Release.CURRENT_RELEASE;
205     }
206
207
208     public Release autoDetectRelease() {
209         EsVersion dbVersion = this.readActualVersion();
210         AliasesEntryList aliases = this.readAliases();
211         IndicesEntryList indices = this.readIndices();
212         if(indices==null) {
213             return null;
214         }
215         List<Release> foundReleases = new ArrayList<>();
216         //if there are active aliases reduce indices to the active ones
217         if(aliases!=null && aliases.size()>0) {
218             indices = indices.subList(aliases.getLinkedIndices());
219         }
220         for(Release r:Release.values()) {
221             if(r.isDbInRange(dbVersion)) {
222                 ReleaseInformation ri = ReleaseInformation.getInstance(r);
223                 if(ri!=null && ri.containsIndices(indices)) {
224                     foundReleases.add(r);
225                 }
226             }
227         }
228         if (foundReleases.size() == 1) {
229             return foundReleases.get(0);
230         }
231         LOG.error("detect {} releases: {}. unable to detect for which one to do sth.",foundReleases.size(), foundReleases);
232         return null;
233     }
234     private EsVersion readActualVersion() {
235         try {
236             GetInfoResponse response = this.dbClient.getInfo();
237             return response.getVersion();
238         } catch (Exception e) {
239             LOG.warn(e.getMessage());
240         }
241         return null;
242     }
243
244     private AliasesEntryList readAliases() {
245         AliasesEntryList entries = null;
246         try {
247             ListAliasesResponse response = this.dbClient.getAliases();
248             entries = response.getEntries();
249         } catch (ParseException | IOException e) {
250             LOG.error(e.getMessage());
251         }
252         return entries;
253     }
254
255     private IndicesEntryList readIndices() {
256         IndicesEntryList entries = null;
257         try {
258             ListIndicesResponse response = this.dbClient.getIndices();
259             entries = response.getEntries();
260         } catch (ParseException | IOException e) {
261             LOG.error(e.getMessage());
262         }
263         return entries;
264     }
265
266     @Override
267     public boolean initDatabase(Release release, int numShards, int numReplicas, String dbPrefix,
268             boolean forceRecreate,long timeoutms) {
269         if(timeoutms>0) {
270             this.dbClient.waitForYellowStatus(timeoutms);
271         }
272         EsVersion dbVersion = this.readActualVersion();
273         if (dbVersion == null) {
274             return false;
275         }
276         if (!release.isDbInRange(dbVersion)) {
277             LOG.warn("db version {} maybe not compatible with release {}", dbVersion, release);
278             return false;
279         }
280         if (forceRecreate) {
281             this.clearDatabase(release, dbPrefix,0);
282         }
283         ReleaseInformation ri = ReleaseInformation.getInstance(release);
284         AliasesEntryList aliases = this.readAliases();
285         IndicesEntryList indices = this.readIndices();
286         if (aliases == null || indices == null) {
287             return false;
288         }
289         AcknowledgedResponse response = null;
290         if(!ri.runPreInitCommands(this.dbClient)) {
291             return false;
292         }
293         for (ComponentName component : ri.getComponents()) {
294             try {
295                 if (ri.hasOwnDbIndex(component)) {
296                     //check if index already exists
297                     String indexName = ri.getIndex(component, dbPrefix);
298                     String aliasName = ri.getAlias(component, dbPrefix);
299                     if (indices.findByIndex(indexName) == null) {
300                         LOG.info("creating index for {}", component);
301                         CreateIndexRequest request = new CreateIndexRequest(ri.getIndex(component, dbPrefix));
302                         request.mappings(new JSONObject(ri.getDatabaseMapping(component)));
303                         request.settings(new JSONObject(ri.getDatabaseSettings(component, numShards, numReplicas)));
304                         response = this.dbClient.createIndex(request);
305                         LOG.info(response.isAcknowledged() ? "succeeded" : "failed");
306                     } else {
307                         LOG.info("index {} for {} already exists", indexName, component);
308                     }
309                     //check if alias already exists
310                     if (aliases.findByAlias(aliasName) == null) {
311                         LOG.info("creating alias for {}", component);
312                         response = this.dbClient.createAlias(new CreateAliasRequest(indexName, aliasName));
313                         LOG.info(response.isAcknowledged() ? "succeeded" : "failed");
314                     } else {
315                         LOG.info("alias {} for index {} for {} already exists", aliasName, indexName, component);
316                     }
317                 }
318             } catch (IOException e) {
319                 LOG.error(e.getMessage());
320                 return false;
321             }
322         }
323         if(!ri.runPostInitCommands(this.dbClient)) {
324             return false;
325         }
326         return true;
327     }
328
329     @Override
330     public boolean clearDatabase(Release release, String dbPrefix, long timeoutms) {
331
332         if(timeoutms>0) {
333             this.dbClient.waitForYellowStatus(timeoutms);
334         }
335         //check aliases
336         AliasesEntryList entries = this.readAliases();
337         if (entries == null) {
338             return false;
339         }
340         ReleaseInformation ri = ReleaseInformation.getInstance(release);
341         AcknowledgedResponse response;
342         if (entries.size() <= 0) {
343             LOG.info("no aliases to clear");
344         } else {
345             //check for every component of release if alias exists
346             for (ComponentName component : ri.getComponents()) {
347                 String aliasToDelete = ri.getAlias(component, dbPrefix);
348                 AliasesEntry entryToDelete = entries.findByAlias(aliasToDelete);
349                 if (entryToDelete != null) {
350                     try {
351                         LOG.info("deleting alias {} for index {}", entryToDelete.getAlias(), entryToDelete.getIndex());
352                         response=this.dbClient.deleteAlias(
353                                 new DeleteAliasRequest(entryToDelete.getIndex(), entryToDelete.getAlias()));
354                         LOG.info(response.isResponseSucceeded()?"succeeded":"failed");
355                     } catch (IOException e) {
356                         LOG.error(e.getMessage());
357                         return false;
358                     }
359                 }
360             }
361         }
362         IndicesEntryList entries2 = this.readIndices();
363         if (entries2 == null) {
364             return false;
365         }
366         if (entries2.size() <= 0) {
367             LOG.info("no indices to clear");
368         } else {
369             //check for every component of release if index exists
370             for (ComponentName component : ri.getComponents()) {
371                 String indexToDelete = ri.getIndex(component, dbPrefix);
372                 IndicesEntry entryToDelete = entries2.findByIndex(indexToDelete);
373                 if (entryToDelete != null) {
374                     try {
375                         LOG.info("deleting index {}", entryToDelete.getName());
376                         response=this.dbClient.deleteIndex(new DeleteIndexRequest(entryToDelete.getName()));
377                         LOG.info(response.isResponseSucceeded()?"succeeded":"failed");
378                     } catch (IOException e) {
379                         LOG.error(e.getMessage());
380                         return false;
381                     }
382                 }
383             }
384         }
385
386         return true;
387     }
388
389         /**
390          * @param timeoutms
391          * @return
392          */
393         public boolean clearCompleteDatabase(long timeoutms) {
394                 if(timeoutms>0) {
395             this.dbClient.waitForYellowStatus(timeoutms);
396         }
397         //check aliases and indices
398                 AliasesEntryList aliases = this.readAliases();
399         IndicesEntryList indices = this.readIndices();
400         if (aliases == null || indices == null) {
401             return false;
402         }
403         for(AliasesEntry alias:aliases) {
404                 try {
405                         LOG.info("deleting alias {} for index {}",alias.getAlias(),alias.getIndex());
406                                 this.dbClient.deleteAlias(new DeleteAliasRequest(alias.getIndex(), alias.getAlias()));
407                         } catch (IOException e) {
408                                 LOG.error("problem deleting alias {}: {}",alias.getAlias(),e);
409                                 return false;
410                         }
411         }
412         for(IndicesEntry index : indices) {
413                 try {
414                         LOG.info("deleting index {}",index.getName());
415                                 this.dbClient.deleteIndex(new DeleteIndexRequest(index.getName()));
416                         } catch (IOException e) {
417                                 LOG.error("problem deleting index {}: {}",index.getName(),e);
418                                 return false;
419                         }
420         }
421         return true;
422         }
423
424 }