Update the dependencies to use project version
[aai/sparky-be.git] / src / main / java / org / onap / aai / sparky / sync / ElasticSearchIndexCleaner.java
1 /**
2  * ============LICENSE_START=======================================================
3  * org.onap.aai
4  * ================================================================================
5  * Copyright © 2017 AT&T Intellectual Property. All rights reserved.
6  * Copyright © 2017 Amdocs
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *       http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  *
21  * ECOMP is a trademark and service mark of AT&T Intellectual Property.
22  */
23 package org.onap.aai.sparky.sync;
24
25 import java.io.IOException;
26 import java.util.ArrayList;
27 import java.util.Collection;
28 import java.util.Iterator;
29 import java.util.List;
30
31 import javax.ws.rs.core.MediaType;
32
33 import org.onap.aai.cl.api.Logger;
34 import org.onap.aai.cl.eelf.LoggerFactory;
35 import org.onap.aai.restclient.client.OperationResult;
36 import org.onap.aai.sparky.dal.ElasticSearchAdapter;
37 import org.onap.aai.sparky.logging.AaiUiMsgs;
38 import org.onap.aai.sparky.sync.config.ElasticSearchEndpointConfig;
39 import org.onap.aai.sparky.sync.config.ElasticSearchSchemaConfig;
40 import org.onap.aai.sparky.sync.entity.ObjectIdCollection;
41 import org.onap.aai.sparky.sync.entity.SearchableEntity;
42 import org.onap.aai.sparky.sync.enumeration.OperationState;
43
44 import com.fasterxml.jackson.core.JsonProcessingException;
45 import com.fasterxml.jackson.databind.JsonNode;
46 import com.fasterxml.jackson.databind.ObjectMapper;
47 import com.fasterxml.jackson.databind.node.ArrayNode;
48 import com.fasterxml.jackson.databind.node.ObjectNode;
49
50 /**
51  * The Class ElasticSearchIndexCleaner.
52  */
53 public class ElasticSearchIndexCleaner implements IndexCleaner {
54
55   private static final Logger LOG =
56       LoggerFactory.getInstance().getLogger(ElasticSearchIndexCleaner.class);
57
58   private static final String BULK_OP_LINE_TEMPLATE = "%s\n";
59   private static final String TIMESTAMP_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";
60
61   private ObjectIdCollection before;
62   private ObjectIdCollection after;
63
64   private ObjectMapper mapper;
65   private ElasticSearchAdapter esAdapter;
66   private ElasticSearchEndpointConfig endpointConfig;
67   private ElasticSearchSchemaConfig schemaConfig;
68
69   /**
70    * Instantiates a new elastic search index cleaner.
71    *
72    * @param restDataProvider the rest data provider
73    * @param indexName the index name
74    * @param indexType the index type
75    * @param host the host
76    * @param port the port
77    * @param scrollContextTimeToLiveInMinutes the scroll context time to live in minutes
78    * @param numItemsToGetBulkRequest the num items to get bulk request
79    */
80   public ElasticSearchIndexCleaner(ElasticSearchAdapter esAdapter,
81       ElasticSearchEndpointConfig endpointConfig, ElasticSearchSchemaConfig schemaConfig) {
82     this.esAdapter = esAdapter;
83     this.before = null;
84     this.after = null;
85     this.endpointConfig = endpointConfig;
86     this.schemaConfig = schemaConfig;
87     this.mapper = new ObjectMapper();
88   }
89
90   /*
91    * (non-Javadoc)
92    * 
93    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePreOperationCollection()
94    */
95   @Override
96   public OperationState populatePreOperationCollection() {
97
98     try {
99       before = retrieveAllDocumentIdentifiers();
100       return OperationState.OK;
101     } catch (Exception exc) {
102       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
103       return OperationState.ERROR;
104     }
105
106   }
107
108   /*
109    * (non-Javadoc)
110    * 
111    * @see org.openecomp.sparky.synchronizer.IndexCleaner#populatePostOperationCollection()
112    */
113   @Override
114   public OperationState populatePostOperationCollection() {
115     try {
116       after = retrieveAllDocumentIdentifiers();
117       return OperationState.OK;
118     } catch (Exception exc) {
119       LOG.error(AaiUiMsgs.ES_PRE_SYNC_FAILURE, schemaConfig.getIndexName(), exc.getMessage());
120       return OperationState.ERROR;
121     }
122   }
123
124   /*
125    * (non-Javadoc)
126    * 
127    * @see org.openecomp.sparky.synchronizer.IndexCleaner#performCleanup()
128    */
129   @Override
130   public OperationState performCleanup() {
131     // TODO Auto-generated method stub
132     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP, schemaConfig.getIndexName());
133
134     int sizeBefore = before.getSize();
135     int sizeAfter = after.getSize();
136
137     LOG.info(AaiUiMsgs.ES_SYNC_CLEAN_UP_SIZE, String.valueOf(sizeBefore),
138         String.valueOf(sizeAfter));
139
140     /*
141      * If the processedImportIds size <= 0, then something has failed in the sync operation and we
142      * shouldn't do the selective delete right now.
143      */
144
145     if (sizeAfter > 0) {
146
147       Collection<String> presyncIds = before.getImportedObjectIds();
148       presyncIds.removeAll(after.getImportedObjectIds());
149
150       try {
151         LOG.info(AaiUiMsgs.ES_SYNC_SELECTIVE_DELETE, schemaConfig.getIndexName(),
152             schemaConfig.getIndexDocType(), String.valueOf(presyncIds.size()));
153
154         ObjectIdCollection bulkIds = new ObjectIdCollection();
155
156         Iterator<String> it = presyncIds.iterator();
157         int numItemsInBulkRequest = 0;
158         int numItemsRemainingToBeDeleted = presyncIds.size();
159
160         while (it.hasNext()) {
161
162           bulkIds.addObjectId(it.next());
163           numItemsInBulkRequest++;
164
165           if (numItemsInBulkRequest >= endpointConfig.getScrollContextBatchRequestSize()) {
166             LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
167                 String.valueOf(bulkIds.getSize()));
168             bulkDelete(bulkIds.getImportedObjectIds());
169             numItemsRemainingToBeDeleted -= numItemsInBulkRequest;
170             numItemsInBulkRequest = 0;
171             bulkIds.clear();
172           }
173         }
174
175         if (numItemsRemainingToBeDeleted > 0) {
176           LOG.info(AaiUiMsgs.ES_BULK_DELETE, schemaConfig.getIndexName(),
177               String.valueOf(bulkIds.getSize()));
178           bulkDelete(bulkIds.getImportedObjectIds());
179         }
180
181
182       } catch (Exception exc) {
183         LOG.error(AaiUiMsgs.ES_BULK_DELETE_ERROR, schemaConfig.getIndexName(),
184             exc.getLocalizedMessage());
185
186       }
187     }
188
189     return OperationState.OK;
190   }
191
192   @Override
193   public String getIndexName() {
194     return schemaConfig.getIndexName();
195   }
196
197   /**
198    * Builds the initial scroll request payload.
199    *
200    * @param numItemsToGetPerRequest the num items to get per request
201    * @param fieldList the field list
202    * @return the string
203    * @throws JsonProcessingException the json processing exception
204    */
205   protected String buildInitialScrollRequestPayload(int numItemsToGetPerRequest,
206       List<String> fieldList) throws JsonProcessingException {
207
208     ObjectNode rootNode = mapper.createObjectNode();
209     rootNode.put("size", numItemsToGetPerRequest);
210
211     ArrayNode fields = mapper.createArrayNode();
212
213     for (String f : fieldList) {
214       fields.add(f);
215     }
216
217     rootNode.set("fields", fields);
218
219     ObjectNode queryNode = mapper.createObjectNode();
220     queryNode.set("match_all", mapper.createObjectNode());
221
222     rootNode.set("query", queryNode);
223
224     return mapper.writeValueAsString(rootNode);
225
226   }
227
228   /**
229    * Builds the subsequent scroll context request payload.
230    *
231    * @param scrollId the scroll id
232    * @param contextTimeToLiveInMinutes the context time to live in minutes
233    * @return the string
234    * @throws JsonProcessingException the json processing exception
235    */
236   protected String buildSubsequentScrollContextRequestPayload(String scrollId,
237       int contextTimeToLiveInMinutes) throws JsonProcessingException {
238
239     ObjectNode rootNode = mapper.createObjectNode();
240
241     rootNode.put("scroll", contextTimeToLiveInMinutes + "m");
242     rootNode.put("scroll_id", scrollId);
243
244     return mapper.writeValueAsString(rootNode);
245
246   }
247
248   /**
249    * Parses the elastic search result.
250    *
251    * @param jsonResult the json result
252    * @return the json node
253    * @throws JsonProcessingException the json processing exception
254    * @throws IOException Signals that an I/O exception has occurred.
255    */
256   protected JsonNode parseElasticSearchResult(String jsonResult)
257       throws JsonProcessingException, IOException {
258     ObjectMapper mapper = new ObjectMapper();
259     return mapper.readTree(jsonResult);
260   }
261
262   /**
263    * Lookup index doc.
264    *
265    * @param ids the ids
266    * @param docs the docs
267    * @return the array list
268    */
269   protected ArrayList<SearchableEntity> lookupIndexDoc(ArrayList<String> ids,
270       List<SearchableEntity> docs) {
271     ArrayList<SearchableEntity> objs = new ArrayList<SearchableEntity>();
272
273     if (ids != null && docs != null) {
274       for (SearchableEntity d : docs) {
275         if (ids.contains(d.getId())) {
276           objs.add(d);
277         }
278       }
279     }
280
281     return objs;
282   }
283
284   /**
285    * Builds the delete data object.
286    *
287    * @param index the index
288    * @param type the type
289    * @param id the id
290    * @return the object node
291    */
292   protected ObjectNode buildDeleteDataObject(String index, String type, String id) {
293
294     ObjectNode indexDocProperties = mapper.createObjectNode();
295
296     indexDocProperties.put("_index", index);
297     indexDocProperties.put("_type", type);
298     indexDocProperties.put("_id", id);
299
300     ObjectNode rootNode = mapper.createObjectNode();
301     rootNode.set("delete", indexDocProperties);
302
303     return rootNode;
304   }
305
306   /**
307    * This method might appear to be a little strange, and is simply an optimization to take an
308    * elipsed JsonNode key path and retrieve the node at the end of the path, if it exists.
309    *
310    * @param startNode the start node
311    * @param fieldPath the field path
312    * @return the node path
313    */
314   protected JsonNode getNodePath(JsonNode startNode, String... fieldPath) {
315
316     JsonNode jsonNode = null;
317
318     for (String field : fieldPath) {
319       if (jsonNode == null) {
320         jsonNode = startNode.get(field);
321       } else {
322         jsonNode = jsonNode.get(field);
323       }
324
325       /*
326        * This is our safety net in case any intermediate path returns a null
327        */
328
329       if (jsonNode == null) {
330         return null;
331       }
332
333     }
334
335     return jsonNode;
336   }
337
338   /**
339    * Gets the full url.
340    *
341    * @param resourceUrl the resource url
342    * @return the full url
343    */
344   private String getFullUrl(String resourceUrl) {
345     return String.format("http://%s:%s%s", endpointConfig.getEsIpAddress(),
346         endpointConfig.getEsServerPort(), resourceUrl);
347   }
348
349   /**
350    * Retrieve all document identifiers.
351    *
352    * @return the object id collection
353    * @throws IOException Signals that an I/O exception has occurred.
354    */
355   public ObjectIdCollection retrieveAllDocumentIdentifiers() throws IOException {
356
357     ObjectIdCollection currentDocumentIds = new ObjectIdCollection();
358
359     long opStartTimeInMs = System.currentTimeMillis();
360
361     List<String> fields = new ArrayList<String>();
362     fields.add("_id");
363     // fields.add("entityType");
364
365     String scrollRequestPayload =
366         buildInitialScrollRequestPayload(endpointConfig.getScrollContextBatchRequestSize(), fields);
367
368     final String fullUrlStr =
369         getFullUrl("/" + schemaConfig.getIndexName() + "/" + schemaConfig.getIndexDocType()
370             + "/_search?scroll=" + endpointConfig.getScrollContextTimeToLiveInMinutes() + "m");
371
372     OperationResult result =
373         esAdapter.doPost(fullUrlStr, scrollRequestPayload, MediaType.APPLICATION_JSON_TYPE);
374
375     if (result.wasSuccessful()) {
376
377       JsonNode rootNode = parseElasticSearchResult(result.getResult());
378
379       /*
380        * Check the result for success / failure, and enumerate all the index ids that resulted in
381        * success, and ignore the ones that failed or log them so we have a record of the failure.
382        */
383       int totalRecordsAvailable = 0;
384       String scrollId = null;
385       int numRecordsFetched = 0;
386
387       if (rootNode != null) {
388
389         scrollId = getFieldValue(rootNode, "_scroll_id");
390         final String tookStr = getFieldValue(rootNode, "took");
391         int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
392         boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
393
394         if (timedOut) {
395           LOG.error(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "all document Identifiers",
396               String.valueOf(tookInMs));
397         } else {
398           LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "all document Identifiers",
399               String.valueOf(tookInMs));
400         }
401
402         JsonNode hitsNode = rootNode.get("hits");
403         totalRecordsAvailable = Integer.parseInt(hitsNode.get("total").asText());
404
405         LOG.info(AaiUiMsgs.COLLECT_TOTAL, "all document Identifiers",
406             String.valueOf(totalRecordsAvailable));
407
408         /*
409          * Collect all object ids
410          */
411
412         ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
413
414         Iterator<JsonNode> nodeIterator = hitsArray.iterator();
415
416         String key = null;
417         String value = null;
418         JsonNode jsonNode = null;
419
420         while (nodeIterator.hasNext()) {
421
422           jsonNode = nodeIterator.next();
423
424           key = getFieldValue(jsonNode, "_id");
425
426           if (key != null) {
427             currentDocumentIds.addObjectId(key);
428           }
429
430         }
431
432         int totalRecordsRemainingToFetch = (totalRecordsAvailable - numRecordsFetched);
433
434         int numRequiredAdditionalFetches =
435             (totalRecordsRemainingToFetch / endpointConfig.getScrollContextBatchRequestSize());
436
437         /*
438          * Do an additional fetch for the remaining items (if needed)
439          */
440
441         if (totalRecordsRemainingToFetch % endpointConfig.getScrollContextBatchRequestSize() != 0) {
442           numRequiredAdditionalFetches += 1;
443         }
444
445         if (LOG.isDebugEnabled()) {
446           LOG.debug(AaiUiMsgs.SYNC_NUMBER_REQ_FETCHES,
447               String.valueOf(numRequiredAdditionalFetches));
448         }
449
450
451         for (int x = 0; x < numRequiredAdditionalFetches; x++) {
452
453           if (collectItemsFromScrollContext(scrollId, currentDocumentIds) != OperationState.OK) {
454             // abort the whole thing because now we can't reliably cleanup the orphans.
455             throw new IOException(
456                 "Failed to collect pre-sync doc collection from index.  Aborting operation");
457           }
458           if (LOG.isDebugEnabled()) {
459             LOG.debug(AaiUiMsgs.SYNC_NUMBER_TOTAL_FETCHES,
460                 String.valueOf(currentDocumentIds.getSize()),
461                 String.valueOf(totalRecordsAvailable));
462           }
463
464         }
465
466       }
467
468     } else {
469       // scroll context get failed, nothing else to do
470       LOG.error(AaiUiMsgs.ERROR_GENERIC, result.toString());
471     }
472
473     LOG.info(AaiUiMsgs.COLLECT_TOTAL_TIME, "all document Identifiers",
474         String.valueOf((System.currentTimeMillis() - opStartTimeInMs)));
475
476     return currentDocumentIds;
477
478   }
479
480   /**
481    * Collect items from scroll context.
482    *
483    * @param scrollId the scroll id
484    * @param objectIds the object ids
485    * @return the operation state
486    * @throws IOException Signals that an I/O exception has occurred.
487    */
488   private OperationState collectItemsFromScrollContext(String scrollId,
489       ObjectIdCollection objectIds) throws IOException {
490
491     String requestPayload = buildSubsequentScrollContextRequestPayload(scrollId,
492         endpointConfig.getScrollContextTimeToLiveInMinutes());
493
494     final String fullUrlStr = getFullUrl("/_search/scroll");
495
496     OperationResult opResult =
497         esAdapter.doPost(fullUrlStr, requestPayload, MediaType.APPLICATION_JSON_TYPE);
498
499     if (opResult.getResultCode() >= 300) {
500       LOG.warn(AaiUiMsgs.ES_SCROLL_CONTEXT_ERROR, opResult.getResult());
501       return OperationState.ERROR;
502     }
503
504     JsonNode rootNode = parseElasticSearchResult(opResult.getResult());
505     boolean timedOut = Boolean.parseBoolean(getFieldValue(rootNode, "timed_out"));
506     final String tookStr = getFieldValue(rootNode, "took");
507     int tookInMs = (tookStr == null) ? 0 : Integer.parseInt(tookStr);
508
509     JsonNode hitsNode = rootNode.get("hits");
510
511     /*
512      * Check the result for success / failure, and enumerate all the index ids that resulted in
513      * success, and ignore the ones that failed or log them so we have a record of the failure.
514      */
515
516     if (rootNode != null) {
517
518       if (timedOut) {
519         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_ERROR, "Scroll Context", String.valueOf(tookInMs));
520       } else {
521         LOG.info(AaiUiMsgs.COLLECT_TIME_WITH_SUCCESS, "Scroll Context", String.valueOf(tookInMs));
522       }
523
524       /*
525        * Collect all object ids
526        */
527
528       ArrayNode hitsArray = (ArrayNode) hitsNode.get("hits");
529       String key = null;
530       String value = null;
531       JsonNode jsonNode = null;
532
533       Iterator<JsonNode> nodeIterator = hitsArray.iterator();
534
535       while (nodeIterator.hasNext()) {
536
537         jsonNode = nodeIterator.next();
538
539         key = getFieldValue(jsonNode, "_id");
540
541         if (key != null) {
542           objectIds.addObjectId(key);
543
544         }
545
546       }
547     }
548
549     return OperationState.OK;
550   }
551
552   /**
553    * Gets the field value.
554    *
555    * @param node the node
556    * @param fieldName the field name
557    * @return the field value
558    */
559   protected String getFieldValue(JsonNode node, String fieldName) {
560
561     JsonNode field = node.get(fieldName);
562
563     if (field != null) {
564       return field.asText();
565     }
566
567     return null;
568
569   }
570
571   /**
572    * Bulk delete.
573    *
574    * @param docIds the doc ids
575    * @return the operation result
576    * @throws IOException Signals that an I/O exception has occurred.
577    */
578   public OperationResult bulkDelete(Collection<String> docIds) throws IOException {
579
580     if (docIds == null || docIds.size() == 0) {
581       LOG.info(AaiUiMsgs.ES_BULK_DELETE_SKIP);
582       return new OperationResult(500,
583           "Skipping bulkDelete(); operation because docs to delete list is empty");
584     }
585
586     LOG.info(AaiUiMsgs.ES_BULK_DELETE_START, String.valueOf(docIds.size()));
587
588     StringBuilder sb = new StringBuilder(128);
589
590     for (String id : docIds) {
591       sb.append(String.format(BULK_OP_LINE_TEMPLATE,
592           buildDeleteDataObject(schemaConfig.getIndexName(), schemaConfig.getIndexDocType(), id)));
593     }
594
595     sb.append("\n");
596
597     final String fullUrlStr = getFullUrl("/_bulk");
598
599     return esAdapter.doPost(fullUrlStr, sb.toString(), MediaType.APPLICATION_FORM_URLENCODED_TYPE);
600
601   }
602
603   /*
604   
605   */
606
607 }