1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
5 * in compliance with the License. You may obtain a copy of the License at
7 * http://www.apache.org/licenses/LICENSE-2.0
9 * Unless required by applicable law or agreed to in writing, software distributed under the License
10 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
11 * or implied. See the License for the specific language governing permissions and limitations under
13 ******************************************************************************/
14 package org.onap.msb.apiroute.wrapper.queue;
16 import java.io.IOException;
17 import java.util.ArrayList;
18 import java.util.HashSet;
19 import java.util.List;
22 import org.apache.http.HttpEntity;
23 import org.onap.msb.apiroute.SyncDataManager;
24 import org.onap.msb.apiroute.wrapper.MicroServiceWrapper;
25 import org.onap.msb.apiroute.wrapper.util.CommonUtil;
26 import org.onap.msb.apiroute.wrapper.util.ServiceFilter;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
30 import com.fasterxml.jackson.core.JsonFactory;
31 import com.fasterxml.jackson.core.JsonParser;
32 import com.fasterxml.jackson.core.JsonToken;
35 public class ServiceListConsumer implements Runnable {
37 private static final Logger LOGGER = LoggerFactory.getLogger(ServiceListConsumer.class);
39 private boolean isRunning = true;
44 public ServiceListConsumer() {
49 LOGGER.info("run ServiceList Consumer Thread [" + index + "]");
54 ServiceData<HttpEntity> serviceData = QueueManager.getInstance().takeFromServiceListQueue(index);
55 LOGGER.debug("ServiceList Consumer Thread [" + index
56 + "] take out serviceData from Queue successfully");
58 HttpEntity newValues = serviceData.getData();
60 Set<String> newServiceNameList = filterServiceList(newValues);
62 if (ServiceListCache.getLatestServiceNamelist().size() == 0) {
63 boolean initSuccess = initServiceList(newServiceNameList);
65 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
68 updateServiceList(newServiceNameList);
69 ServiceListCache.setLatestServiceNamelist(newServiceNameList);
73 } catch (Exception e) {
74 LOGGER.error("ServiceListConsumer throw Exception: ", e);
79 private void startWatchService(String serviceName) {
80 // start to Watch service nodes
82 SyncDataManager.startWatchService(serviceName);
85 private void updateServiceList(Set<String> newServiceNameList) {
86 Set<String> registerServiceNameList =
87 CommonUtil.getDiffrent(ServiceListCache.getLatestServiceNamelist(), newServiceNameList);
89 if (registerServiceNameList.size() > 0) {
90 LOGGER.info("***need to start Watch Service num from consul :" + registerServiceNameList.size());
92 for (String serviceName : registerServiceNameList) {
93 startWatchService(serviceName);
98 private boolean initServiceList(Set<String> newServiceNameList) {
99 LOGGER.info("***start to initialize service List when System startup ***");
101 Set<String> dbServiceNameList = MicroServiceWrapper.getInstance().getAllMicroServiceKey();
103 if (dbServiceNameList == null) {
104 LOGGER.error("init ServiceList from redis fail ");
110 Set<String> delServiceNameList = CommonUtil.getDiffrent(newServiceNameList, dbServiceNameList);
112 LOGGER.info("***need to delete Service num from redis :" + delServiceNameList.size());
113 for (String serviceName : delServiceNameList) {
115 MicroServiceWrapper.getInstance().deleteMicroService4AllVersion(serviceName);
116 LOGGER.info("delete MicroService success from initialize:[serviceName]" + serviceName);
118 } catch (Exception e) {
119 LOGGER.error("initialize serviceList :Delete MicroServiceInfo serviceName:" + serviceName + " FAIL : ",
125 LOGGER.info("***need to start Watch Service num from initialize :" + newServiceNameList.size());
127 for (String serviceName : newServiceNameList) {
128 startWatchService(serviceName);
136 * private ImmutableSet<String> filterServiceList( final Map<String, List<String>> serviceList)
137 * { if (serviceList == null || serviceList.isEmpty()) { return ImmutableSet.of(); }
139 * final ImmutableSet.Builder<String> builder = ImmutableSet.builder();
141 * for (Map.Entry<String, List<String>> entry : serviceList.entrySet()) {
143 * String key = entry.getKey(); if (key != null && !"consul".equals(key)) {
145 * List<String> value = entry.getValue(); if
146 * (ServiceFilter.getInstance().isFilterService(value)) { builder.add(key); } } }
148 * LOGGER.info("consul all service num:" + serviceList.size());
149 * LOGGER.info("consul filter service num:" + builder.build().size());
151 * return builder.build(); }
153 private Set<String> filterServiceList(final HttpEntity serviceList) {
155 if (serviceList == null || serviceList.getContentLength() == 0) {
156 return new HashSet<String>();
159 final Set<String> builder = new HashSet<String>();
161 JsonFactory f = new JsonFactory();
162 JsonParser jp = null;
163 List<String> tagList = null;
164 int inputServiceNum = 0;
166 jp = f.createParser(serviceList.getContent());
168 while (jp.nextToken() != JsonToken.END_OBJECT) {
169 String serviceName = jp.getCurrentName();
172 tagList = new ArrayList<>();
173 while (jp.nextToken() != JsonToken.END_ARRAY) {
174 tagList.add(jp.getText());
177 if (serviceName != null && !"consul".equals(serviceName)) {
178 if (ServiceFilter.getInstance().isFilterService(tagList)) {
179 builder.add(serviceName);
183 } catch (IOException e) {
184 LOGGER.warn("parse service list error", e);
185 return new HashSet<String>();
189 } catch (IOException e) {
190 LOGGER.warn("parse service list error", e);
191 return new HashSet<String>();
195 int latestServiceNum = ServiceListCache.getLatestServiceNamelist().size();
196 // if(latestServiceNum!=builder.size()){
197 LOGGER.info("[consul] all service num:" + inputServiceNum + ", filter service num: new——" + builder.size()
198 + " old——" + latestServiceNum);