2 * ============LICENSE_START=======================================================
3 * ONAP : ccsdk features
4 * ================================================================================
5 * Copyright (C) 2020 highstreet technologies GmbH Intellectual Property.
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.database;
25 import java.io.FileNotFoundException;
26 import java.io.IOException;
27 import java.nio.charset.StandardCharsets;
28 import java.nio.file.Files;
29 import java.text.ParseException;
30 import java.util.ArrayList;
31 import java.util.Arrays;
32 import java.util.List;
34 import org.json.JSONObject;
35 import org.onap.ccsdk.features.sdnr.wt.common.database.HtDatabaseClient;
36 import org.onap.ccsdk.features.sdnr.wt.common.database.SearchHit;
37 import org.onap.ccsdk.features.sdnr.wt.common.database.SearchResult;
38 import org.onap.ccsdk.features.sdnr.wt.common.database.config.HostInfo;
39 import org.onap.ccsdk.features.sdnr.wt.common.database.data.AliasesEntry;
40 import org.onap.ccsdk.features.sdnr.wt.common.database.data.AliasesEntryList;
41 import org.onap.ccsdk.features.sdnr.wt.common.database.data.DatabaseVersion;
42 import org.onap.ccsdk.features.sdnr.wt.common.database.data.IndicesEntry;
43 import org.onap.ccsdk.features.sdnr.wt.common.database.data.IndicesEntryList;
44 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.CreateAliasRequest;
45 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.CreateIndexRequest;
46 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.DeleteAliasRequest;
47 import org.onap.ccsdk.features.sdnr.wt.common.database.requests.DeleteIndexRequest;
48 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.AcknowledgedResponse;
49 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.GetInfoResponse;
50 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.ListAliasesResponse;
51 import org.onap.ccsdk.features.sdnr.wt.common.database.responses.ListIndicesResponse;
52 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.SdnrDbType;
53 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.DataMigrationProviderService;
54 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.ReleaseInformation;
55 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.ComponentData;
56 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.ComponentName;
57 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.DataContainer;
58 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.DataMigrationReport;
59 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.Release;
60 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.ReleaseGroup;
61 import org.onap.ccsdk.features.sdnr.wt.dataprovider.setup.data.SearchHitConverter;
62 import org.slf4j.Logger;
63 import org.slf4j.LoggerFactory;
/**
 * {@link DataMigrationProviderService} implementation that issues all of its requests through an
 * {@link HtDatabaseClient}. The methods visible in this file import and export component data,
 * detect the release of an existing database, and create or delete the indices and aliases of a
 * release.
 */
65 public class ElasticsearchDataMigrationProvider implements DataMigrationProviderService {
// logger shared by all methods of this class
68 private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataMigrationProvider.class);
// log format reused by every code path that deletes an index
69 private static final String LOG_DELETING_INDEX = "deleting index {}";
// database client; assigned in the constructor and used for every database request of this class
70 private final HtDatabaseClient dbClient;
/**
 * Creates the provider and obtains the database client for a single host.
 *
 * NOTE(review): the argument list of the getClient call continues on a line that is not part of
 * this excerpt, so the remaining argument(s) cannot be documented from here.
 *
 * @param url address of the database host; parsed with HostInfo.parse
 * @param username user name handed to the client factory
 * @param password password handed to the client factory
 * @param trustAll handed to the client factory; presumably relaxes certificate checks - TODO confirm
 * @param timeoutms not referenced on the visible lines; presumably passed as a trailing getClient
 *        argument - confirm
 * @throws Exception declared by the signature; the throwing call is not identifiable from the
 *         visible lines
 */
