/*
 * ============LICENSE_START===================================================
 * SPARKY (AAI UI service)
 * ============================================================================
 * Copyright © 2017 AT&T Intellectual Property.
 * Copyright © 2017 Amdocs
 * ============================================================================
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 * ============LICENSE_END=====================================================
 *
 * ECOMP and OpenECOMP are trademarks
 * and service marks of AT&T Intellectual Property.
 */
package org.onap.aai.sparky.aggregation.sync;

import static java.util.concurrent.CompletableFuture.supplyAsync;

import java.io.IOException;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

import javax.json.Json;
import javax.ws.rs.core.MediaType;

import org.onap.aai.cl.api.Logger;
import org.onap.aai.cl.eelf.LoggerFactory;
import org.onap.aai.cl.mdc.MdcContext;
import org.onap.aai.restclient.client.OperationResult;
import org.onap.aai.sparky.config.oxm.SearchableEntityLookup;
import org.onap.aai.sparky.config.oxm.SearchableOxmEntityDescriptor;
import org.onap.aai.sparky.dal.rest.HttpMethod;
import org.onap.aai.sparky.logging.AaiUiMsgs;
import org.onap.aai.sparky.sync.AbstractEntitySynchronizer;
import org.onap.aai.sparky.sync.IndexSynchronizer;
import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
import org.onap.aai.sparky.sync.config.NetworkStatisticsConfig;
import org.onap.aai.sparky.sync.enumeration.OperationState;
import org.onap.aai.sparky.sync.enumeration.SynchronizerState;
import org.onap.aai.sparky.util.NodeUtils;
import org.slf4j.MDC;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
65 * The Class HistoricalEntitySummarizer.
67 public class HistoricalEntitySummarizer extends AbstractEntitySynchronizer
68 implements IndexSynchronizer {
70 private static final Logger LOG = LoggerFactory.getInstance().getLogger(HistoricalEntitySummarizer.class);
71 private static final String INSERTION_DATE_TIME_FORMAT = "yyyyMMdd'T'HHmmssZ";
73 private boolean allWorkEnumerated;
74 private ConcurrentHashMap<String, AtomicInteger> entityCounters;
75 private boolean syncInProgress;
76 private Map<String, String> contextMap;
77 private ElasticSearchSchemaConfig schemaConfig;
78 private SearchableEntityLookup searchableEntityLookup;
81 * Instantiates a new historical entity summarizer.
83 * @param indexName the index name
84 * @throws Exception the exception
86 public HistoricalEntitySummarizer(ElasticSearchSchemaConfig schemaConfig, int internalSyncWorkers,
87 int aaiWorkers, int esWorkers, NetworkStatisticsConfig aaiStatConfig,
88 NetworkStatisticsConfig esStatConfig, SearchableEntityLookup searchableEntityLookup)
90 super(LOG, "HES", internalSyncWorkers, aaiWorkers, esWorkers, schemaConfig.getIndexName(), aaiStatConfig, esStatConfig);
92 this.schemaConfig = schemaConfig;
93 this.allWorkEnumerated = false;
94 this.entityCounters = new ConcurrentHashMap<String, AtomicInteger>();
95 this.synchronizerName = "Historical Entity Summarizer";
96 this.enabledStatFlags = EnumSet.of(StatFlag.AAI_REST_STATS, StatFlag.ES_REST_STATS);
97 this.syncInProgress = false;
98 this.contextMap = MDC.getCopyOfContextMap();
99 this.syncDurationInMs = -1;
100 this.searchableEntityLookup = searchableEntityLookup;
104 * Collect all the work.
106 * @return the operation state
108 private OperationState collectAllTheWork() {
110 Map<String, SearchableOxmEntityDescriptor> descriptorMap =
111 searchableEntityLookup.getSearchableEntityDescriptors();
113 if (descriptorMap.isEmpty()) {
114 LOG.error(AaiUiMsgs.OXM_FAILED_RETRIEVAL, "historical entities");
116 return OperationState.ERROR;
119 Collection<String> entityTypes = descriptorMap.keySet();
121 AtomicInteger asyncWoH = new AtomicInteger(0);
123 asyncWoH.set(entityTypes.size());
126 for (String entityType : entityTypes) {
128 supplyAsync(new Supplier<Void>() {
132 MDC.setContextMap(contextMap);
134 OperationResult typeLinksResult =
135 aaiAdapter.getSelfLinksByEntityType(entityType);
136 updateActiveInventoryCounters(HttpMethod.GET, entityType, typeLinksResult);
137 processEntityTypeSelfLinks(entityType, typeLinksResult);
138 } catch (Exception exc) {
139 LOG.error(AaiUiMsgs.ERROR_GETTING_DATA_FROM_AAI, exc.getMessage());
146 }, aaiExecutor).whenComplete((result, error) -> {
148 asyncWoH.decrementAndGet();
151 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, error.getMessage());
159 while (asyncWoH.get() > 0) {
161 if (LOG.isDebugEnabled()) {
162 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, indexName + " summarizer waiting for all the links to be processed.");
168 esWorkOnHand.set(entityCounters.size());
170 // start doing the real work
171 allWorkEnumerated = true;
173 insertEntityTypeCounters();
175 if (LOG.isDebugEnabled()) {
177 StringBuilder sb = new StringBuilder(128);
179 sb.append("\n\nHistorical Entity Counters:");
181 for (Entry<String, AtomicInteger> entry : entityCounters.entrySet()) {
182 sb.append("\n").append(entry.getKey()).append(" = ").append(entry.getValue().get());
185 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, sb.toString());
189 } catch (Exception exc) {
190 LOG.error(AaiUiMsgs.HISTORICAL_COLLECT_ERROR, exc.getMessage());
194 allWorkEnumerated = true;
196 return OperationState.ERROR;
199 return OperationState.OK;
204 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#doSync()
207 public OperationState doSync() {
208 this.syncDurationInMs = -1;
209 String txnID = NodeUtils.getRandomTxnId();
210 MdcContext.initialize(txnID, "HistoricalEntitySynchronizer", "", "Sync", "");
212 if (syncInProgress) {
213 LOG.info(AaiUiMsgs.HISTORICAL_SYNC_PENDING);
214 return OperationState.PENDING;
219 syncInProgress = true;
220 this.syncStartedTimeStampInMs = System.currentTimeMillis();
221 allWorkEnumerated = false;
223 return collectAllTheWork();
227 * Process entity type self links.
229 * @param entityType the entity type
230 * @param operationResult the operation result
232 private void processEntityTypeSelfLinks(String entityType, OperationResult operationResult) {
234 JsonNode rootNode = null;
236 final String jsonResult = operationResult.getResult();
238 if (jsonResult != null && jsonResult.length() > 0 && operationResult.wasSuccessful()) {
241 rootNode = mapper.readTree(jsonResult);
242 } catch (IOException exc) {
243 LOG.error(AaiUiMsgs.JSON_PROCESSING_ERROR, exc.getMessage());
247 JsonNode resultData = rootNode.get("result-data");
248 ArrayNode resultDataArrayNode = null;
250 if (resultData != null && resultData.isArray()) {
251 resultDataArrayNode = (ArrayNode) resultData;
252 entityCounters.put(entityType, new AtomicInteger(resultDataArrayNode.size()));
259 * Insert entity type counters.
261 private void insertEntityTypeCounters() {
263 if (esWorkOnHand.get() <= 0) {
267 SimpleDateFormat dateFormat = new SimpleDateFormat(INSERTION_DATE_TIME_FORMAT);
268 Timestamp timestamp = new Timestamp(System.currentTimeMillis());
269 String currentFormattedTimeStamp = dateFormat.format(timestamp);
271 Set<Entry<String, AtomicInteger>> entityCounterEntries = entityCounters.entrySet();
273 for (Entry<String, AtomicInteger> entityCounterEntry : entityCounterEntries) {
275 supplyAsync(new Supplier<Void>() {
279 MDC.setContextMap(contextMap);
280 String jsonString = Json.createObjectBuilder().add(
281 "count", entityCounterEntry.getValue().get())
282 .add("entityType", entityCounterEntry.getKey())
283 .add("timestamp", currentFormattedTimeStamp).build().toString();
287 link = elasticSearchAdapter.buildElasticSearchPostUrl(indexName);
288 OperationResult or = elasticSearchAdapter.doPost(link, jsonString, MediaType.APPLICATION_JSON_TYPE);
289 updateElasticSearchCounters(HttpMethod.POST, entityCounterEntry.getKey(), or);
290 } catch (Exception exc) {
291 LOG.error(AaiUiMsgs.ES_STORE_FAILURE, exc.getMessage() );
297 }, esExecutor).whenComplete((result, error) -> {
299 esWorkOnHand.decrementAndGet();
305 while (esWorkOnHand.get() > 0) {
309 } catch (InterruptedException exc) {
310 LOG.error(AaiUiMsgs.INTERRUPTED, "historical Entities", exc.getMessage());
317 public SynchronizerState getState() {
320 return SynchronizerState.PERFORMING_SYNCHRONIZATION;
323 return SynchronizerState.IDLE;
328 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#getStatReport(boolean)
331 public String getStatReport(boolean showFinalReport) {
332 syncDurationInMs = System.currentTimeMillis() - syncStartedTimeStampInMs;
333 return this.getStatReport(syncDurationInMs, showFinalReport);
337 * @see org.openecomp.sparky.synchronizer.IndexSynchronizer#shutdown()
340 public void shutdown() {
341 this.shutdownExecutors();
345 protected boolean isSyncDone() {
347 int totalWorkOnHand = aaiWorkOnHand.get() + esWorkOnHand.get();
349 if (LOG.isDebugEnabled()) {
350 LOG.debug(AaiUiMsgs.DEBUG_GENERIC,indexName + ", isSyncDone(), totalWorkOnHand = " + totalWorkOnHand
351 + " all work enumerated = " + allWorkEnumerated);
354 if (totalWorkOnHand > 0 || !allWorkEnumerated) {
358 this.syncInProgress = false;
364 * @see org.openecomp.sparky.synchronizer.AbstractEntitySynchronizer#clearCache()
367 public void clearCache() {
369 if (syncInProgress) {
370 LOG.debug(AaiUiMsgs.DEBUG_GENERIC, "Historical Entity Summarizer in progress, request to clear cache ignored");
375 this.resetCounters();
376 if (entityCounters != null) {
377 entityCounters.clear();
380 allWorkEnumerated = false;