2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
7 * ================================================================================
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
12 * http://www.apache.org/licenses/LICENSE-2.0
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 * ============LICENSE_END=========================================================
22 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
24 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PollingConsumer;
25 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
30 * java.net-based client for building message router consumers
32 public class PollingConsumerImpl implements PollingConsumer {
34 // RunnableConsumer is a private inner class so that run() cannot be called from other code
35 private class RunnableConsumer extends AbstractBaseConsumer implements Runnable, PollingConsumer {
36 private final Logger LOG = LoggerFactory.getLogger(PollingConsumerImpl.class);
37 private volatile Thread t;
38 private final Integer fetchPause;
40 public RunnableConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
41 super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
42 this.fetchPause = fetchPause;
48 LOG.info("ConsumerImpl started. Fetch period is {} ms.", fetchPause);
53 LOG.info("ConsumerImpl stopped.");
58 if (this.url != null) {
59 Thread thisThread = Thread.currentThread();
60 while (t == thisThread) {
63 LOG.trace("Next fetch from MessageRouter url {} after {} milliseconds.", url, fetchPause);
64 Thread.sleep(fetchPause);
65 } catch (InterruptedException e) {
66 LOG.warn("Thread sleep was interrupted.", e);
70 LOG.error("URL is null, can't listen for messages");
75 public void close() throws Exception {
80 private RunnableConsumer c;
82 public PollingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
83 c = new RunnableConsumer(username, password, host, authentication, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);
92 public void registerHandler(String topic, RequestHandler requestHandler) {
93 c.registerHandler(topic, requestHandler);
97 public void close() throws Exception {