4090e7eb18481744877f004f5ed1d2555d5314d9
[dcaegen2/services.git] /
1 /*
2 * ============LICENSE_START=======================================================
3 * ONAP : DATALAKE
4 * ================================================================================
5 * Copyright 2019 China Mobile
6 *=================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 *
11 *     http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
19 */
20
21 package org.onap.datalake.feeder.service;
22
import java.io.IOException;
import java.util.List;
import java.util.Locale;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.json.JSONObject;
import org.onap.datalake.feeder.config.ApplicationConfiguration;
import org.onap.datalake.feeder.domain.Db;
import org.onap.datalake.feeder.dto.TopicConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
56
57 /**
58  * Service to use Elasticsearch
59  * 
60  * @author Guobiao Mo
61  *
62  */
63 @Service
64 public class ElasticsearchService {
65
66         private final Logger log = LoggerFactory.getLogger(this.getClass());
67
68         @Autowired
69         private ApplicationConfiguration config;
70         
71         @Autowired
72         private DbService dbService;
73
74         private RestHighLevelClient client;
75         ActionListener<BulkResponse> listener;
76
77
78 //ES Encrypted communication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_encrypted_communication.html#_encrypted_communication
79 //Basic authentication https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_basic_authentication.html
80         @PostConstruct
81         private void init() {
82                 Db elasticsearch = dbService.getElasticsearch();
83                 String elasticsearchHost = elasticsearch.getHost();
84                 
85                 // Initialize the Connection
86                 client = new RestHighLevelClient(RestClient.builder(new HttpHost(elasticsearchHost, 9200, "http"), new HttpHost(elasticsearchHost, 9201, "http")));
87
88                 log.info("Connect to Elasticsearch Host {}", elasticsearchHost);
89
90                 listener = new ActionListener<BulkResponse>() {
91                         @Override
92                         public void onResponse(BulkResponse bulkResponse) {
93
94                         }
95
96                         @Override
97                         public void onFailure(Exception e) {
98                                 log.error(e.getMessage());
99                         }
100                 };
101         }
102
103         @PreDestroy
104         public void cleanUp() throws IOException {
105                 client.close();
106         }
107         
108         public void ensureTableExist(String topic) throws IOException {
109                 String topicLower = topic.toLowerCase();
110                 
111                 GetIndexRequest request = new GetIndexRequest(topicLower); 
112                 
113                 boolean exists = client.indices().exists(request, RequestOptions.DEFAULT);
114                 if(!exists){
115                         //TODO submit mapping template
116                         CreateIndexRequest createIndexRequest = new CreateIndexRequest(topicLower); 
117                         CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest, RequestOptions.DEFAULT);          
118                         log.info("{} : created {}", createIndexResponse.index(), createIndexResponse.isAcknowledged());
119                 }
120         }
121         
122         //TTL is not supported in Elasticsearch 5.0 and later, what can we do? FIXME
123         public void saveJsons(TopicConfig topic, List<JSONObject> jsons) {
124                 BulkRequest request = new BulkRequest();
125
126                 for (JSONObject json : jsons) {
127                         if(topic.isCorrelateClearedMessage()) {
128                                 boolean found = correlateClearedMessage(topic, json);
129                                 if(found) {
130                                         continue;
131                                 }
132                         }
133                         
134                         String id = topic.getMessageId(json); //id can be null
135
136                         request.add(new IndexRequest(topic.getName().toLowerCase(), config.getElasticsearchType(), id).source(json.toString(), XContentType.JSON));
137                 }
138
139                 log.debug("saving text to topic = {}, batch count = {} ", topic, jsons.size());
140                 
141                 if(config.isAsync()) {
142                         client.bulkAsync(request, RequestOptions.DEFAULT, listener);                    
143                 }else {
144                         try {
145                                 client.bulk(request, RequestOptions.DEFAULT);
146                         } catch (IOException e) { 
147                                 log.error( topic.getName() , e);
148                         }
149                 }
150         }
151
152         /**
153          *
154          * @param topic
155          * @param json
156          * @return boolean
157          *
158          * Because of query by id, The search API cannot be used for query.
159          * The search API can only query all data or based on the fields in the source.
160          * So use the get API, three parameters: index, type, document id
161          */
162         private boolean correlateClearedMessage(TopicConfig topic, JSONObject json) {
163                 boolean found = false;
164                 String eName = null;
165
166                 try {
167                         eName = json.query("/event/commonEventHeader/eventName").toString();
168
169                         if (StringUtils.isNotBlank(eName)) {
170
171                                 if (eName.endsWith("Cleared")) {
172
173                                         String name = eName.substring(0, eName.length() - 7);
174                                         String reportingEntityName = json.query("/event/commonEventHeader/reportingEntityName").toString();
175                                         String specificProblem = json.query("/event/faultFields/specificProblem").toString();
176
177                                         String id = null;
178                                         StringBuilder stringBuilder = new StringBuilder();
179                                         stringBuilder = stringBuilder.append(name).append('^').append(reportingEntityName).append('^').append(specificProblem);
180
181                                         id = stringBuilder.toString();//example: id = "aaaa^cccc^bbbbb"
182                                         String index = topic.getName().toLowerCase();
183
184                                         //get
185                                         GetRequest getRequest = new GetRequest(index, config.getElasticsearchType(), id);
186
187                                         GetResponse getResponse = null;
188                                         try {
189                                                 getResponse = client.get(getRequest, RequestOptions.DEFAULT);
190                                                 if (getResponse != null) {
191
192                                                         if (getResponse.isExists()) {
193                                                                 String sourceAsString = getResponse.getSourceAsString();
194                                                                 JSONObject jsonObject = new JSONObject(sourceAsString);
195                                                                 jsonObject.getJSONObject("event").getJSONObject("faultFields").put("vfStatus", "closed");
196                                                                 String jsonString = jsonObject.toString();
197
198                                                                 //update
199                                                                 IndexRequest request = new IndexRequest(index, config.getElasticsearchType(), id);
200                                                                 request.source(jsonString, XContentType.JSON);
201                                                                 IndexResponse indexResponse = null;
202                                                                 try {
203                                                                         indexResponse = client.index(request, RequestOptions.DEFAULT);
204                                                                         found = true;
205                                                                 } catch (IOException e) {
206                                                                         log.error("save failure");
207                                                                 }
208                                                         } else {
209                                                                 log.error("The getResponse was not exists" );
210                                                         }
211
212                                                 } else {
213                                                         log.error("The document for this id was not found" );
214                                                 }
215
216                                         } catch (ElasticsearchException e) {
217                                                 if (e.status() == RestStatus.NOT_FOUND) {
218                                                         log.error("The document for this id was not found" );
219                                                 }
220                                                 if (e.status() == RestStatus.CONFLICT) {
221                                                         log.error("Version conflict" );
222                                                 }
223                                                 log.error("Get document exception", e);
224                                         }catch (IOException e) {
225                                                 log.error(topic.getName() , e);
226                                         }
227
228                                 } else {
229                                         log.info("The data is normal");
230                                 }
231
232                         } else {
233                                 log.debug("event id null");
234                         }
235
236                 } catch (Exception e) {
237                         log.error("error",e);
238                 }
239
240                 return found;
241         }
242
243 }