2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
25 package org.onap.aai.sparky.autosuggestion.sync;
27 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import java.util.concurrent.ExecutorService;
32 import org.onap.aai.cl.api.Logger;
33 import org.onap.aai.cl.eelf.LoggerFactory;
34 import org.onap.aai.cl.mdc.MdcContext;
35 import org.onap.aai.restclient.client.OperationResult;
36 import org.onap.aai.sparky.dal.NetworkTransaction;
37 import org.onap.aai.sparky.dal.rest.HttpMethod;
38 import org.onap.aai.sparky.logging.AaiUiMsgs;
39 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
40 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
41 import org.onap.aai.sparky.sync.IndexSynchronizer;
42 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
43 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
44 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
45 import org.onap.aai.sparky.sync.enumeration.OperationState;
46 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
47 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
48 import org.onap.aai.sparky.util.NodeUtils;
/**
 * Index synchronizer that builds an {@code AggregationSuggestionEntity} and PUTs its JSON form
 * into the index named by the supplied schema config. Extends {@code AbstractEntitySynchronizer}
 * and implements {@code IndexSynchronizer}.
 */
52 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
53 implements IndexSynchronizer {
// Logger shared by all instances; also handed to the superclass and to the executor factory.
55 private static final Logger LOG =
56 LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
// Set to true at the start of doSync() and reset to false in wasEsOperationSuccessful().
58 private boolean isSyncInProgress;
// Set to true in wasEsOperationSuccessful() when the PUT result was not successful; polled by
// the wait loop in doSync().
59 private boolean shouldPerformRetry;
// Copy of the MDC context taken in the constructor. syncEntity() takes its own fresh copy into a
// local variable of the same name, which shadows this field there.
60 private Map<String, String> contextMap;
// Executor on which the asynchronous PUT task in syncEntity() is run.
61 protected ExecutorService esPutExecutor;
// Filter configuration passed to every AggregationSuggestionEntity created by syncEntity().
62 private FiltersConfig filtersConfig;
/**
 * Creates the synchronizer and initializes its state flags, executor and filter configuration.
 *
 * @param schemaConfig supplies the index name, which is forwarded to the superclass both as-is
 *        and, upper-cased with a "VASS-" prefix, as the second constructor argument
 * @param internalSyncWorkers forwarded unchanged to the superclass constructor
 * @param aaiWorkers forwarded unchanged to the superclass constructor
 * @param esWorkers forwarded unchanged to the superclass constructor
 * @param aaiStatConfig forwarded unchanged to the superclass constructor
 * @param esStatConfig forwarded unchanged to the superclass constructor
 * @param filtersConfig stored and later passed to each AggregationSuggestionEntity
 * @throws Exception declared by this constructor; the originating call is not visible here
 */
64 public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
65 int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
66 NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig) throws Exception {
// NOTE(review): toUpperCase() without an explicit Locale depends on the JVM default locale;
// consider toUpperCase(Locale.ROOT) if this name must be stable across environments.
67 super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
68 esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
// Start idle: no sync running and no retry pending.
70 this.isSyncInProgress = false;
71 this.shouldPerformRetry = false;
72 this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
// Snapshot of the constructing thread's MDC context.
73 this.contextMap = MDC.getCopyOfContextMap();
// Named executor for the PUT tasks; the 2 is presumably the thread count - confirm against
// NodeUtils.createNamedExecutor.
74 this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
75 this.filtersConfig = filtersConfig;
/**
 * Reports whether the current sync pass has finished, based on the outstanding work counter
 * {@code esWorkOnHand} and the {@code isSyncInProgress} flag.
 *
 * @return a boolean derived from the check below; the return statements themselves are not
 *         visible in this view
 */
79 protected boolean isSyncDone() {
// Snapshot of the number of PUT operations submitted by syncEntity() that have not completed.
80 int totalWorkOnHand = esWorkOnHand.get();
82 if (LOG.isDebugEnabled()) {
83 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
84 indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
// True when work is still outstanding or when no sync is flagged as in progress.
// NOTE(review): the guarded statement is not visible here - confirm which value it returns.
87 if (totalWorkOnHand > 0 || !isSyncInProgress) {
/**
 * Runs one synchronization pass: flags the sync as in progress, resets the timing fields, then
 * loops until {@link #isSyncDone()} reports completion.
 *
 * @return always {@code OperationState.OK} on the visible return path
 */
95 public OperationState doSync() {
96 isSyncInProgress = true;
// -1 marks the duration as not yet computed; getStatReport(boolean) recalculates it.
97 this.syncDurationInMs = -1;
98 syncStartedTimeStampInMs = System.currentTimeMillis();
// Wait loop: repeats until isSyncDone() returns true.
102 while (!isSyncDone()) {
// shouldPerformRetry is raised by wasEsOperationSuccessful() after an unsuccessful PUT.
// NOTE(review): the statement guarded by this check is not visible here - presumably a re-sync.
104 if (shouldPerformRetry) {
// Any exception raised inside the loop body is deliberately swallowed so the loop continues.
108 } catch (Exception exc) {
109 // We don't care about this exception
113 return OperationState.OK;
/**
 * Builds a fresh {@code AggregationSuggestionEntity}, resolves its document URL and submits an
 * asynchronous PUT of its JSON form on {@code esPutExecutor}. Failures are logged, not thrown.
 */
116 private void syncEntity() {
// New transaction id and MDC context for this sync attempt.
117 String txnId = NodeUtils.getRandomTxnId();
118 MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
120 AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(filtersConfig);
121 syncEntity.deriveFields();
122 syncEntity.initializeFilters();
// Document URL for this entity id in the target index; the declaration of "link" is not visible
// in this view.
126 link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), syncEntity.getId());
127 } catch (Exception exc) {
128 LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
132 String jsonPayload = null;
133 jsonPayload = syncEntity.getAsJson();
// Only submit when both the URL and the payload are available.
134 if (link != null && jsonPayload != null) {
136 NetworkTransaction elasticPutTxn = new NetworkTransaction();
137 elasticPutTxn.setLink(link);
138 elasticPutTxn.setOperationType(HttpMethod.PUT);
// Count the operation as outstanding before it is scheduled; the completion callback below
// decrements the counter again.
140 esWorkOnHand.incrementAndGet();
// Local MDC snapshot handed to the task; it shadows the contextMap field.
141 final Map<String, String> contextMap = MDC.getCopyOfContextMap();
142 supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
143 elasticSearchAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
// The callback receives either the task result or the failure, and first releases the work slot.
145 esWorkOnHand.decrementAndGet();
// NOTE(review): the condition separating the error-logging lines from the result-handling lines
// is not visible here - presumably a non-null check on "error".
148 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
149 + error.getLocalizedMessage();
150 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
// Record statistics for the result, then update the in-progress and retry flags from it.
152 updateElasticSearchCounters(result);
153 wasEsOperationSuccessful(result);
// Exceptions thrown while preparing or submitting the PUT are logged rather than propagated.
157 } catch (Exception exc) {
159 "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
160 + exc.getLocalizedMessage();
161 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
/**
 * Updates the {@code shouldPerformRetry} and {@code isSyncInProgress} flags from the outcome of
 * a completed PUT transaction. A {@code null} result leaves both flags unchanged.
 *
 * @param result the completed transaction, or {@code null}
 */
165 private void wasEsOperationSuccessful(NetworkTransaction result) {
166 if (result != null) {
// NOTE(review): opResult is dereferenced without a null check - confirm that
// getOperationResult() cannot return null here.
167 OperationResult opResult = result.getOperationResult();
// Unsuccessful PUT: request a retry from the doSync() loop.
169 if (!opResult.wasSuccessful()) {
170 shouldPerformRetry = true;
// These two assignments clear the in-progress and retry flags; the branch keyword separating
// them from the retry case is not visible here - presumably the else (success) branch.
172 isSyncInProgress = false;
173 shouldPerformRetry = false;
/**
 * Reports the synchronizer state.
 *
 * @return {@code SynchronizerState.PERFORMING_SYNCHRONIZATION} or {@code SynchronizerState.IDLE};
 *         the condition choosing between them is not visible in this view
 */
179 public SynchronizerState getState() {
181 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
184 return SynchronizerState.IDLE;
/**
 * Recomputes the elapsed sync time and returns the statistics report for it.
 *
 * @param shouldDisplayFinalReport passed through unchanged to the two-argument overload
 * @return the string produced by {@code getStatReport(syncDurationInMs, shouldDisplayFinalReport)}
 */
188 public String getStatReport(boolean shouldDisplayFinalReport) {
// Side effect: overwrites syncDurationInMs with the time elapsed since doSync() recorded its
// start timestamp.
189 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
190 return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
/**
 * Shuts the synchronizer down by delegating to {@code shutdownExecutors()}.
 */
194 public void shutdown() {
// NOTE(review): esPutExecutor is created by this class's constructor; whether
// shutdownExecutors() also stops it is not visible here - confirm to avoid leaking its threads.
195 this.shutdownExecutors();