9be334241f1082fb99d4dad273a96de57dd18546
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / autosuggestion / sync / VnfAliasSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.autosuggestion.sync;
24
25 import static java.util.concurrent.CompletableFuture.supplyAsync;
26
27 import java.util.Map;
28 import java.util.concurrent.ExecutorService;
29
30 import org.onap.aai.cl.api.Logger;
31 import org.onap.aai.cl.eelf.LoggerFactory;
32 import org.onap.aai.cl.mdc.MdcContext;
33 import org.onap.aai.restclient.client.OperationResult;
34 import org.onap.aai.sparky.dal.NetworkTransaction;
35 import org.onap.aai.sparky.dal.rest.HttpMethod;
36 import org.onap.aai.sparky.logging.AaiUiMsgs;
37 import org.onap.aai.sparky.search.filters.config.FiltersConfig;
38 import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
39 import org.onap.aai.sparky.sync.IndexSynchronizer;
40 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
41 import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
42 import org.onap.aai.sparky.sync.entity.AggregationSuggestionEntity;
43 import org.onap.aai.sparky.sync.enumeration.OperationState;
44 import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
45 import org.onap.aai.sparky.sync.task.PerformElasticSearchPut;
46 import org.onap.aai.sparky.util.NodeUtils;
47 import org.slf4j.MDC;
48
49
50 public class VnfAliasSuggestionSynchronizer extends AbstractEntitySynchronizer
51     implements IndexSynchronizer {
52
53   private static final Logger LOG =
54       LoggerFactory.getInstance().getLogger(VnfAliasSuggestionSynchronizer.class);
55
56   private boolean isSyncInProgress;
57   private boolean shouldPerformRetry;
58   private Map<String, String> contextMap;
59   protected ExecutorService esPutExecutor;
60   private FiltersConfig filtersConfig;
61
62   public VnfAliasSuggestionSynchronizer(ElasticSearchSchemaConfig schemaConfig,
63       int internalSyncWorkers, int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
64       NetworkStatisticsConfig esStatConfig, FiltersConfig filtersConfig) throws Exception {
65     super(LOG, "VASS-" + schemaConfig.getIndexName().toUpperCase(), internalSyncWorkers, aaiWorkers,
66         esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
67
68     this.isSyncInProgress = false;
69     this.shouldPerformRetry = false;
70     this.synchronizerName = "VNFs Alias Suggestion Synchronizer";
71     this.contextMap = MDC.getCopyOfContextMap();
72     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
73     this.filtersConfig = filtersConfig;
74   }
75
76   @Override
77   protected boolean isSyncDone() {
78     int totalWorkOnHand = esWorkOnHand.get();
79
80     if (LOG.isDebugEnabled()) {
81       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
82           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
83     }
84
85     if (totalWorkOnHand > 0 || !isSyncInProgress) {
86       return false;
87     }
88
89     return true;
90   }
91
92   @Override
93   public OperationState doSync() {
94     isSyncInProgress = true;
95     this.syncDurationInMs = -1;
96     syncStartedTimeStampInMs = System.currentTimeMillis();
97
98     syncEntity();
99
100     while (!isSyncDone()) {
101       try {
102         if (shouldPerformRetry) {
103           syncEntity();
104         }
105         Thread.sleep(1000);
106       } catch (Exception exc) {
107         // We don't care about this exception
108       }
109     }
110
111     return OperationState.OK;
112   }
113
114   private void syncEntity() {
115     String txnId = NodeUtils.getRandomTxnId();
116     MdcContext.initialize(txnId, synchronizerName, "", "Sync", "");
117     
118     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity(filtersConfig);
119     syncEntity.deriveFields();
120     syncEntity.initializeFilters();
121
122     String link = null;
123     try {
124       link = elasticSearchAdapter.buildElasticSearchGetDocUrl(getIndexName(), syncEntity.getId());
125     } catch (Exception exc) {
126       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
127     }
128     
129     try {
130       String jsonPayload = null;
131       jsonPayload = syncEntity.getAsJson();
132       if (link != null && jsonPayload != null) {
133
134         NetworkTransaction elasticPutTxn = new NetworkTransaction();
135         elasticPutTxn.setLink(link);
136         elasticPutTxn.setOperationType(HttpMethod.PUT);
137
138         esWorkOnHand.incrementAndGet();
139         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
140         supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
141             elasticSearchAdapter, contextMap), esPutExecutor).whenComplete((result, error) -> {
142
143               esWorkOnHand.decrementAndGet();
144
145               if (error != null) {
146                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
147                     + error.getLocalizedMessage();
148                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
149               } else {
150                 updateElasticSearchCounters(result);
151                 wasEsOperationSuccessful(result);
152               }
153             });
154       }
155     } catch (Exception exc) {
156       String message =
157           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
158               + exc.getLocalizedMessage();
159       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
160     }
161   }
162
163   private void wasEsOperationSuccessful(NetworkTransaction result) {
164     if (result != null) {
165       OperationResult opResult = result.getOperationResult();
166
167       if (!opResult.wasSuccessful()) {
168         shouldPerformRetry = true;
169       } else {
170         isSyncInProgress = false;
171         shouldPerformRetry = false;
172       }
173     }
174   }
175
176   @Override
177   public SynchronizerState getState() {
178     if (!isSyncDone()) {
179       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
180     }
181
182     return SynchronizerState.IDLE;
183   }
184
185   @Override
186   public String getStatReport(boolean shouldDisplayFinalReport) {
187     syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
188     return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
189   }
190
191   @Override
192   public void shutdown() {
193     this.shutdownExecutors();
194   }
195 }