263e94ca9dc72c437974e19b309972e0edc04dad
[ccsdk/sli.git] /
1 /*-
2  * ============LICENSE_START=======================================================
3  * openECOMP : SDN-C
4  * ================================================================================
5  * Copyright (C) 2017 - 2018 AT&T Intellectual Property. All rights
6  *                      reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21
22 package org.onap.ccsdk.sli.adaptors.messagerouter.consumer.provider.impl;
23
24 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.PollingConsumer;
25 import org.onap.ccsdk.sli.adaptors.messagerouter.consumer.api.RequestHandler;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29 /*
30  * java.net based client to build message router consumers
31  */
32 public class PollingConsumerImpl implements PollingConsumer {
33
34     //RunnableConsumer is a private inner class so run cannot be called from other code
35     private class RunnableConsumer extends AbstractBaseConsumer implements Runnable, PollingConsumer {
36         private final Logger LOG = LoggerFactory.getLogger(PollingConsumerImpl.class);
37         private volatile Thread t;
38         private final Integer fetchPause;
39
40         public RunnableConsumer(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
41             super(username, password, host, authentication, connectTimeout, readTimeout, group, id, filter, limit, timeoutQueryParamValue);
42             this.fetchPause = fetchPause;
43         }
44
45         public void start() {
46             t = new Thread(this);
47             t.start();
48             LOG.info("ConsumerImpl started. Fetch period is {} ms.", fetchPause);
49         }
50
51         public void stop() {
52             t = null;
53             LOG.info("ConsumerImpl stopped.");
54         }
55
56         @Override
57         public void run() {
58             if (this.url != null) {
59                 Thread thisThread = Thread.currentThread();
60                 while (t == thisThread) {
61                     poll();
62                     try {
63                         LOG.trace("Next fetch from MessageRouter url {} after {} milliseconds.", url, fetchPause);
64                         Thread.sleep(fetchPause);
65                     } catch (InterruptedException e) {
66                         LOG.warn("Thread sleep was interrupted.", e);
67                     }
68                 }
69             } else {
70                 LOG.error("URL is null, can't listen for messages");
71             }
72         }
73
74         @Override
75         public void close() throws Exception {
76             stop();
77         }
78     }
79
80     private RunnableConsumer c;
81
82     public PollingConsumerImpl(String username, String password, String host, String authentication, Integer connectTimeout, Integer readTimeout, Integer fetchPause, String group, String id, String filter, Integer limit, Integer timeoutQueryParamValue) {
83         c = new RunnableConsumer(username, password, host, authentication, connectTimeout, readTimeout, fetchPause, group, id, filter, limit, timeoutQueryParamValue);
84     }
85
86     @Override
87     public void start() {
88         c.start();
89     }
90
91     @Override
92     public void registerHandler(String topic, RequestHandler requestHandler) {
93         c.registerHandler(topic, requestHandler);
94     }
95
96     @Override
97     public void close() throws Exception {
98         c.close();
99     }
100 }