/** * ============LICENSE_START=================================================== * SPARKY (AAI UI service) * ============================================================================ * Copyright © 2017 AT&T Intellectual Property. * Copyright © 2017 Amdocs * All rights reserved. * ============================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ============LICENSE_END===================================================== * * ECOMP and OpenECOMP are trademarks * and service marks of AT&T Intellectual Property. */ package org.openecomp.sparky.synchronizer; import static java.util.concurrent.CompletableFuture.supplyAsync; import org.openecomp.cl.mdc.MdcContext; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.ExecutorService; import org.openecomp.cl.api.Logger; import org.openecomp.cl.eelf.LoggerFactory; import org.openecomp.sparky.dal.NetworkTransaction; import org.openecomp.sparky.dal.rest.HttpMethod; import org.openecomp.sparky.dal.rest.OperationResult; import org.openecomp.sparky.logging.AaiUiMsgs; import org.openecomp.sparky.synchronizer.entity.AggregationSuggestionEntity; import org.openecomp.sparky.synchronizer.enumeration.OperationState; import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState; import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut; import org.openecomp.sparky.util.NodeUtils; import org.slf4j.MDC; public class AggregationSuggestionSynchronizer extends AbstractEntitySynchronizer implements IndexSynchronizer { private static final Logger LOG = LoggerFactory.getInstance().getLogger(AggregationSuggestionSynchronizer.class); private boolean isSyncInProgress; private boolean shouldPerformRetry; private Map contextMap; protected ExecutorService esPutExecutor; public AggregationSuggestionSynchronizer(String indexName) throws Exception { super(LOG, "ASS-" + indexName.toUpperCase(), 2, 5, 5, indexName); this.isSyncInProgress = false; this.shouldPerformRetry = false; this.synchronizerName = "Aggregation Suggestion Synchronizer"; this.contextMap = MDC.getCopyOfContextMap(); this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG); } @Override protected boolean isSyncDone() { int totalWorkOnHand = esWorkOnHand.get(); if (LOG.isDebugEnabled()) { LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand); } if (totalWorkOnHand > 0 || !isSyncInProgress) { return false; } return true; } @Override public OperationState doSync() { isSyncInProgress = true; syncEntity(); while (!isSyncDone()) { try { if (shouldPerformRetry) { syncEntity(); } Thread.sleep(1000); } catch (Exception exc) { // We don't care about this exception } } return OperationState.OK; } private void syncEntity() { String txnId = NodeUtils.getRandomTxnId(); MdcContext.initialize(txnId, "AggregationSuggestionSynchronizer", "", "Sync", ""); AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(); syncEntity.deriveFields(); String link = null; try { link = getElasticFullUrl("/" + syncEntity.getId(), getIndexName()); } catch (Exception exc) { LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage()); } try { String jsonPayload = null; jsonPayload = syncEntity.getIndexDocumentJson(); if (link != null && jsonPayload != null) { NetworkTransaction elasticPutTxn = new NetworkTransaction(); elasticPutTxn.setLink(link); elasticPutTxn.setOperationType(HttpMethod.PUT); esWorkOnHand.incrementAndGet(); final Map contextMap = MDC.getCopyOfContextMap(); supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn, esDataProvider, contextMap), esPutExecutor).whenComplete((result, error) -> { esWorkOnHand.decrementAndGet(); if (error != null) { String message = "Aggregation suggestion entity sync UPDATE PUT error - " + error.getLocalizedMessage(); LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message); } else { updateElasticSearchCounters(result); wasEsOperationSuccessful(result); } }); } } catch (Exception exc) { String message = "Exception caught during aggregation suggestion entity sync PUT operation. Message - " + exc.getLocalizedMessage(); LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message); } } private void wasEsOperationSuccessful(NetworkTransaction result) { if (result != null) { OperationResult opResult = result.getOperationResult(); if (!opResult.wasSuccessful()) { shouldPerformRetry = true; } else { isSyncInProgress = false; shouldPerformRetry = false; } } } @Override public SynchronizerState getState() { if (!isSyncDone()) { return SynchronizerState.PERFORMING_SYNCHRONIZATION; } return SynchronizerState.IDLE; } @Override public String getStatReport(boolean shouldDisplayFinalReport) { return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs, shouldDisplayFinalReport); } @Override public void shutdown() { this.shutdownExecutors(); } }