1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 ******************************************************************************/
16 package org.onap.msb.apiroute.wrapper.queue;
18 import java.io.IOException;
19 import java.util.ArrayList;
20 import java.util.HashSet;
21 import java.util.List;
24 import org.apache.http.HttpEntity;
25 import org.onap.msb.apiroute.SyncDataManager;
26 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
27 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
28 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
32 import com.fasterxml.jackson.core.JsonFactory;
33 import com.fasterxml.jackson.core.JsonParser;
34 import com.fasterxml.jackson.core.JsonToken;
37 public class ServiceListConsumer implements Runnable {
39 private static final Logger LOGGER = LoggerFactory
40 .getLogger(ServiceListConsumer.class);
42 private boolean isRunning = true;
47 public ServiceListConsumer() {
52 LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
57 ServiceData<HttpEntity> serviceData = QueueManager
58 .getInstance().takeFromServiceListQueue(index);
59 LOGGER.debug("ServiceList Consumer Thread [" + index +
60 "] take out serviceData from Queue successfully");
62 HttpEntity newValues = serviceData.getData();
64 Set<String> newServiceNameList = filterServiceList(newValues);
66 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
67 boolean initSuccess=initServiceList(newServiceNameList);
69 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
72 updateServiceList(newServiceNameList);
73 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
77 } catch (Exception e) {
79 "ServiceListConsumer throw Exception: ", e);
84 private void startWatchService(String serviceName) {
85 // start to Watch service nodes
87 SyncDataManager.startWatchService(serviceName);
90 private void updateServiceList(Set<String> newServiceNameList) {
91 Set<String> registerServiceNameList = CommonUtil.getDiffrent(
92 ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
94 if (registerServiceNameList.size() > 0) {
95 LOGGER.info("***need to start Watch Service num from consul :"
96 + registerServiceNameList.size());
98 for (String serviceName : registerServiceNameList) {
99 startWatchService(serviceName);
104 private boolean initServiceList(Set<String> newServiceNameList) {
105 LOGGER.info("***start to initialize service List when System startup ***");
107 Set<String> dbServiceNameList = MicroServiceWrapper
108 .getInstance().getAllMicroServiceKey();
110 if(dbServiceNameList==null){
111 LOGGER.error("init ServiceList from redis fail ");
117 Set<String> delServiceNameList = CommonUtil.getDiffrent(
118 newServiceNameList, dbServiceNameList);
120 LOGGER.info("***need to delete Service num from redis :"
121 + delServiceNameList.size());
122 for (String serviceName : delServiceNameList) {
124 MicroServiceWrapper.getInstance()
125 .deleteMicroService4AllVersion(serviceName);
126 LOGGER.info("delete MicroService success from initialize:[serviceName]"
129 } catch (Exception e) {
131 "initialize serviceList :Delete MicroServiceInfo serviceName:"
132 + serviceName + " FAIL : ", e);
137 LOGGER.info("***need to start Watch Service num from initialize :"
138 + newServiceNameList.size());
140 for (String serviceName : newServiceNameList) {
141 startWatchService(serviceName);
148 /*private ImmutableSet<String> filterServiceList(
149 final Map<String, List<String>> serviceList) {
150 if (serviceList == null || serviceList.isEmpty()) {
151 return ImmutableSet.of();
154 final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
156 for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
158 String key = entry.getKey();
159 if (key != null && !"consul".equals(key)) {
161 List<String> value = entry.getValue();
162 if (ServiceFilter.getInstance().isFilterService(value)) {
168 LOGGER.info("consul all service num:" + serviceList.size());
169 LOGGER.info("consul filter service num:" + builder.build().size());
171 return builder.build();
174 private Set<String> filterServiceList(final HttpEntity serviceList) {
176 if (serviceList == null || serviceList.getContentLength() == 0) {
177 return new HashSet<String>();
180 final Set<String> builder = new HashSet<String>();
182 JsonFactory f = new JsonFactory();
183 JsonParser jp = null;
184 List<String> tagList = null;
185 int inputServiceNum = 0;
187 jp = f.createParser(serviceList.getContent());
189 while (jp.nextToken() != JsonToken.END_OBJECT) {
190 String serviceName = jp.getCurrentName();
193 tagList = new ArrayList<>();
194 while (jp.nextToken() != JsonToken.END_ARRAY) {
195 tagList.add(jp.getText());
198 if (serviceName != null && !"consul".equals(serviceName)) {
199 if (ServiceFilter.getInstance().isFilterService(tagList)) {
200 builder.add(serviceName);
204 } catch (IOException e) {
205 LOGGER.warn("parse service list error",e);
206 return new HashSet<String>();
210 } catch (IOException e) {
211 LOGGER.warn("parse service list error",e);
212 return new HashSet<String>();
216 int latestServiceNum=ServiceListCache.getLatestServiceNamelist().size();
217 // if(latestServiceNum!=builder.size()){
218 LOGGER.info("[consul] all service num:" + inputServiceNum+ ", filter service num: new——" + builder.size()+" old——"+latestServiceNum);