2 * ============LICENSE_START=======================================================
 
   4 * ================================================================================
 
   5 * Copyright 2019 China Mobile
 
   6 *=================================================================================
 
   7 * Licensed under the Apache License, Version 2.0 (the "License");
 
   8 * you may not use this file except in compliance with the License.
 
   9 * You may obtain a copy of the License at
 
  11 *     http://www.apache.org/licenses/LICENSE-2.0
 
  13 * Unless required by applicable law or agreed to in writing, software
 
  14 * distributed under the License is distributed on an "AS IS" BASIS,
 
  15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 
  16 * See the License for the specific language governing permissions and
 
  17 * limitations under the License.
 
  18 * ============LICENSE_END=========================================================
 
  21 package org.onap.datalake.feeder.service.db;
 
  23 import java.io.IOException;
 
  24 import java.util.List;
 
  26 import javax.annotation.PostConstruct;
 
  27 import javax.annotation.PreDestroy;
 
  29 import org.apache.commons.lang3.StringUtils;
 
  30 import org.apache.http.HttpHost;
 
  31 import org.elasticsearch.ElasticsearchException;
 
  32 import org.elasticsearch.action.ActionListener;
 
  33 import org.elasticsearch.action.get.GetRequest;
 
  34 import org.elasticsearch.action.get.GetResponse;
 
  35 import org.elasticsearch.action.index.IndexResponse;
 
  36 import org.elasticsearch.client.indices.CreateIndexRequest;
 
  37 import org.elasticsearch.client.indices.CreateIndexResponse;
 
  38 import org.elasticsearch.client.indices.GetIndexRequest;
 
  39 import org.elasticsearch.action.bulk.BulkRequest;
 
  40 import org.elasticsearch.action.bulk.BulkResponse;
 
  41 import org.elasticsearch.action.index.IndexRequest;
 
  42 import org.elasticsearch.client.RequestOptions;
 
  43 import org.elasticsearch.client.RestClient;
 
  44 import org.elasticsearch.client.RestHighLevelClient;
 
  45 import org.elasticsearch.common.xcontent.XContentType;
 
  46 import org.elasticsearch.rest.RestStatus;
 
  47 import org.json.JSONObject;
 
  48 import org.onap.datalake.feeder.config.ApplicationConfiguration;
 
  49 import org.onap.datalake.feeder.domain.Db;
 
  50 import org.onap.datalake.feeder.domain.EffectiveTopic;
 
  51 import org.onap.datalake.feeder.domain.Topic;
 
  52 import org.slf4j.Logger;
 
  53 import org.slf4j.LoggerFactory;
 
  55 import org.springframework.beans.factory.annotation.Autowired;
 
  56 import org.springframework.stereotype.Service;
 
  59  * Elasticsearch Service for table creation, data submission, as well as data pre-processing. 
 
  65 public class ElasticsearchService implements DbStoreService {
 
  67         private final Logger log = LoggerFactory.getLogger(this.getClass());
 
  69         private Db elasticsearch;
 
  72         private ApplicationConfiguration config;
 
  75 //      private DbService dbService;
 
  77         private RestHighLevelClient client;
 
  78         ActionListener<BulkResponse> listener;
 
  80         public ElasticsearchService( ) {
 
  83         public ElasticsearchService(Db db) {
 
  87         //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
 
  88         //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
 
  91                 //Db elasticsearch = dbService.getElasticsearch();
 
  92                 String elasticsearchHost = elasticsearch.getHost();
 
  94                 // Initialize the Connection
 
  95                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
 
  97                 log.info("Connected to Elasticsearch Host {}", elasticsearchHost);
 
  99                 listener = new ActionListener<BulkResponse>() {
 
 101                         public void onResponse(BulkResponse bulkResponse) {
 
 102                                 if(bulkResponse.hasFailures()) {
 
 103                                         log.debug(bulkResponse.buildFailureMessage());
 
 108                         public void onFailure(Exception e) {
 
 109                                 log.error(e.getMessage());
 
 115         public void cleanUp() throws IOException {
 
 116                 config.getShutdownLock().readLock().lock();
 
 119                         log.info("cleanUp() closing Elasticsearch client.");
 
 121                 } catch (IOException e) {
 
 122                         log.error("client.close() at cleanUp.", e);
 
 124                         config.getShutdownLock().readLock().unlock();
 
 128         public void ensureTableExist(String topic) throws IOException {
 
 129                 String topicLower = topic.toLowerCase();
 
 131                 GetIndexRequest request = new GetIndexRequest(topicLower);
 
 133                 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
 
 135                         //TODO submit mapping template
 
 136                         CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
 
 137                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
 
 138                         log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
 
 142         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
 
 144         public void saveJsons(EffectiveTopic effectiveTopic, List<JSONObject> jsons) {
 
 146                 BulkRequest request = new BulkRequest();
 
 148                 for (JSONObject json : jsons) {
 
 149                         if (effectiveTopic.getTopic().isCorrelateClearedMessage()) {
 
 150                                 boolean found = correlateClearedMessage(effectiveTopic.getTopic(), json);
 
 156                         String id = effectiveTopic.getTopic().getMessageId(json); //id can be null
 
 158                         request.add(new IndexRequest(effectiveTopic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
 
 161                 log.debug("saving text to effectiveTopic = {}, batch count = {} ", effectiveTopic, jsons.size());
 
 163                 if (config.isAsync()) {
 
 164                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);
 
 167                                 BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
 
 168                                 if(bulkResponse.hasFailures()) {
 
 169                                         log.debug(bulkResponse.buildFailureMessage());
 
 171                         } catch (IOException e) {
 
 172                                 log.error(effectiveTopic.getName(), e);
 
 184          *         Because of query by id, The search API cannot be used for query. The
 
 185          *         search API can only query all data or based on the fields in the
 
 186          *         source. So use the get API, three parameters: index, type, document
 
 189         private boolean correlateClearedMessage(Topic topic, JSONObject json) {
 
 190                 boolean found = false;
 
 194                         eName = json.query("/event/commonEventHeader/eventName").toString();
 
 196                         if (StringUtils.isNotBlank(eName) && eName.endsWith("Cleared")) {
 
 198                                 String name = eName.substring(0, eName.length() - 7);
 
 199                                 String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
 
 200                                 String specificProblem = json.query("/event/faultFields/specificProblem").toString();
 
 202                                 String id = String.join("^", name, reportingEntityName, specificProblem);//example: id = "aaaa^cccc^bbbbb"
 
 203                                 String index = topic.getName().toLowerCase();
 
 206                                 GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
 
 208                                 GetResponse getResponse = null;
 
 210                                         getResponse = client.get(getRequest, RequestOptions.DEFAULT);
 
 211                                         if (getResponse != null) {
 
 213                                                 if (getResponse.isExists()) {
 
 214                                                         String sourceAsString = getResponse.getSourceAsString();
 
 215                                                         JSONObject jsonObject = new JSONObject(sourceAsString);
 
 216                                                         jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
 
 217                                                         String jsonString = jsonObject.toString();
 
 220                                                         IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
 
 221                                                         request.source(jsonString, XContentType.JSON);
 
 222                                                         IndexResponse indexResponse = null;
 
 224                                                                 indexResponse = client.index(request, RequestOptions.DEFAULT);
 
 226                                                         } catch (IOException e) {
 
 227                                                                 log.error("save failure");
 
 230                                                         log.error("The getResponse was not exists");
 
 234                                                 log.error("The document for this id was not found");
 
 237                                 } catch (ElasticsearchException e) {
 
 238                                         if (e.status() == RestStatus.NOT_FOUND) {
 
 239                                                 log.error("The document for this id was not found");
 
 241                                         if (e.status() == RestStatus.CONFLICT) {
 
 242                                                 log.error("Version conflict");
 
 244                                         log.error("Get document exception", e);
 
 245                                 } catch (IOException e) {
 
 246                                         log.error(topic.getName(), e);
 
 251                 } catch (Exception e) {
 
 252                         log.error("error", e);