1 /*******************************************************************************
2 * Copyright 2016-2017 ZTE, Inc. and others.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 ******************************************************************************/
16 package org.onap.msb.apiroute.wrapper.consulextend.expose;
18 import java.util.concurrent.CopyOnWriteArrayList;
20 import org.onap.msb.apiroute.wrapper.consulextend.cache.ConsulCache;
21 import org.slf4j.Logger;
22 import org.slf4j.LoggerFactory;
24 import com.orbitz.consul.model.ConsulResponse;
/**
 * Abstract base class for a watch over values of type {@code T}.
 *
 * <p>Keeps one list of {@link Filter}s and one list of {@link Handler}s. The nested
 * {@code InternalListener} iterates both lists when it is notified with a
 * {@code ConsulResponse<T>}. Subclasses supply {@code startWatch()} and {@code stopWatch()}.
 *
 * @param <T> payload type of the {@code ConsulResponse} objects passed to filters and handlers
 */
26 public abstract class WatchTask<T> {
// Filters registered through addFilter(...); iterated by InternalListener.notify.
27 private final CopyOnWriteArrayList<Filter<T>> filters = new CopyOnWriteArrayList<Filter<T>>();
// Handlers registered through addHandler(...); iterated by InternalListener.notify.
28 private final CopyOnWriteArrayList<Handler<T>> handlers = new CopyOnWriteArrayList<Handler<T>>();
// SLF4J logger shared by all WatchTask instances.
29 private final static Logger LOGGER = LoggerFactory
30 .getLogger(WatchTask.class);
/**
 * Starts the watch. Implemented by subclasses.
 *
 * @return a boolean status; its exact meaning is defined by the implementing
 *         subclass (presumably {@code true} on success - TODO confirm)
 */
33 public abstract boolean startWatch();
/**
 * Stops the watch. Implemented by subclasses.
 *
 * @return a boolean status; its exact meaning is defined by the implementing
 *         subclass (presumably {@code true} on success - TODO confirm)
 */
36 public abstract boolean stopWatch();
/**
 * Boolean test applied to a {@code ConsulResponse<T>}.
 *
 * <p>{@code InternalListener.notify} evaluates {@code !f.filter(newValues)} for each
 * registered filter, so a {@code false} result triggers that branch. The branch body
 * is not visible here; presumably it stops processing - TODO confirm.
 *
 * @param <T> payload type of the response being tested
 */
39 public interface Filter<T> {
/**
 * Tests the given response.
 *
 * @param object the response to test
 * @return the filter's verdict for {@code object}
 */
40 public boolean filter(final ConsulResponse<T> object);
/**
 * Registers a filter by adding it to the filter list.
 *
 * <p>NOTE(review): the rest of this method, including its return statement, is not
 * visible here; presumably it returns {@code added} - confirm.
 *
 * @param filter the filter to register
 * @return a boolean; presumably the result of the list {@code add} call - TODO confirm
 */
43 public boolean addFilter(Filter<T> filter) {
// 'added' holds the result of the list add call.
44 boolean added = filters.add(filter);
/**
 * Presumably removes every registered filter.
 *
 * <p>NOTE(review): the method body is not visible here - confirm against the full file.
 */
48 public void removeAllFilter() {
/**
 * Returns a list of filters, typed as {@code CopyOnWriteArrayList<Filter<T>>}.
 *
 * <p>NOTE(review): the method body is not visible here. If it returns the internal
 * {@code filters} field directly, callers can mutate this task's filter list - confirm.
 *
 * @return a list of filters; presumably the registered ones - TODO confirm
 */
53 public final CopyOnWriteArrayList<Filter<T>> getAllFilters(){
/**
 * Callback that receives a {@code ConsulResponse<T>}.
 *
 * <p>{@code InternalListener.notify} iterates the registered handlers after the filter
 * loop. The loop body is not visible here; presumably it calls {@link #handle} on each
 * handler - TODO confirm.
 *
 * @param <T> payload type of the response being handled
 */
58 public interface Handler<T> {
/**
 * Processes the given response.
 *
 * @param object the response to process
 */
59 void handle(final ConsulResponse<T> object);
/**
 * Registers a handler by adding it to the handler list.
 *
 * <p>NOTE(review): the rest of this method, including its return statement, is not
 * visible here; presumably it returns {@code added} - confirm.
 *
 * @param handler the handler to register
 * @return a boolean; presumably the result of the list {@code add} call - TODO confirm
 */
62 public boolean addHandler(Handler<T> handler) {
// 'added' holds the result of the list add call.
63 boolean added = handlers.add(handler);
/**
 * Presumably removes every registered handler.
 *
 * <p>NOTE(review): the method body is not visible here - confirm against the full file.
 */
67 public void removeAllHandler() {
/**
 * {@code ConsulCache.Listener} implementation that iterates this task's filters and
 * handlers for each notification and logs when a notification takes longer than
 * 10 seconds to process.
 *
 * <p>It is a non-static inner class, so it reads the enclosing task's {@code filters}
 * and {@code handlers} fields directly.
 */
72 protected class InternalListener implements ConsulCache.Listener<T> {
/**
 * Processes one notification from the cache.
 *
 * <p>NOTE(review): several lines of this method are not visible here, namely the body
 * of the filter branch, the body of the handler loop and the closing braces. The
 * comments below describe only what the visible lines establish.
 *
 * @param newValues the response delivered by the cache
 */
74 public void notify(ConsulResponse<T> newValues) {
// Wall-clock start, used only for the slow-processing check at the end.
// NOTE(review): System.nanoTime() is the monotonic choice for elapsed time.
76 long startTime = System.currentTimeMillis();
// Run every registered filter against the response.
79 for (Filter<T> f : filters) {
// A filter returning false enters this branch (body not visible; presumably
// it aborts the notification - TODO confirm).
81 if (!f.filter(newValues)) {
// Then iterate every registered handler (loop body not visible; presumably it
// calls h.handle(newValues) - TODO confirm).
87 for (Handler<T> h : handlers) {
91 long endTime = System.currentTimeMillis();
// Flag notifications that took more than 10 seconds (10*1000 ms).
93 if(endTime-startTime > 10*1000)
// NOTE(review): the message spells "THEAD"; likely a typo for "THREAD". It is
// runtime text, so it is left unchanged here.
95 LOGGER.info("WatchTask THEAD WORK TIMEOUT");