Convert Sparky to Spring-Boot
[aai/sparky-be.git] / sparkybe-onap-service / src / main / java / org / onap / aai / sparky / sync / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START===================================================
3  * SPARKY (AAI UI service)
4  * ============================================================================
5  * Copyright © 2017 AT&T Intellectual Property.
6  * Copyright © 2017 Amdocs
7  * All rights reserved.
8  * ============================================================================
9  * Licensed under the Apache License, Version 2.0 (the "License");
10  * you may not use this file except in compliance with the License.
11  * You may obtain a copy of the License at
12  *
13  *      http://www.apache.org/licenses/LICENSE-2.0
14  *
15  * Unless required by applicable law or agreed to in writing, software
16  * distributed under the License is distributed on an "AS IS" BASIS,
17  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18  * See the License for the specific language governing permissions and
19  * limitations under the License.
20  * ============LICENSE_END=====================================================
21  *
22  * ECOMP and OpenECOMP are trademarks
23  * and service marks of AT&T Intellectual Property.
24  */
25 package org.onap.aai.sparky.sync;
26
27 import java.io.IOException;
28 import java.util.ArrayList;
29 import java.util.Collection;
30 import java.util.Iterator;
31 import java.util.List;
32
33 import javax.ws.rs.core.MediaType;
34
35 import org.onap.aai.cl.api.Logger;
36 import org.onap.aai.cl.eelf.LoggerFactory;
37 import org.onap.aai.restclient.client.OperationResult;
38 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
39 import org.onap.aai.sparky.logging.AaiUiMsgs;
40 import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
41 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
42 import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
43 import org.onap.aai.sparky.sync.entity.SearchableEntity;
44 import org.onap.aai.sparky.sync.enumeration.OperationState;
45
46 import com.fasterxml.jackson.core.JsonProcessingException;
47 import com.fasterxml.jackson.databind.JsonNode;
48 import com.fasterxml.jackson.databind.ObjectMapper;
49 import com.fasterxml.jackson.databind.node.ArrayNode;
50 import com.fasterxml.jackson.databind.node.ObjectNode;
51
52 /**
53  * The Class ElasticSearchIndexCleaner.
54  */
55 public class ElasticSearchIndexCleaner implements IndexCleaner {
56
57   private static final Logger LOG =
58       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
59
60   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
61   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
62
63   private ObjectIdCollection before;
64   private ObjectIdCollection after;
65
66   private ObjectMapper mapper;
67   private ElasticSearchAdapter esAdapter;
68   private ElasticSearchEndpointConfig endpointConfig;
69   private ElasticSearchSchemaConfig schemaConfig;
70
71   /**
72    * Instantiates a new elastic search index cleaner.
73    *
74    * @param restDataProvider the rest data provider
75    * @param indexName the index name
76    * @param indexType the index type
77    * @param host the host
78    * @param port the port
79    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
80    * @param numItemsToGetBulkRequest the num items to get bulk request
81    */
82   public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter, ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
83     this.esAdapter = esAdapter;
84     this.before = null;
85     this.after = null;
86     this.endpointConfig = endpointConfig;
87     this.schemaConfig = schemaConfig;
88     this.mapper = new ObjectMapper();
89   }
90
91   /*
92    * (non-Javadoc)
93    * 
94    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
95    */
96   @Override
97   public OperationState populatePreOperationCollection() {
98
99     try {
100       before = retrieveAllDocumentIdentifiers();
101       return OperationState.OK;
102     } catch (Exception exc) {
103       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
104       return OperationState.ERROR;
105     }
106
107   }
108
109   /*
110    * (non-Javadoc)
111    * 
112    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
113    */
114   @Override
115   public OperationState populatePostOperationCollection() {
116     try {
117       after = retrieveAllDocumentIdentifiers();
118       return OperationState.OK;
119     } catch (Exception exc) {
120       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
121       return OperationState.ERROR;
122     }
123   }
124
125   /*
126    * (non-Javadoc)
127    * 
128    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
129    */
130   @Override
131   public OperationState performCleanup() {
132     // TODO Auto-generated method stub
133     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
134
135     int sizeBefore = before.getSize();
136     int sizeAfter = after.getSize();
137
138     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
139         String.valueOf(sizeAfter));
140
141     /*
142      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
143      * shouldn't do the selective delete right now.
144      */
145
146     if (sizeAfter > 0) {
147
148       Collection<String> presyncIds = before.getImportedObjectIds();
149       presyncIds.removeAll(after.getImportedObjectIds());
150
151       try {
152         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(), schemaConfig.getIndexDocType(),
153             String.valueOf(presyncIds.size()));
154
155         ObjectIdCollection bulkIds = new ObjectIdCollection();
156
157         Iterator<String> it = presyncIds.iterator();
158         int numItemsInBulkRequest = 0;
159         int numItemsRemainingToBeDeleted = presyncIds.size();
160
161         while (it.hasNext()) {
162
163           bulkIds.addObjectId(it.next());
164           numItemsInBulkRequest++;
165
166           if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
167             LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
168             bulkDelete(bulkIds.getImportedObjectIds());
169             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
170             numItemsInBulkRequest = 0;
171             bulkIds.clear();
172           }
173         }
174
175         if (numItemsRemainingToBeDeleted > 0) {
176           LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(), String.valueOf(bulkIds.getSize()));
177           bulkDelete(bulkIds.getImportedObjectIds());
178         }
179
180
181       } catch (Exception exc) {
182         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(), exc.getLocalizedMessage());
183
184       }
185     }
186
187     return OperationState.OK;
188   }
189
190   @Override
191   public String getIndexName() {
192     return schemaConfig.getIndexName();
193   }
194
195   /**
196    * Builds the initial scroll request payload.
197    *
198    * @param numItemsToGetPerRequest the num items to get per request
199    * @param fieldList the field list
200    * @return the string
201    * @throws JsonProcessingException the json processing exception
202    */
203   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
204       List<String> fieldList) throws JsonProcessingException {
205
206     ObjectNode rootNode = mapper.createObjectNode();
207     rootNode.put("size", numItemsToGetPerRequest);
208
209     ArrayNode fields = mapper.createArrayNode();
210
211     for (String f : fieldList) {
212       fields.add(f);
213     }
214
215     rootNode.set("fields", fields);
216
217     ObjectNode queryNode = mapper.createObjectNode();
218     queryNode.set("match_all", mapper.createObjectNode());
219
220     rootNode.set("query", queryNode);
221
222     return mapper.writeValueAsString(rootNode);
223
224   }
225
226   /**
227    * Builds the subsequent scroll context request payload.
228    *
229    * @param scrollId the scroll id
230    * @param contextTimeToLiveInMinutes the context time to live in minutes
231    * @return the string
232    * @throws JsonProcessingException the json processing exception
233    */
234   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
235       int contextTimeToLiveInMinutes) throws JsonProcessingException {
236
237     ObjectNode rootNode = mapper.createObjectNode();
238
239     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
240     rootNode.put("scroll_id", scrollId);
241
242     return mapper.writeValueAsString(rootNode);
243
244   }
245
246   /**
247    * Parses the elastic search result.
248    *
249    * @param jsonResult the json result
250    * @return the json node
251    * @throws JsonProcessingException the json processing exception
252    * @throws IOException Signals that an I/O exception has occurred.
253    */
254   protected JsonNode parseElasticSearchResult(String jsonResult)
255       throws JsonProcessingException, IOException {
256     ObjectMapper mapper = new ObjectMapper();
257     return mapper.readTree(jsonResult);
258   }
259
260   /**
261    * Lookup index doc.
262    *
263    * @param ids the ids
264    * @param docs the docs
265    * @return the array list
266    */
267   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
268       List<SearchableEntity> docs) {
269     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
270
271     if (ids != null && docs != null) {
272       for (SearchableEntity d : docs) {
273         if (ids.contains(d.getId())) {
274           objs.add(d);
275         }
276       }
277     }
278
279     return objs;
280   }
281
282   /**
283    * Builds the delete data object.
284    *
285    * @param index the index
286    * @param type the type
287    * @param id the id
288    * @return the object node
289    */
290   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
291
292     ObjectNode indexDocProperties = mapper.createObjectNode();
293
294     indexDocProperties.put("_index", index);
295     indexDocProperties.put("_type", type);
296     indexDocProperties.put("_id", id);
297
298     ObjectNode rootNode = mapper.createObjectNode();
299     rootNode.set("delete", indexDocProperties);
300
301     return rootNode;
302   }
303
304   /**
305    * This method might appear to be a little strange, and is simply an optimization to take an
306    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
307    *
308    * @param startNode the start node
309    * @param fieldPath the field path
310    * @return the node path
311    */
312   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
313
314     JsonNode jsonNode = null;
315
316     for (String field : fieldPath) {
317       if (jsonNode == null) {
318         jsonNode = startNode.get(field);
319       } else {
320         jsonNode = jsonNode.get(field);
321       }
322
323       /*
324        * This is our safety net in case any intermediate path returns a null
325        */
326
327       if (jsonNode == null) {
328         return null;
329       }
330
331     }
332
333     return jsonNode;
334   }
335
336   /**
337    * Gets the full url.
338    *
339    * @param resourceUrl the resource url
340    * @return the full url
341    */
342   private String getFullUrl(String resourceUrl) {
343     return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
344         endpointConfig.getEsServerPort(), resourceUrl);
345   }
346
347   /**
348    * Retrieve all document identifiers.
349    *
350    * @return the object id collection
351    * @throws IOException Signals that an I/O exception has occurred.
352    */
353   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
354
355     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
356
357     long opStartTimeInMs = System.currentTimeMillis();
358
359     List<String> fields = new ArrayList<String>();
360     fields.add("_id");
361     // fields.add("entityType");
362
363     String scrollRequestPayload =
364         buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
365
366     final String fullUrlStr = getFullUrl("/" + schemaConfig.getIndexName()+ "/" + schemaConfig.getIndexDocType() + "/_search?scroll="
367         + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
368
369     OperationResult result =
370         esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
371
372     if (result.wasSuccessful()) {
373
374       JsonNode rootNode = parseElasticSearchResult(result.getResult());
375
376       /*
377        * Check the result for success / failure, and enumerate all the index ids that resulted in
378        * success, and ignore the ones that failed or log them so we have a record of the failure.
379        */
380       int totalRecordsAvailable = 0;
381       String scrollId = null;
382       int numRecordsFetched = 0;
383
384       if (rootNode != null) {
385
386         scrollId = getFieldValue(rootNode, "_scroll_id");
387         final String tookStr = getFieldValue(rootNode, "took");
388         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
389         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
390
391         if (timedOut) {
392           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
393               String.valueOf(tookInMs));
394         } else {
395           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
396               String.valueOf(tookInMs));
397         }
398
399         JsonNode hitsNode = rootNode.get("hits");
400         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
401
402         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
403             String.valueOf(totalRecordsAvailable));
404
405         /*
406          * Collect all object ids
407          */
408
409         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
410
411         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
412
413         String key = null;
414         String value = null;
415         JsonNode jsonNode = null;
416
417         while (nodeIterator.hasNext()) {
418
419           jsonNode = nodeIterator.next();
420
421           key = getFieldValue(jsonNode, "_id");
422
423           if (key != null) {
424             currentDocumentIds.addObjectId(key);
425           }
426
427         }
428
429         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
430
431         int numRequiredAdditionalFetches =
432             (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
433
434         /*
435          * Do an additional fetch for the remaining items (if needed)
436          */
437
438         if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
439           numRequiredAdditionalFetches += 1;
440         }
441
442         if (LOG.isDebugEnabled()) {
443           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
444               String.valueOf(numRequiredAdditionalFetches));
445         }
446
447
448         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
449
450           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
451             // abort the whole thing because now we can't reliably cleanup the orphans.
452             throw new IOException(
453                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
454           }
455           if (LOG.isDebugEnabled()) {
456             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
457                 String.valueOf(currentDocumentIds.getSize()),
458                 String.valueOf(totalRecordsAvailable));
459           }
460
461         }
462
463       }
464
465     } else {
466       // scroll context get failed, nothing else to do
467       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
468     }
469
470     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
471         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
472
473     return currentDocumentIds;
474
475   }
476
477   /**
478    * Collect items from scroll context.
479    *
480    * @param scrollId the scroll id
481    * @param objectIds the object ids
482    * @return the operation state
483    * @throws IOException Signals that an I/O exception has occurred.
484    */
485   private OperationState collectItemsFromScrollContext(String scrollId,
486       ObjectIdCollection objectIds) throws IOException {
487
488     String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
489         endpointConfig.getScrollContextTimeToLiveInMinutes());
490
491     final String fullUrlStr = getFullUrl("/_search/scroll");
492
493     OperationResult opResult =
494         esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
495
496     if (opResult.getResultCode() >= 300) {
497       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
498       return OperationState.ERROR;
499     }
500
501     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
502     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
503     final String tookStr = getFieldValue(rootNode, "took");
504     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
505
506     JsonNode hitsNode = rootNode.get("hits");
507
508     /*
509      * Check the result for success / failure, and enumerate all the index ids that resulted in
510      * success, and ignore the ones that failed or log them so we have a record of the failure.
511      */
512
513     if (rootNode != null) {
514
515       if (timedOut) {
516         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
517       } else {
518         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
519       }
520
521       /*
522        * Collect all object ids
523        */
524
525       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
526       String key = null;
527       String value = null;
528       JsonNode jsonNode = null;
529
530       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
531
532       while (nodeIterator.hasNext()) {
533
534         jsonNode = nodeIterator.next();
535
536         key = getFieldValue(jsonNode, "_id");
537
538         if (key != null) {
539           objectIds.addObjectId(key);
540
541         }
542
543       }
544     }
545
546     return OperationState.OK;
547   }
548
549   /**
550    * Gets the field value.
551    *
552    * @param node the node
553    * @param fieldName the field name
554    * @return the field value
555    */
556   protected String getFieldValue(JsonNode node, String fieldName) {
557
558     JsonNode field = node.get(fieldName);
559
560     if (field != null) {
561       return field.asText();
562     }
563
564     return null;
565
566   }
567
568   /**
569    * Bulk delete.
570    *
571    * @param docIds the doc ids
572    * @return the operation result
573    * @throws IOException Signals that an I/O exception has occurred.
574    */
575   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
576
577     if (docIds == null || docIds.size() == 0) {
578       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
579       return new OperationResult(500,
580           "Skipping bulkDelete(); operation because docs to delete list is empty");
581     }
582
583     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
584
585     StringBuilder sb = new StringBuilder(128);
586
587     for (String id : docIds) {
588       sb.append(String.format(BULK_OP_LINE_TEMPLATE,
589           buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
590     }
591
592     sb.append("\n");
593
594     final String fullUrlStr = getFullUrl("/_bulk");
595
596     return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
597
598   }
599
600   /*
601   
602   */
603
604 }