2 * ============LICENSE_START===================================================
3 * SPARKY (AAI UI service)
4 * ============================================================================
5 * Copyright © 2017 AT&T Intellectual Property.
6 * Copyright © 2017 Amdocs
8 * ============================================================================
9 * Licensed under the Apache License, Version 2.0 (the "License");
10 * you may not use this file except in compliance with the License.
11 * You may obtain a copy of the License at
13 * http://www.apache.org/licenses/LICENSE-2.0
15 * Unless required by applicable law or agreed to in writing, software
16 * distributed under the License is distributed on an "AS IS" BASIS,
17 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 * See the License for the specific language governing permissions and
19 * limitations under the License.
20 * ============LICENSE_END=====================================================
22 * ECOMP and OpenECOMP are trademarks
23 * and service marks of AT&T Intellectual Property.
26 package org.openecomp.sparky.synchronizer;
28 import static java.util.concurrent.CompletableFuture.supplyAsync;
30 import java.io.IOException;
31 import java.net.InetAddress;
32 import java.net.UnknownHostException;
33 import java.sql.Timestamp;
34 import java.text.SimpleDateFormat;
35 import java.util.Collection;
36 import java.util.EnumSet;
38 import java.util.Map.Entry;
40 import java.util.concurrent.ConcurrentHashMap;
41 import java.util.concurrent.atomic.AtomicInteger;
42 import java.util.function.Supplier;
44 import javax.json.Json;
46 import org.openecomp.cl.api.Logger;
47 import org.openecomp.cl.eelf.LoggerFactory;
48 import org.openecomp.sparky.config.oxm.OxmEntityDescriptor;
49 import org.openecomp.sparky.dal.rest.HttpMethod;
50 import org.openecomp.sparky.dal.rest.OperationResult;
51 import org.openecomp.sparky.logging.AaiUiMsgs;
52 import org.openecomp.sparky.synchronizer.enumeration.OperationState;
53 import org.openecomp.sparky.synchronizer.enumeration.SynchronizerState;
54 import org.openecomp.sparky.util.NodeUtils;
57 import org.openecomp.cl.mdc.MdcContext;
59 import org.openecomp.cl.mdc.MdcContext;
60 import com.fasterxml.jackson.databind.JsonNode;
61 import com.fasterxml.jackson.databind.node.ArrayNode;
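64 * Summarizes historical entity counts: for each searchable entity type, records how many entities AAI reports and stores the timestamped count in the index.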
// Index synchronizer that counts entities per searchable entity type and stores those counts.
66 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
67 implements IndexSynchronizer {
// Class-wide logger.
69 private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
// Date pattern used to format the "timestamp" value of each stored counter document.
70 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
// Set to true once collectAllTheWork() has finished enumerating or has hit its error path; read by isSyncDone().
72 private boolean allWorkEnumerated;
// Entity type -> size of the "result-data" array returned for that type.
73 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
// Set by doSync() when a run starts and cleared by isSyncDone(); also checked by clearCache().
74 private boolean syncInProgress;
// Copy of the MDC context map taken in the constructor and re-applied inside the async tasks.
75 private Map<String, String> contextMap;
78 * Instantiates a new historical entity summarizer.
80 * @param indexName the index name
81 * @throws Exception the exception
// Creates the summarizer for the given index name and initialises this class's own state.
// The superclass receives the logger, the short name "HES", three numeric settings (2, 5, 5)
// and the index name.
// NOTE(review): the meaning of 2, 5, 5 is defined by AbstractEntitySynchronizer — presumably
// executor sizing; confirm there.
83 public HistoricalEntitySummarizer(String indexName) throws Exception {
84 super(LOG, "HES", 2, 5, 5, indexName);
86 this.allWorkEnumerated = false;
87 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
88 this.synchronizerName = "Historical Entity Summarizer";
// Only the AAI and ElasticSearch REST statistic flags are enabled.
89 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
90 this.syncInProgress = false;
// Snapshot the current MDC context so the async tasks can install the same logging context.
91 this.contextMap = MDC.getCopyOfContextMap();
95 * Collect all the work.
97 * @return the operation state
// Enumerates the searchable entity types, asynchronously fetches each type's self links from the
// AAI data provider and records a per-type count, then stores the counts through
// insertEntityTypeCounters(). Returns ERROR when no entity descriptors are available or when the
// catch block near the end is reached; otherwise returns OK.
99 private OperationState collectAllTheWork() {
101 Map<String, OxmEntityDescriptor> descriptorMap =
102 oxmModelLoader.getSearchableEntityDescriptors();
// Without any searchable entity descriptors there is nothing to count.
104 if (descriptorMap.isEmpty()) {
105 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
107 return OperationState.ERROR;
110 Collection<String> entityTypes = descriptorMap.keySet();
// asyncWoH tracks the per-type tasks still outstanding; it starts at the number of entity types.
112 AtomicInteger asyncWoH = new AtomicInteger(0);
114 asyncWoH.set(entityTypes.size());
// One async task per entity type, submitted to aaiExecutor.
117 for (String entityType : entityTypes) {
119 supplyAsync(new Supplier<Void>() {
// Install the captured MDC context on the thread running this task.
123 MDC.setContextMap(contextMap);
125 OperationResult typeLinksResult =
126 aaiDataProvider.getSelfLinksByEntityType(entityType);
127 updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
128 processEntityTypeSelfLinks(entityType, typeLinksResult);
129 } catch (Exception exc) {
130 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
// Completion callback: marks one task as finished.
137 }, aaiExecutor).whenComplete((result, error) -> {
139 asyncWoH.decrementAndGet();
// NOTE(review): presumably guarded by an error != null check, otherwise error.getMessage()
// would fail on normal completion — confirm.
142 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
// Wait until every per-type task has reported completion.
150 while (asyncWoH.get() > 0) {
152 if (LOG.isDebugEnabled()) {
153 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
// One unit of ElasticSearch work is expected per entity type that produced a counter.
159 esWorkOnHand.set(entityCounters.size());
161 // start doing the real work
162 allWorkEnumerated = true;
164 insertEntityTypeCounters();
// Debug-only dump of every collected counter.
166 if (LOG.isDebugEnabled()) {
168 StringBuilder sb = new StringBuilder(128);
170 sb.append("\n\nHistorical Entity Counters:");
172 for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
173 sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
176 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
180 } catch (Exception exc) {
181 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
// The flag is also set on the failure path; isSyncDone() reads it when deciding whether the
// sync has finished.
185 allWorkEnumerated = true;
187 return OperationState.ERROR;
190 return OperationState.OK;
195 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
// Starts a synchronization run. Returns PENDING without doing any work when a run is already in
// progress; otherwise records the start time and returns the result of collectAllTheWork().
198 public OperationState doSync() {
// Initialise the logging context with a freshly generated transaction id.
199 String txnID = NodeUtils.getRandomTxnId();
200 MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
202 if (syncInProgress) {
203 LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
204 return OperationState.PENDING;
// Mark the run as started; syncStartedTimeStampInMs is later used by getStatReport(boolean).
209 syncInProgress = true;
210 this.syncStartedTimeStampInMs = System.currentTimeMillis();
211 allWorkEnumerated = false;
213 return collectAllTheWork();
217 * Process entity type self links.
219 * @param entityType the entity type
220 * @param operationResult the operation result
// Parses the JSON payload returned for entityType and, when it contains a "result-data" array,
// stores the array's size as that type's counter in entityCounters. Nothing is recorded when the
// payload is null or empty, or when the operation was not successful.
222 private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
224 JsonNode rootNode = null;
226 final String jsonResult = operationResult.getResult();
228 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
231 rootNode = mapper.readTree(jsonResult);
232 } catch (IOException exc) {
233 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
// NOTE(review): rootNode is still null if readTree() failed — confirm the catch block above
// exits before this dereference.
237 JsonNode resultData = rootNode.get("result-data");
238 ArrayNode resultDataArrayNode = null;
// Only an array-valued "result-data" contributes a count.
240 if (resultData != null && resultData.isArray()) {
241 resultDataArrayNode = (ArrayNode) resultData;
242 entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
249 * Insert entity type counters.
// Posts one JSON document per collected entity type (fields "count", "entityType", "timestamp")
// through esDataProvider, each as an async task on esExecutor, then loops until esWorkOnHand
// reaches zero.
251 private void insertEntityTypeCounters() {
// NOTE(review): presumably returns early when there is no ElasticSearch work to do — confirm.
253 if (esWorkOnHand.get() <= 0) {
// All documents written in this call share one timestamp formatted with INSERTION_DATE_TIME_FORMAT.
257 SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
258 Timestamp timestamp = new Timestamp(System.currentTimeMillis());
259 String currentFormattedTimeStamp = dateFormat.format(timestamp);
261 Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
263 for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
265 supplyAsync(new Supplier<Void>() {
// Install the captured MDC context on the thread running this task.
269 MDC.setContextMap(contextMap);
270 String jsonString = Json.createObjectBuilder().add(
271 "count", entityCounterEntry.getValue().get())
272 .add("entityType", entityCounterEntry.getKey())
273 .add("timestamp", currentFormattedTimeStamp).build().toString();
// The URL is built from an empty first argument plus the index name.
// NOTE(review): presumably this targets the index itself so the store assigns the document
// id — confirm against getElasticFullUrl.
277 link = getElasticFullUrl("", indexName);
278 OperationResult or = esDataProvider.doPost(link, jsonString, "application/json");
279 updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
280 } catch (Exception exc) {
281 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
// Completion callback: each finished task releases one unit of ElasticSearch work.
287 }, esExecutor).whenComplete((result, error) -> {
289 esWorkOnHand.decrementAndGet();
// Wait until every store task has completed.
295 while (esWorkOnHand.get() > 0) {
// NOTE(review): the interruption is logged here; check whether the thread's interrupt status
// should also be restored.
299 } catch (InterruptedException exc) {
300 LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
// Reports the synchronizer state: PERFORMING_SYNCHRONIZATION or IDLE.
// NOTE(review): the condition choosing between the two presumably depends on isSyncDone() — confirm.
307 public SynchronizerState getState() {
310 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
313 return SynchronizerState.IDLE;
318 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
// Returns the statistics report by delegating to another getStatReport overload, passing the
// time elapsed in milliseconds since syncStartedTimeStampInMs as its first argument.
321 public String getStatReport(boolean showFinalReport) {
322 return getStatReport(System.currentTimeMillis() - this.syncStartedTimeStampInMs,
327 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
// Shuts this synchronizer down by delegating to shutdownExecutors().
330 public void shutdown() {
331 this.shutdownExecutors();
// Decides whether the current sync has finished, based on the outstanding AAI and ElasticSearch
// work counters and the allWorkEnumerated flag. Clears syncInProgress once it has.
335 protected boolean isSyncDone() {
337 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
339 if (LOG.isDebugEnabled()) {
340 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
341 + " all work enumerated = " + allWorkEnumerated);
// Outstanding work, or enumeration still running.
// NOTE(review): presumably reports "not done" here and "done" after clearing syncInProgress — confirm.
344 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
348 this.syncInProgress = false;
354 * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
// Resets the statistics counters, empties entityCounters and clears allWorkEnumerated.
// When a sync is in progress the request is logged as ignored.
357 public void clearCache() {
// NOTE(review): presumably returns after logging, so an active sync never has its counters
// cleared — confirm.
359 if (syncInProgress) {
360 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
365 this.resetCounters();
366 if (entityCounters != null) {
367 entityCounters.clear();
370 allWorkEnumerated = false;