2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.datalake.feeder.service;
23 import java.io.IOException;
24 import java.util.List;
26 import javax.annotation.PostConstruct;
27 import javax.annotation.PreDestroy;
29 import org.apache.commons.lang3.StringUtils;
30 import org.apache.http.HttpHost;
31 import org.elasticsearch.ElasticsearchException;
32 import org.elasticsearch.action.ActionListener;
33 import org.elasticsearch.action.get.GetRequest;
34 import org.elasticsearch.action.get.GetResponse;
35 import org.elasticsearch.action.index.IndexResponse;
36 import org.elasticsearch.client.indices.CreateIndexRequest;
37 import org.elasticsearch.client.indices.CreateIndexResponse;
38 import org.elasticsearch.client.indices.GetIndexRequest;
39 import org.elasticsearch.action.bulk.BulkRequest;
40 import org.elasticsearch.action.bulk.BulkResponse;
41 import org.elasticsearch.action.index.IndexRequest;
42 import org.elasticsearch.client.RequestOptions;
43 import org.elasticsearch.client.RestClient;
44 import org.elasticsearch.client.RestHighLevelClient;
45 import org.elasticsearch.common.xcontent.XContentType;
46 import org.elasticsearch.rest.RestStatus;
47 import org.json.JSONObject;
48 import org.onap.datalake.feeder.config.ApplicationConfiguration;
49 import org.onap.datalake.feeder.domain.Db;
50 import org.onap.datalake.feeder.domain.Topic;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
54 import org.springframework.beans.factory.annotation.Autowired;
55 import org.springframework.stereotype.Service;
58 * Service to use Elasticsearch
64 public class ElasticsearchService {
66 private final Logger log = LoggerFactory.getLogger(this.getClass());
69 private ApplicationConfiguration config;
72 private DbService dbService;
74 private RestHighLevelClient client;
75 ActionListener<BulkResponse> listener;
78 //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
79 //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
82 Db elasticsearch = dbService.getElasticsearch();
83 String elasticsearchHost = elasticsearch.getHost();
85 // Initialize the Connection
86 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
88 log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
90 listener = new ActionListener<BulkResponse>() {
92 public void onResponse(BulkResponse bulkResponse) {
97 public void onFailure(Exception e) {
98 log.error(e.getMessage());
104 public void cleanUp() throws IOException {
108 public void ensureTableExist(String topic) throws IOException {
109 String topicLower = topic.toLowerCase();
111 GetIndexRequest request = new GetIndexRequest(topicLower);
113 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
115 CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower);
116 CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
117 log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
121 //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
122 public void saveJsons(Topic topic, List<JSONObject> jsons) {
123 BulkRequest request = new BulkRequest();
125 for (JSONObject json : jsons) {
126 if(topic.isCorrelateClearedMessage()) {
127 boolean found = correlateClearedMessage(topic, json);
133 String id = topic.getMessageId(json); //id can be null
135 request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
137 if(config.isAsync()) {
138 client.bulkAsync(request, RequestOptions.DEFAULT, listener);
141 client.bulk(request, RequestOptions.DEFAULT);
142 } catch (IOException e) {
143 log.error( topic.getName() , e);
154 * Because of query by id, The search API cannot be used for query.
155 * The search API can only query all data or based on the fields in the source.
156 * So use the get API, three parameters: index, type, document id
158 private boolean correlateClearedMessage(Topic topic, JSONObject json) {
159 boolean found = false;
163 eName = json.query("/event/commonEventHeader/eventName").toString();
165 if (StringUtils.isNotBlank(eName)) {
167 if (eName.endsWith("Cleared")) {
169 String name = eName.substring(0, eName.length() - 7);
170 String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
171 String specificProblem = json.query("/event/faultFields/specificProblem").toString();
174 StringBuilder stringBuilder = new StringBuilder();
175 stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
177 id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
178 String index = topic.getName().toLowerCase();
181 GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
183 GetResponse getResponse = null;
185 getResponse = client.get(getRequest, RequestOptions.DEFAULT);
186 if (getResponse != null) {
188 if (getResponse.isExists()) {
189 String sourceAsString = getResponse.getSourceAsString();
190 JSONObject jsonObject = new JSONObject(sourceAsString);
191 jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
192 String jsonString = jsonObject.toString();
195 IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
196 request.source(jsonString, XContentType.JSON);
197 IndexResponse indexResponse = null;
199 indexResponse = client.index(request, RequestOptions.DEFAULT);
201 } catch (IOException e) {
202 log.error("save failure");
205 log.error("The getResponse was not exists" );
209 log.error("The document for this id was not found" );
212 } catch (ElasticsearchException e) {
213 if (e.status() == RestStatus.NOT_FOUND) {
214 log.error("The document for this id was not found" );
216 if (e.status() == RestStatus.CONFLICT) {
217 log.error("Version conflict" );
219 log.error("Get document exception", e);
220 }catch (IOException e) {
221 log.error(topic.getName() , e);
225 log.info("The data is normal");
229 log.debug("event id null");
232 } catch (Exception e) {
233 log.error("error",e);