72 public ElasticsearchDataMigrationProvider(String url, String username, String password, boolean trustAll,
73 long timeoutms) throws Exception {
// the single url is wrapped into a one-element HostInfo array for the client factory
74 dbClient = HtDatabaseClient.getClient(new HostInfo[] {HostInfo.parse(url)}, username, password, trustAll, true,
79 public DataMigrationReport importData(String filename, boolean dryrun) throws Exception {
80 return this.importData(filename, dryrun, Release.CURRENT_RELEASE);
/**
 * Imports the content of a data file into the database, converting every component from the
 * release stored in the file to {@code forRelease}.
 *
 * NOTE(review): several lines of this method (guards, braces, returns) are not part of this
 * excerpt, so the exact conditions around the statements below cannot be read off here. The
 * paired report/LOG and report.error/throw statements suggest that {@code dryrun} selects a
 * report-only mode without database writes - confirm against the full file.
 *
 * @param filename path of the file that is loaded with DataContainer.load
 * @param dryrun not referenced on any visible line; see the note above
 * @param forRelease release whose components, aliases and data types define the import target
 * @return the migration report; setCompleted(true) is called on it at the end of the visible flow
 * @throws Exception a FileNotFoundException for the file, or a wrapping Exception (with cause)
 *         when loading the file fails
 */
84 public DataMigrationReport importData(String filename, boolean dryrun, Release forRelease) throws Exception {
85 DataMigrationReport report = new DataMigrationReport();
86 File file = new File(filename);
// missing file: either recorded in the report or raised as exception (guarding condition not visible)
89 report.error("file %s not found", filename);
92 throw new FileNotFoundException(filename);
94 DataContainer container = null;
96 container = DataContainer.load(file);
97 } catch (Exception e) {
// load failure: either recorded in the report or rethrown with the original exception as cause
99 report.error("problem loading file %s: %s", filename, e.getMessage());
102 throw new Exception("problem loading file " + filename, e);
104 ReleaseInformation ri = ReleaseInformation.getInstance(forRelease);
105 SearchHitConverter converter;
106 Set<ComponentName> components = ri.getComponents();
107 //for all db components of dest architecture
108 for (ComponentName component : components) {
109 //convert to ComponentData for current release with existing ComponentData of the container
110 converter = SearchHitConverter.Factory.getInstance(container.getRelease(), forRelease, component);
// NOTE(review): the body of this null check is not visible; presumably the component is skipped
111 if (converter == null) {
114 ComponentData data = converter.convert(container);
// the entries are written through the alias of the component, not through the index name
116 String indexName = ri.getAlias(component);
117 String dataTypeName = ri.getDataType(component);
119 report.log("write %d entries into %s/%s", data.size(), indexName, dataTypeName);
121 LOG.debug("write {} entries into {}/{}", data.size(), indexName, dataTypeName);
123 for (SearchHit item : data) {
// each entry is written with its original id; a differing id returned by the client is only logged
125 String id = this.dbClient.doWriteRaw(indexName, dataTypeName, item.getId(),
126 item.getSourceAsString(), true);
127 if (!item.getId().equals(id)) {
128 LOG.warn("entry for {} with original id {} was written with another id {}",
129 component.getValue(), item.getId(), id);
// conversion failure for this component (branch condition not visible; presumably data == null)
135 report.error("unable to convert data for " + component.getValue() + " from version "
136 + container.getRelease().getValue() + " to " + forRelease.getValue() + "\n");
138 LOG.warn("unable to convert data for {} from version {} to {}", component.getValue(),
139 container.getRelease().getValue(), forRelease.getValue());
143 LOG.info("import of {} completed", filename);
145 report.log("import of %s completed", filename);
147 report.setCompleted(true);
153 * Export data. If the target file already exists, a file with the suffix .1 (.n) will be created instead.
// Reads all entries of every component of the detected database release and writes them as JSON
// (UTF-8) to a file.
//
// filename: requested output file; it is replaced by the result of checkFilenameForWrite before use.
// Returns the migration report. On the visible lines it is marked completed after the file was
// written, and it receives an error entry when the release cannot be detected.
//
// NOTE(review): some lines of this method (braces, returns, the end of the catch block) are not
// part of this excerpt.
157 public DataMigrationReport exportData(String filename) {
158 DataMigrationReport report = new DataMigrationReport();
160 DataContainer container = new DataContainer();
162 filename = this.checkFilenameForWrite(filename);
163 LOG.info("output will be written to {}", filename);
165 Release dbRelease = this.autoDetectRelease();
// autoDetectRelease yields null when no unique release matches the database
166 if (dbRelease == null) {
167 report.error("unbable to detect db release. is database initialized?");
170 ReleaseInformation ri = ReleaseInformation.getInstance(dbRelease);
// NOTE(review): never modified on the visible lines, so setCompleted below receives true
171 boolean componentsSucceeded = true;
172 for (ComponentName c : ri.getComponents()) {
173 ComponentData data = new ComponentData(c);
// data is read through the alias and data type that the release defines for the component
174 SearchResult<SearchHit> result = this.dbClient.doReadAllJsonData(ri.getAlias(c), ri.getDataType(c), false);
175 data.addAll(result.getHits());
176 container.addComponent(c, data);
// the whole container is serialized by toJSON() and written as one list element
179 Files.write(new File(filename).toPath(), Arrays.asList(container.toJSON()), StandardCharsets.UTF_8);
180 report.setCompleted(componentsSucceeded);
181 } catch (IOException e) {
// NOTE(review): two placeholders but the second argument is the Throwable itself; SLF4J presumably
// treats it as the exception to log and leaves the second {} unfilled - confirm
182 LOG.warn("problem writing data to {}: {}", filename, e);
187 private String checkFilenameForWrite(String filename) {
188 File f = new File(filename);
192 return this.checkFilenameForWrite(filename, 0);
195 private String checkFilenameForWrite(String filename, int apdx) {
196 File f = new File(String.format("$s.$d", filename, apdx));
200 return this.checkFilenameForWrite(filename, apdx + 1);
204 public Release getCurrentVersion() {
205 return Release.CURRENT_RELEASE;
210 public Release autoDetectRelease() {
211 DatabaseVersion dbVersion = this.readActualVersion();
212 AliasesEntryList aliases = this.readAliases();
213 IndicesEntryList indices = this.readIndices();
214 if (indices == null) {
217 List<Release> foundReleases = new ArrayList<>();
218 //if there are active aliases reduce indices to the active ones
219 if (aliases != null && !aliases.isEmpty()) {
220 indices = indices.subList(aliases.getLinkedIndices());
222 for (Release r : Release.values()) {
223 if (r.isDbInRange(dbVersion, SdnrDbType.ELASTICSEARCH)) {
224 ReleaseInformation ri = ReleaseInformation.getInstance(r);
225 if (ri != null && ri.containsIndices(indices)) {
226 foundReleases.add(r);
230 if (foundReleases.size() == 1) {
231 return foundReleases.get(0);
233 LOG.error("detect {} releases: {}. unable to detect for which one to do sth.", foundReleases.size(),
238 private DatabaseVersion readActualVersion() {
240 GetInfoResponse response = this.dbClient.getInfo();
241 return response.getVersion();
242 } catch (Exception e) {
243 LOG.warn(e.getMessage());
248 private AliasesEntryList readAliases() {
249 AliasesEntryList entries = null;
251 ListAliasesResponse response = this.dbClient.getAliases();
252 entries = response.getEntries();
253 } catch (ParseException | IOException e) {
254 LOG.error(e.getMessage());
259 private IndicesEntryList readIndices() {
260 IndicesEntryList entries = null;
262 ListIndicesResponse response = this.dbClient.getIndices();
263 entries = response.getEntries();
264 } catch (ParseException | IOException e) {
265 LOG.error(e.getMessage());
/**
 * Creates the indices and aliases of a release that do not exist yet, framed by the pre- and
 * post-init commands of that release.
 *
 * NOTE(review): the continuation of the signature and several guards, braces and return
 * statements are not part of this excerpt, so the value returned by each exit cannot be read off
 * here.
 *
 * @param release release to initialize; if null, one is derived from the database version via
 *        ReleaseGroup.CURRENT_RELEASE.getLatestCompatibleRelease
 * @param numShards handed to getDatabaseSettings for every index that is created
 * @param numReplicas handed to getDatabaseSettings for every index that is created
 * @param dbPrefix handed to getIndex/getAlias when the index and alias names are built
 * @param forceRecreate not referenced on any visible line; presumably guards the clearDatabase
 *        call below - confirm
 */
272 public boolean initDatabase(Release release, int numShards, int numReplicas, String dbPrefix, boolean forceRecreate,
// timeoutms is presumably declared on the signature line missing from this excerpt - confirm
275 this.dbClient.waitForYellowStatus(timeoutms);
277 DatabaseVersion dbVersion = this.readActualVersion();
// readActualVersion yields null when the info request failed
278 if (dbVersion == null) {
281 LOG.info("detected database version {}", dbVersion);
282 if (release == null) {
// no release requested: pick the latest release of the current group compatible with the version
283 release = ReleaseGroup.CURRENT_RELEASE.getLatestCompatibleRelease(dbVersion, SdnrDbType.ELASTICSEARCH);
284 if (release == null) {
285 LOG.warn("unable to autodetect release for this database version for release {}",
286 ReleaseGroup.CURRENT_RELEASE.name());
289 LOG.info("autodetect release {}", release);
291 if (!release.isDbInRange(dbVersion, SdnrDbType.ELASTICSEARCH)) {
292 LOG.warn("db version {} maybe not compatible with release {}", dbVersion, release);
// existing data of the release is removed first; timeout 0 is passed for this nested call
296 this.clearDatabase(release, dbPrefix, 0);
298 ReleaseInformation ri = ReleaseInformation.getInstance(release);
// snapshot of the existing aliases and indices; used below to decide what still has to be created
299 AliasesEntryList aliases = this.readAliases();
300 IndicesEntryList indices = this.readIndices();
301 if (aliases == null || indices == null) {
304 AcknowledgedResponse response = null;
305 if (!ri.runPreInitCommands(this.dbClient)) {
308 for (ComponentName component : ri.getComponents()) {
// only components for which the release reports an own db index are handled
310 if (ri.hasOwnDbIndex(component)) {
311 //check if index already exists
312 String indexName = ri.getIndex(component, dbPrefix);
313 String aliasName = ri.getAlias(component, dbPrefix);
314 if (indices.findByIndex(indexName) == null) {
315 LOG.info("creating index for {}", component);
// the index is created with the mapping and the settings the release defines for the component
316 CreateIndexRequest request = new CreateIndexRequest(ri.getIndex(component, dbPrefix));
317 request.mappings(new JSONObject(ri.getDatabaseMapping(component)));
318 request.settings(new JSONObject(ri.getDatabaseSettings(component, numShards, numReplicas)));
319 response = this.dbClient.createIndex(request);
320 LOG.info(response.isAcknowledged() ? "succeeded" : "failed");
322 LOG.info("index {} for {} already exists", indexName, component);
324 //check if alias already exists
325 if (aliases.findByAlias(aliasName) == null) {
326 LOG.info("creating alias for {}", component);
327 response = this.dbClient.createAlias(new CreateAliasRequest(indexName, aliasName));
328 LOG.info(response.isAcknowledged() ? "succeeded" : "failed");
330 LOG.info("alias {} for index {} for {} already exists", aliasName, indexName, component);
333 } catch (IOException e) {
// NOTE(review): only the message is logged here; what follows in the catch block is not visible
334 LOG.error(e.getMessage());
338 if (!ri.runPostInitCommands(this.dbClient)) {
/**
 * Deletes the aliases and indices that belong to the components of a release.
 *
 * NOTE(review): several guards, braces and return statements are not part of this excerpt, so the
 * value returned by each exit cannot be read off here.
 *
 * @param release release whose components are cleared; if null, one is derived from the database
 *        version via ReleaseGroup.CURRENT_RELEASE.getLatestCompatibleRelease
 * @param dbPrefix handed to getAlias/getIndex when the names to delete are built
 * @param timeoutms handed to waitForYellowStatus (a guard around that call may exist on a line
 *        missing from this excerpt - confirm)
 */
345 public boolean clearDatabase(Release release, String dbPrefix, long timeoutms) {
348 this.dbClient.waitForYellowStatus(timeoutms);
// entries = existing aliases, entries2 = existing indices
351 AliasesEntryList entries = this.readAliases();
352 IndicesEntryList entries2 = this.readIndices();
353 if (entries == null) {
356 if (release == null) {
357 DatabaseVersion dbVersion = this.readActualVersion();
358 if (dbVersion == null) {
361 LOG.info("detected database version {}", dbVersion);
// no release requested: pick the latest release of the current group compatible with the version
362 release = ReleaseGroup.CURRENT_RELEASE.getLatestCompatibleRelease(dbVersion, SdnrDbType.ELASTICSEARCH);
363 if (release == null) {
364 LOG.warn("unable to autodetect release for this database version for release {}",
365 ReleaseGroup.CURRENT_RELEASE.name());
368 LOG.info("autodetect release {}", release);
370 ReleaseInformation ri = ReleaseInformation.getInstance(release);
371 AcknowledgedResponse response;
// first pass: aliases
372 if (entries.isEmpty()) {
373 LOG.info("no aliases to clear");
375 //check for every component of release if alias exists
376 for (ComponentName component : ri.getComponents()) {
377 String aliasToDelete = ri.getAlias(component, dbPrefix);
378 AliasesEntry entryToDelete = entries.findByAlias(aliasToDelete);
379 if (entryToDelete != null) {
381 LOG.info("deleting alias {} for index {}", entryToDelete.getAlias(), entryToDelete.getIndex());
382 response = this.dbClient.deleteAlias(
383 new DeleteAliasRequest(entryToDelete.getIndex(), entryToDelete.getAlias()));
384 LOG.info(response.isResponseSucceeded() ? "succeeded" : "failed");
385 } catch (IOException e) {
386 LOG.error(e.getMessage());
390 //try to find malformed typed index with alias name
// i.e. an index whose name equals the alias name; entries2 may be null here, hence the null check
391 IndicesEntry entry2ToDelete = entries2 == null ? null : entries2.findByIndex(aliasToDelete);
392 if (entry2ToDelete != null) {
394 LOG.info(LOG_DELETING_INDEX, entry2ToDelete.getName());
395 response = this.dbClient.deleteIndex(new DeleteIndexRequest(entry2ToDelete.getName()));
396 LOG.info(response.isResponseSucceeded() ? "succeeded" : "failed");
397 } catch (IOException e) {
398 LOG.error(e.getMessage());
// second pass: indices
405 if (entries2 == null) {
408 if (entries2.isEmpty()) {
409 LOG.info("no indices to clear");
411 //check for every component of release if index exists
412 for (ComponentName component : ri.getComponents()) {
413 String indexToDelete = ri.getIndex(component, dbPrefix);
414 IndicesEntry entryToDelete = entries2.findByIndex(indexToDelete);
415 if (entryToDelete != null) {
417 LOG.info(LOG_DELETING_INDEX, entryToDelete.getName());
418 response = this.dbClient.deleteIndex(new DeleteIndexRequest(entryToDelete.getName()));
419 LOG.info(response.isResponseSucceeded() ? "succeeded" : "failed");
420 } catch (IOException e) {
421 LOG.error(e.getMessage());
/**
 * Deletes every alias and afterwards every index that the database reports, independent of any
 * release.
 *
 * NOTE(review): guards, braces and return statements are not part of this excerpt, so the value
 * returned by each exit cannot be read off here.
 *
 * @param timeoutms handed to waitForYellowStatus (a guard around that call may exist on a line
 *        missing from this excerpt - confirm)
 */
435 public boolean clearCompleteDatabase(long timeoutms) {
437 this.dbClient.waitForYellowStatus(timeoutms);
439 //check aliases and indices
440 AliasesEntryList aliases = this.readAliases();
441 IndicesEntryList indices = this.readIndices();
// both lists are needed; readAliases/readIndices yield null on failure
442 if (aliases == null || indices == null) {
// aliases are removed before the indices
445 for (AliasesEntry alias : aliases) {
447 LOG.info("deleting alias {} for index {}", alias.getAlias(), alias.getIndex());
448 this.dbClient.deleteAlias(new DeleteAliasRequest(alias.getIndex(), alias.getAlias()));
449 } catch (IOException e) {
// NOTE(review): two placeholders but the second argument is the Throwable itself; SLF4J presumably
// treats it as the exception to log and leaves the second {} unfilled - confirm
450 LOG.error("problem deleting alias {}: {}", alias.getAlias(), e);
454 for (IndicesEntry index : indices) {
456 LOG.info(LOG_DELETING_INDEX, index.getName());
457 this.dbClient.deleteIndex(new DeleteIndexRequest(index.getName()));
458 } catch (IOException e) {
// NOTE(review): same placeholder/Throwable pattern as in the alias loop above - confirm
459 LOG.error("problem deleting index {}: {}", index.getName(), e);