Fixing the sync issues with AAI
[aai/sparky-be.git] / src / main / java / org / openecomp / sparky / synchronizer / AggregationSuggestionSynchronizer.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25
26 package org.openecomp.sparky.synchronizer;
27
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
29
30 import java.util.Map;
31 import java.util.concurrent.ExecutorService;
32
33 import org.openecomp.cl.api.Logger;
34 import org.openecomp.cl.eelf.LoggerFactory;
35 import org.openecomp.cl.mdc.MdcContext;
36 import org.openecomp.sparky.dal.NetworkTransaction;
37 import org.openecomp.sparky.dal.rest.HttpMethod;
38 import org.openecomp.sparky.dal.rest.OperationResult;
39 import org.openecomp.sparky.logging.AaiUiMsgs;
40 import org.openecomp.sparky.synchronizer.entity.AggregationSuggestionEntity;
41 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
42 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
43 import org.openecomp.sparky.synchronizer.task.PerformElasticSearchPut;
44 import org.openecomp.sparky.util.NodeUtils;
45 import org.slf4j.MDC;
46
47 public class AggregationSuggestionSynchronizer extends AbstractEntitySynchronizer
48     implements IndexSynchronizer {
49
50   private static final Logger LOG =
51       LoggerFactory.getInstance().getLogger(AggregationSuggestionSynchronizer.class);
52
53   private boolean isSyncInProgress;
54   private boolean shouldPerformRetry;
55   private Map<String, String> contextMap;
56   protected ExecutorService esPutExecutor;
57
58   public AggregationSuggestionSynchronizer(String indexName) throws Exception {
59     super(LOG, "ASS-" + indexName.toUpperCase(), 2, 5, 5, indexName);
60
61     this.isSyncInProgress = false;
62     this.shouldPerformRetry = false;
63     this.synchronizerName = "Aggregation Suggestion Synchronizer";
64     this.contextMap = MDC.getCopyOfContextMap();
65     this.esPutExecutor = NodeUtils.createNamedExecutor("ASS-ES-PUT", 2, LOG);
66   }
67
68   @Override
69   protected boolean isSyncDone() {
70     int totalWorkOnHand = esWorkOnHand.get();
71
72     if (LOG.isDebugEnabled()) {
73       LOG.debug(AaiUiMsgs.DEBUG_GENERIC,
74           indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand);
75     }
76
77     if (totalWorkOnHand > 0 || !isSyncInProgress) {
78       return false;
79     }
80
81     return true;
82   }
83
84   @Override
85   public OperationState doSync() {
86     isSyncInProgress = true;
87     this.syncDurationInMs = -1;
88     syncStartedTimeStampInMs = System.currentTimeMillis();
89     syncEntity();
90
91     while (!isSyncDone()) {
92       try {
93         if (shouldPerformRetry) {
94           syncEntity();
95         }
96         Thread.sleep(1000);
97       } catch (Exception exc) {
98         // We don't care about this exception
99       }
100     }
101
102     return OperationState.OK;
103   }
104
105   private void syncEntity() {
106     String txnId = NodeUtils.getRandomTxnId();
107     MdcContext.initialize(txnId, "AggregationSuggestionSynchronizer", "", "Sync", "");
108     
109     AggregationSuggestionEntity syncEntity = new AggregationSuggestionEntity();
110     syncEntity.deriveFields();
111
112     String link = null;
113     try {
114       link = getElasticFullUrl("/" + syncEntity.getId(), getIndexName());
115     } catch (Exception exc) {
116       LOG.error(AaiUiMsgs.ES_LINK_UPSERT, exc.getLocalizedMessage());
117     }
118     
119     try {
120       String jsonPayload = null;
121       jsonPayload = syncEntity.getIndexDocumentJson();
122       if (link != null && jsonPayload != null) {
123
124         NetworkTransaction elasticPutTxn = new NetworkTransaction();
125         elasticPutTxn.setLink(link);
126         elasticPutTxn.setOperationType(HttpMethod.PUT);
127
128         esWorkOnHand.incrementAndGet();
129         final Map<String, String> contextMap = MDC.getCopyOfContextMap();
130         supplyAsync(new PerformElasticSearchPut(jsonPayload, elasticPutTxn,
131             esDataProvider, contextMap), esPutExecutor).whenComplete((result, error) -> {
132
133               esWorkOnHand.decrementAndGet();
134
135               if (error != null) {
136                 String message = "Aggregation suggestion entity sync UPDATE PUT error - "
137                     + error.getLocalizedMessage();
138                 LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
139               } else {
140                 updateElasticSearchCounters(result);
141                 wasEsOperationSuccessful(result);
142               }
143             });
144       }
145     } catch (Exception exc) {
146       String message =
147           "Exception caught during aggregation suggestion entity sync PUT operation. Message - "
148               + exc.getLocalizedMessage();
149       LOG.error(AaiUiMsgs.ES_AGGREGATION_SUGGESTION_ENTITY_SYNC_ERROR, message);
150     }
151   }
152
153   private void wasEsOperationSuccessful(NetworkTransaction result) {
154     if (result != null) {
155       OperationResult opResult = result.getOperationResult();
156
157       if (!opResult.wasSuccessful()) {
158         shouldPerformRetry = true;
159       } else {
160         isSyncInProgress = false;
161         shouldPerformRetry = false;
162       }
163     }
164   }
165
166   @Override
167   public SynchronizerState getState() {
168     if (!isSyncDone()) {
169       return SynchronizerState.PERFORMING_SYNCHRONIZATION;
170     }
171
172     return SynchronizerState.IDLE;
173   }
174
175   @Override
176   public String getStatReport(boolean shouldDisplayFinalReport) {
177         syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
178         return getStatReport(syncDurationInMs, shouldDisplayFinalReport);
179   }
180
181   @Override
182   public void shutdown() {
183     this.shutdownExecutors();
184   }
185 }