Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / VnfAliasSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.autosuggestion.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.util.Map;
28 import java.util.concurrent.ExecutorService;
29
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.restclient.client.OperationResult;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.rest.HttpMethod;
36 import org.onap.aai.sparky.logging.AaiUiMsgs;
37 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
38 import org.onap.aai.sparky.sync.IndexSynchronizer;
39 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
40 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
41 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
42 import org.onap.aai.sparky.sync.enumeration.OperationState;
43 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
44 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
45 import org.onap.aai.sparky.util.NodeUtils;
46 import org.slf4j.MDC;
47
48
49 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
50     implements IndexSynchronizer {
51
52   private static final Logger LOG =
53       LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
54
55   private boolean isSyncInProgress;
56   private boolean shouldPerformRetry;
57   private Map<String, String> contextMap;
58   protected ExecutorService esPutExecutor;
59
60   public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
61       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
62       NetworkStatisticsConfig esStatConfig) throws Exception {
63     super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
64         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
65
66     this.isSyncInProgress = false;
67     this.shouldPerformRetry = false;
68     this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
69     this.contextMap = MDC.getCopyOfContextMap();
70     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
71   }
72
73   @Override
74   protected boolean isSyncDone() {
75     int totalWorkOnHand = esWorkOnHand.get();
76
77     if (LOG.isDebugEnabled()) {
78       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
79           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
80     }
81
82     if (totalWorkOnHand > 0 || !isSyncInProgress) {
83       return false;
84     }
85
86     return true;
87   }
88
89   @Override
90   public OperationState doSync() {
91     isSyncInProgress = true;
92     this.syncDurationInMs = -1;
93     syncStartedTimeStampInMs = System.currentTimeMillis();
94
95     syncEntity();
96
97     while (!isSyncDone()) {
98       try {
99         if (shouldPerformRetry) {
100           syncEntity();
101         }
102         Thread.sleep(1000);
103       } catch (Exception exc) {
104         // We don't care about this exception
105       }
106     }
107
108     return OperationState.OK;
109   }
110
111   private void syncEntity() {
112     String txnId = NodeUtils.getRandomTxnId();
113     MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
114
115     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity();
116     syncEntity.deriveFields();
117     syncEntity.initializeFilters();
118
119     String link = null;
120     try {
121       link = getElasticFullUrl("/" + syncEntity.getId(), getIndexName());
122     } catch (Exception exc) {
123       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
124     }
125
126     try {
127       String jsonPayload = null;
128       jsonPayload = syncEntity.getAsJson();
129       if (link != null && jsonPayload != null) {
130
131         NetworkTransaction elasticPutTxn = new NetworkTransaction();
132         elasticPutTxn.setLink(link);
133         elasticPutTxn.setOperationType(HttpMethod.PUT);
134
135         esWorkOnHand.incrementAndGet();
136         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
137         supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn, elasticSearchAdapter,
138             contextMap), esPutExecutor).whenComplete((result, error) -> {
139
140               esWorkOnHand.decrementAndGet();
141
142               if (error != null) {
143                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
144                     + error.getLocalizedMessage();
145                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
146               } else {
147                 updateElasticSearchCounters(result);
148                 wasEsOperationSuccessful(result);
149               }
150             });
151       }
152     } catch (Exception exc) {
153       String message =
154           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
155               + exc.getLocalizedMessage();
156       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
157     }
158   }
159
160   private void wasEsOperationSuccessful(NetworkTransaction result) {
161     if (result != null) {
162       OperationResult opResult = result.getOperationResult();
163
164       if (!opResult.wasSuccessful()) {
165         shouldPerformRetry = true;
166       } else {
167         isSyncInProgress = false;
168         shouldPerformRetry = false;
169       }
170     }
171   }
172
173   @Override
174   public SynchronizerState getState() {
175     if (!isSyncDone()) {
176       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
177     }
178
179     return SynchronizerState.IDLE;
180   }
181
182   @Override
183   public String getStatReport(boolean shouldDisplayFinalReport) {
184     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
185     return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
186   }
187
188   @Override
189   public void shutdown() {
190     this.shutdownExecutors();
191   }
192 }