2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.dbcapi.service;
23 import java.util.ArrayList;
24 import java.util.List;
35 //import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
36 import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
37 import org.onap.dmaap.dbcapi.client.MrTopicConnection;
38 import org.onap.dmaap.dbcapi.database.DatabaseClass;
39 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
40 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
41 import org.onap.dmaap.dbcapi.model.ApiError;
42 import org.onap.dmaap.dbcapi.model.MR_Cluster;
43 import org.onap.dmaap.dbcapi.model.MirrorMaker;
44 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
45 import org.onap.dmaap.dbcapi.util.DmaapConfig;
46 import org.onap.dmaap.dbcapi.util.RandomInteger;
// Service that keeps track of MirrorMaker objects and publishes their
// configuration through an MrTopicConnection.
// logger / errorLogger used by the methods are not declared in this file;
// presumably they are inherited from BaseLoggingClass - confirm.
48 public class MirrorMakerService extends BaseLoggingClass {
// MirrorMaker objects obtained from DatabaseClass, stored and looked up by a
// String key (mm.getMmName() on write, MirrorMaker.genKey(...) or a raw key on read).
50 private Map<String, MirrorMaker> mirrors = DatabaseClass.getMirrorMakers();
// Connection used to post provisioning messages. Static, but replaced with a
// new instance on every updateMirrorMaker() call.
51 private static MrTopicConnection prov;
// Used to decrypt the provisioning password. Static, but replaced with a new
// instance every time the constructor runs.
52 private static AafDecrypt decryptor;
// Property name for the provisioning user id (read into provUser).
54 static final String PROV_USER_PROPERTY = "MM.ProvUserMechId";
// Property name for the (encrypted) provisioning password.
55 static final String PROV_PWD_PROPERTY = "MM.ProvUserPwd";
// Value used when the password property is absent; note it is still passed
// through the decryptor by the constructor.
56 static final String PROV_PWD_DEFAULT = "pwdNotSet";
// Property name / fallback for the port stored in defaultProducerPort.
// NOTE(review): 9092 looks like the conventional Kafka broker port - confirm.
57 static final String SOURCE_REPLICATION_PORT_PROPERTY = "MR.SourceReplicationPort";
58 static final String SOURCE_REPLICATION_PORT_DEFAULT = "9092";
// Property name / fallback for the port stored in defaultConsumerPort.
// NOTE(review): 2181 looks like the conventional ZooKeeper port - confirm.
59 static final String TARGET_REPLICATION_PORT_PROPERTY = "MR.TargetReplicationPort";
60 static final String TARGET_REPLICATION_PORT_DEFAULT = "2181";
// The five static fields below are shared by all instances and are
// (re)assigned by the constructor and by the static setters.
62 private static String provUser;
63 private static String provUserPwd;
64 private static String defaultProducerPort;
65 private static String defaultConsumerPort;
// Read from property "MR.CentralCname"; used as the override argument when
// connecting to a central cluster in updateMirrorMaker().
66 private static String centralFqdn;
// Per-instance limit on topics per MirrorMaker (property "MaxTopicsPerMM").
67 private int maxTopicsPerMM;
// Per-instance flag read from property "MirrorMakerPerMR".
68 private boolean mmPerMR;
// Loads provisioning credentials, replication ports and MirrorMaker limits
// from DmaapConfig.
// Note: most of the fields assigned here are static, so constructing a new
// instance overwrites the values seen by every other instance (and any value
// previously installed through the static setters).
70 public MirrorMakerService() {
72 decryptor = new AafDecrypt();
73 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
// No default is supplied for the user id.
// NOTE(review): presumably provUser is null when the property is absent - confirm.
74 provUser = p.getProperty(PROV_USER_PROPERTY);
// The stored password is decrypted; when the property is absent the
// PROV_PWD_DEFAULT placeholder is what gets handed to decrypt().
75 provUserPwd = decryptor.decrypt(p.getProperty( PROV_PWD_PROPERTY, PROV_PWD_DEFAULT ));
76 defaultProducerPort = p.getProperty( SOURCE_REPLICATION_PORT_PROPERTY, SOURCE_REPLICATION_PORT_DEFAULT );
77 defaultConsumerPort = p.getProperty( TARGET_REPLICATION_PORT_PROPERTY, TARGET_REPLICATION_PORT_DEFAULT );
78 centralFqdn = p.getProperty("MR.CentralCname", "notSet");
// Integer.valueOf throws NumberFormatException if the configured value is
// not a valid integer; the default is 5.
79 maxTopicsPerMM = Integer.valueOf( p.getProperty( "MaxTopicsPerMM", "5"));
// Any value other than (case-insensitive) "true" yields false; default is true.
80 mmPerMR = "true".equalsIgnoreCase(p.getProperty("MirrorMakerPerMR", "true"));
83 // will create a MM on MMagent if needed
84 // will update the MMagent whitelist with all topics for this MM
//
// Visible steps: open a connection with the provisioning credentials, choose a
// target cluster, post the "create MirrorMaker" message, post the whitelist
// update, set mm's status from the responses, and store mm in the mirrors map.
//
// NOTE(review): several control-flow lines (if/else, loop exit, closing
// braces) are not visible in this listing, so the exact branch structure
// cannot be confirmed from here.
//
// mm: the MirrorMaker to provision; its status is modified in place.
// Returns whatever mirrors.put(...) returns.
// NOTE(review): if mirrors is a java.util.Map, that is the value PREVIOUSLY
// stored under mm.getMmName() (null on first insert), not mm itself - confirm
// this is what callers expect.
85 public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
86 logger.info( "updateMirrorMaker");
// A new connection object replaces the shared static prov on every call.
88 prov = new MrTopicConnection( provUser, provUserPwd );
90 DmaapService dmaap = new DmaapService();
91 MR_ClusterService clusters = new MR_ClusterService();
92 MR_Cluster target_cluster = null;
93 String override = null;
// Two ways of picking the target follow. The condition choosing between them
// is not visible; presumably it is the mmPerMR flag - confirm.
96 // in ECOMP, MM Agent is only deployed at central, so this case is needed for backwards compatibility
97 // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
98 // but only send 1 message so MM Agents can read it relying on kafka delivery
// If getCentralClusters() yields nothing, target_cluster stays null here.
99 for( MR_Cluster cluster: clusters.getCentralClusters() ) {
101 target_cluster = cluster;
102 override = centralFqdn;
103 // we only want to send one message even if there are multiple central clusters
108 // In ONAP deployment architecture, the MM Agent is deployed with each target MR
109 target_cluster = clusters.getMr_ClusterByFQDN(mm.getTargetCluster());
// Step 1: publish the create-MirrorMaker message on the bridge admin topic.
113 prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
114 ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
115 if ( ! resp.is2xx() ) {
// Publish failed: log the response code/message and mark mm INVALID.
117 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
118 mm.setStatus(DmaapObject_Status.INVALID);
// Step 2: publish the whitelist update over a fresh topic connection.
// NOTE(review): presumably this runs only when step 1 succeeded (else branch
// not visible) - confirm.
120 prov.makeTopicConnection(target_cluster, dmaap.getBridgeAdminFqtn(), override );
121 resp = prov.doPostMessage(mm.getWhitelistUpdateJSON());
122 if ( ! resp.is2xx()) {
123 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
124 mm.setStatus(DmaapObject_Status.INVALID);
// NOTE(review): presumably reached only when both posts returned 2xx - confirm.
126 mm.setStatus(DmaapObject_Status.VALID);
// mm is stored under its name in the visible code path, including after an
// INVALID status has been set above.
131 return mirrors.put( mm.getMmName(), mm);
// Looks up a MirrorMaker by its two name parts plus a numeric index.
// The key is built with MirrorMaker.genKey(part1, targetPart), where
// targetPart is derived from part2 and index.
// Returns the result of mirrors.get(...) for that key.
// NOTE(review): the declaration of targetPart and the conditional around the
// assignment below are not visible in this listing; per the existing comment,
// index 0 presumably uses part2 unchanged - confirm.
133 public MirrorMaker getMirrorMaker( String part1, String part2, int index ) {
136 // original mm names did not have any index, so leave off index 0 for
137 // backwards compatibility
// Indexed form of the second key part: "<part2>_<index>".
141 targetPart = part2 + "_" + index;
143 logger.info( "getMirrorMaker using " + part1 + " and " + targetPart );
144 return mirrors.get(MirrorMaker.genKey(part1, targetPart));
// Looks up a MirrorMaker by its two name parts, with no index suffix.
// The key is MirrorMaker.genKey(part1, part2); returns the result of
// mirrors.get(...) for that key.
146 public MirrorMaker getMirrorMaker( String part1, String part2 ) {
147 logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
148 return mirrors.get(MirrorMaker.genKey(part1, part2));
// Looks up a MirrorMaker directly by an already-built map key.
// Returns the result of mirrors.get(key).
150 public MirrorMaker getMirrorMaker( String key ) {
151 logger.info( "getMirrorMaker using " + key);
152 return mirrors.get(key);
// Removes the entry stored under mm.getMmName() from the mirrors map.
// The visible lines only touch the map; no message is posted through prov here.
156 public void delMirrorMaker( MirrorMaker mm ) {
157 logger.info( "delMirrorMaker");
158 mirrors.remove(mm.getMmName());
161 // TODO: this should probably return sequential values or get replaced by the MM client API
162 // but it should be sufficient for initial 1610 development
//
// Produces a transaction id as the decimal string of one value drawn from a
// freshly constructed RandomInteger(100000).
// NOTE(review): 100000 is presumably an upper bound on the generated value;
// RandomInteger is declared elsewhere - confirm its range. Ids come from a
// random source, so uniqueness is not established by this code.
163 public static String genTransactionId() {
164 RandomInteger ri = new RandomInteger(100000);
165 int randomInt = ri.next();
166 return Integer.toString(randomInt);
// Builds a list of strings by iterating over the keys of the mirrors map.
// NOTE(review): the loop body and the return statement are not visible in this
// listing; presumably each key is added to ret and ret is returned - confirm.
168 public List<String> getAllMirrorMakers() {
169 List<String> ret = new ArrayList<String>();
170 for( String key: mirrors.keySet()) {
// Searches the indexed MirrorMakers for a (source, target) pair, using the
// topic fqtn and the maxTopicsPerMM limit to decide which one to use.
// NOTE(review): the declaration/increment of i, the branch bodies and the
// return statement are not visible in this listing; the notes below describe
// only what the visible lines do.
177 public MirrorMaker findNextMM( String source, String target, String fqtn ) {
179 MirrorMaker mm = null;
// Loop continues for as long as mm is null.
180 while( mm == null ) {
// Try the already-registered MirrorMaker at index i ...
182 mm = this.getMirrorMaker( source, target, i);
// ... or create a new one for that index.
// NOTE(review): presumably only when the lookup returned null - confirm.
184 mm = new MirrorMaker(source, target, i);
// Check whether this MirrorMaker already lists the requested topic.
186 if ( mm.getTopics().contains(fqtn) ) {
// Check whether this MirrorMaker has reached the per-MM topic limit.
189 if ( mm.getTopicCount() >= maxTopicsPerMM ) {
// The log text says "getNextMM" although this method is named findNextMM.
190 logger.info( "getNextMM: MM " + mm.getMmName() + " has " + mm.getTopicCount() + " topics. Moving to next MM");
// Trims orig's topic list down to at most maxTopicsPerMM entries, taking
// topics off the end one at a time and provisioning another MirrorMaker for
// the same source/target pair on each iteration.
// NOTE(review): original line 213 and the return statement are not visible in
// this listing; presumably the removed topic is added to mm before
// updateMirrorMaker(mm) is called - confirm.
200 public MirrorMaker splitMM( MirrorMaker orig ) {
202 String source = orig.getSourceCluster();
203 String target = orig.getTargetCluster();
206 ArrayList<String> whitelist = orig.getTopics();
207 while( whitelist.size() > maxTopicsPerMM ) {
// Detach the last topic from the list.
209 int last = whitelist.size() - 1;
210 String topic = whitelist.get(last);
211 whitelist.remove(last);
// The literal is a placeholder fqtn, chosen so that findNextMM's
// "already contains this topic" check does not match any real topic.
212 MirrorMaker mm = this.findNextMM( source, target, "aValueThatShouldNotMatchAnything" );
214 this.updateMirrorMaker(mm);
// Hand the shortened list back to orig.
217 orig.setTopics(whitelist);
// Static accessor for the provisioning user id.
// NOTE(review): the body is not visible in this listing; presumably it returns
// the static provUser field - confirm.
223 public static String getProvUser() {
// Replaces the static provisioning user id shared by all instances.
// The constructor also assigns this field, so a later "new MirrorMakerService()"
// overwrites the value set here.
227 public static void setProvUser(String provUser) {
228 MirrorMakerService.provUser = provUser;
// Static accessor for the provisioning password.
// NOTE(review): the body is not visible in this listing; presumably it returns
// the static provUserPwd field, which the constructor fills with the decrypted
// value - confirm.
231 public static String getProvUserPwd() {
// Replaces the static provisioning password shared by all instances.
// The argument is stored as given; no decryption is applied here (the
// constructor, by contrast, stores the output of decryptor.decrypt).
235 public static void setProvUserPwd(String provUserPwd) {
236 MirrorMakerService.provUserPwd = provUserPwd;
// Returns the static defaultProducerPort value (set by the constructor from
// the source replication port property, or by setDefaultProducerPort).
239 public static String getDefaultProducerPort() {
240 return defaultProducerPort;
// Replaces the static defaultProducerPort shared by all instances.
// The value is stored as given; no validation is performed here.
243 public static void setDefaultProducerPort(String defaultProducerPort) {
244 MirrorMakerService.defaultProducerPort = defaultProducerPort;
// Returns the static defaultConsumerPort value (set by the constructor from
// the target replication port property, or by setDefaultConsumerPort).
247 public static String getDefaultConsumerPort() {
248 return defaultConsumerPort;
// Replaces the static defaultConsumerPort shared by all instances.
// The value is stored as given; no validation is performed here.
251 public static void setDefaultConsumerPort(String defaultConsumerPort) {
252 MirrorMakerService.defaultConsumerPort = defaultConsumerPort;