Remove dead code from VESCollector
[dcaegen2/collectors/ves.git] / src / main / java / org / onap / dcae / commonFunction / DmaapPublishers.java
1 /*-
2  * ============LICENSE_START=======================================================
3  * org.onap.dcaegen2.collectors.ves
4  * ================================================================================
5  * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6  * Copyright (C) 2018 Nokia. All rights reserved.
7  * ================================================================================
8  * Licensed under the Apache License, Version 2.0 (the "License");
9  * you may not use this file except in compliance with the License.
10  * You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  * Unless required by applicable law or agreed to in writing, software
15  * distributed under the License is distributed on an "AS IS" BASIS,
16  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  * See the License for the specific language governing permissions and
18  * limitations under the License.
19  * ============LICENSE_END=========================================================
20  */
21 package org.onap.dcae.commonFunction;
22
23 import com.att.nsa.cambria.client.CambriaBatchingPublisher;
24 import com.google.common.cache.CacheBuilder;
25 import com.google.common.cache.CacheLoader;
26 import com.google.common.cache.LoadingCache;
27 import com.google.common.cache.RemovalListener;
28 import java.io.IOException;
29 import java.net.MalformedURLException;
30 import java.security.GeneralSecurityException;
31 import java.util.List;
32 import java.util.concurrent.TimeUnit;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 class DmaapPublishers {
37
38     private static final Logger log = LoggerFactory.getLogger(DmaapPublishers.class);
39     private final LoadingCache<String, CambriaBatchingPublisher> publishers;
40
41     private DmaapPublishers(
42             LoadingCache<String, CambriaBatchingPublisher> publishers) {
43         this.publishers = publishers;
44     }
45
46     static DmaapPublishers create() {
47         return create(new CambriaPublisherFactory());
48     }
49
50     static DmaapPublishers create(final CambriaPublisherFactory publisherFactory) {
51         final LoadingCache<String, CambriaBatchingPublisher> cache = CacheBuilder.<String, CambriaBatchingPublisher>newBuilder()
52                 .removalListener((RemovalListener<String, CambriaBatchingPublisher>) notification -> {
53                     if (notification.getValue() != null) {
54                         onCacheItemInvalidated(notification.getValue());
55                     }
56                 })
57                 .build(new CacheLoader<String, CambriaBatchingPublisher>() {
58                     @Override
59                     public CambriaBatchingPublisher load(String streamId)
60                             throws MalformedURLException, GeneralSecurityException {
61                         try {
62                             return publisherFactory.createCambriaPublisher(streamId);
63                         } catch (MalformedURLException | GeneralSecurityException e) {
64                             log.error("CambriaClientBuilders connection reader exception : streamID - " + streamId + " "
65                                     + e.getMessage());
66                             throw e;
67                         }
68                     }
69                 });
70         return new DmaapPublishers(cache);
71     }
72
73     CambriaBatchingPublisher getByStreamId(String streamId) {
74         return publishers.getUnchecked(streamId);
75     }
76
77     void closeByStreamId(String streamId) {
78         publishers.invalidate(streamId);
79     }
80
81     private static void onCacheItemInvalidated(CambriaBatchingPublisher pub) {
82         try {
83             final List<?> stuck = pub.close(20, TimeUnit.SECONDS);
84             if (!stuck.isEmpty()) {
85                 log.error(stuck.size() + " messages unsent");
86             }
87         } catch (InterruptedException | IOException e) {
88             log.error("Caught Exception on Close event: {}", e);
89         }
90     }
91 }