2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017-2018 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.policy.common.endpoints.event.comm.bus.internal;
23 import java.util.List;
26 import org.onap.policy.common.endpoints.event.comm.Topic;
27 import org.onap.policy.common.endpoints.event.comm.bus.DmaapTopicSink;
28 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.CambriaPublisherWrapper;
29 import org.onap.policy.common.endpoints.event.comm.bus.internal.impl.DmaapDmePublisherWrapper;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
34 * This implementation publishes events for the associated DMAAP topic, inline with the calling
37 public class InlineDmaapTopicSink extends InlineBusTopicSink implements DmaapTopicSink {
39 protected static Logger logger = LoggerFactory.getLogger(InlineDmaapTopicSink.class);
41 protected final String userName;
42 protected final String password;
44 protected String environment = null;
45 protected String aftEnvironment = null;
46 protected String partner = null;
47 protected String latitude = null;
48 protected String longitude = null;
50 protected Map<String, String> additionalProps = null;
54 * @param servers DMaaP servers
55 * @param topic DMaaP Topic to be monitored
56 * @param apiKey DMaaP API Key (optional)
57 * @param apiSecret DMaaP API Secret (optional)
58 * @param consumerGroup DMaaP Reader Consumer Group
59 * @param consumerInstance DMaaP Reader Instance
60 * @param fetchTimeout DMaaP fetch timeout
61 * @param fetchLimit DMaaP fetch limit
62 * @param environment DME2 Environment
63 * @param aftEnvironment DME2 AFT Environment
64 * @param partner DME2 Partner
65 * @param latitude DME2 Latitude
66 * @param longitude DME2 Longitude
67 * @param additionalProps Additional properties to pass to DME2
68 * @param useHttps does connection use HTTPS?
69 * @param allowSelfSignedCerts are self-signed certificates allow
71 * @throws IllegalArgumentException An invalid parameter passed in
73 public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
74 String password, String partitionKey, String environment, String aftEnvironment, String partner,
75 String latitude, String longitude, Map<String, String> additionalProps, boolean useHttps,
76 boolean allowSelfSignedCerts) {
78 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
80 this.userName = userName;
81 this.password = password;
83 this.environment = environment;
84 this.aftEnvironment = aftEnvironment;
85 this.partner = partner;
87 this.latitude = latitude;
88 this.longitude = longitude;
90 this.additionalProps = additionalProps;
93 public InlineDmaapTopicSink(List<String> servers, String topic, String apiKey, String apiSecret, String userName,
94 String password, String partitionKey, boolean useHttps, boolean allowSelfSignedCerts) {
96 super(servers, topic, apiKey, apiSecret, partitionKey, useHttps, allowSelfSignedCerts);
98 this.userName = userName;
99 this.password = password;
105 if (allNullOrEmpty(this.environment, this.aftEnvironment, this.latitude, this.longitude, this.partner)) {
106 this.publisher = new CambriaPublisherWrapper(this.servers, this.topic, this.apiKey, this.apiSecret,
107 this.userName, this.password, this.useHttps, this.allowSelfSignedCerts);
109 this.publisher = new DmaapDmePublisherWrapper(this.servers, this.topic, this.userName, this.password,
110 this.environment, this.aftEnvironment, this.partner, this.latitude, this.longitude,
111 this.additionalProps, this.useHttps);
114 logger.info("{}: DMAAP SINK created", this);
121 public CommInfrastructure getTopicCommInfrastructure() {
122 return Topic.CommInfrastructure.DMAAP;
127 public String toString() {
128 return "InlineDmaapTopicSink [userName=" + userName + ", password=" + password
129 + ", getTopicCommInfrastructure()=" + getTopicCommInfrastructure() + ", toString()=" + super.toString()