2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright (C) 2017 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ============LICENSE_END=========================================================
21 package org.onap.dmaap.dbcapi.service;
23 import java.util.ArrayList;
24 import java.util.List;
35 //import org.openecomp.dmaapbc.aaf.AndrewDecryptor;
36 import org.onap.dmaap.dbcapi.aaf.AafDecrypt;
37 import org.onap.dmaap.dbcapi.client.MrTopicConnection;
38 import org.onap.dmaap.dbcapi.database.DatabaseClass;
39 import org.onap.dmaap.dbcapi.logging.BaseLoggingClass;
40 import org.onap.dmaap.dbcapi.logging.DmaapbcLogMessageEnum;
41 import org.onap.dmaap.dbcapi.model.ApiError;
42 import org.onap.dmaap.dbcapi.model.MR_Cluster;
43 import org.onap.dmaap.dbcapi.model.MirrorMaker;
44 import org.onap.dmaap.dbcapi.model.DmaapObject.DmaapObject_Status;
45 import org.onap.dmaap.dbcapi.util.DmaapConfig;
46 import org.onap.dmaap.dbcapi.util.RandomInteger;
48 public class MirrorMakerService extends BaseLoggingClass {
// Registry of MirrorMaker objects obtained from DatabaseClass.getMirrorMakers();
// updateMirrorMaker() stores entries under mm.getMmName().
50 private Map<String, MirrorMaker> mirrors = DatabaseClass.getMirrorMakers();
// Topic connection used to post messages. Static, yet replaced with a new
// MrTopicConnection on every updateMirrorMaker() call.
51 private static MrTopicConnection prov;
// Used to decrypt the configured "MM.ProvUserPwd" value. Static, yet replaced
// with a new AafDecrypt each time the constructor runs.
52 private static AafDecrypt decryptor;
// Constructs the service and assigns a fresh AafDecrypt to the static decryptor field.
// NOTE(review): original lines 55-56 and the closing brace are not visible in this
// listing; confirm nothing else happens in the constructor.
54 public MirrorMakerService() {
57 decryptor = new AafDecrypt();
// Publishes the given MirrorMaker to the bridge admin topic and stores it in the
// mirrors map under mm.getMmName().
//
// Steps visible here:
//   1. read the provisioning user, decrypted password, Kafka ports and central cname
//      from DmaapConfig (defaults: "notSet", "9092", "2181", "notSet");
//   2. replace the static prov connection with a new MrTopicConnection;
//   3. for a central cluster, post mm.createMirrorMaker(...) and then mm.updateWhiteList(),
//      setting mm's status to INVALID on a non-2xx response and VALID otherwise.
//
// Returns whatever mirrors.put(...) returns.
// NOTE(review): if mirrors is a java.util.Map, put() yields the PREVIOUS value for the
// key (null on first insert), not mm itself — confirm callers expect that.
// NOTE(review): brace-only / else / break lines are not visible in this listing, so the
// exact nesting of the status branches below must be checked against the real file.
60 // will create a MM on MMagent if needed
61 // will update the MMagent whitelist with all topics for this MM
62 public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
63 logger.info( "updateMirrorMaker");
64 DmaapConfig p = (DmaapConfig)DmaapConfig.getConfig();
65 String provUser = p.getProperty("MM.ProvUserMechId");
// The stored password is passed through decryptor.decrypt() before use.
66 String provUserPwd = decryptor.decrypt(p.getProperty( "MM.ProvUserPwd", "notSet" ));
67 String defaultProducerPort = p.getProperty( "MM.KafkaProducerPort", "9092");
68 String defaultConsumerPort = p.getProperty( "MM.KafkaConsumerPort", "2181");
// A new connection is built on every call and assigned to the static field.
70 prov = new MrTopicConnection( provUser, provUserPwd );
72 String centralFqdn = p.getProperty("MR.CentralCname", "notSet");
74 DmaapService dmaap = new DmaapService();
75 MR_ClusterService clusters = new MR_ClusterService();
77 // in 1610, MM should only exist for edge-to-central
78 // we use a cname for the central MR cluster that is active, and provision on agent topic on that target
79 // but only send 1 message so MM Agents can read it relying on kafka delivery
80 for( MR_Cluster central: clusters.getCentralClusters() ) {
// First message: the create-MirrorMaker request (consumer port first, then producer port).
81 prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
82 ApiError resp = prov.doPostMessage(mm.createMirrorMaker( defaultConsumerPort, defaultProducerPort ));
83 if ( ! resp.is2xx() ) {
85 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR, "create MM", Integer.toString(resp.getCode()), resp.getMessage());
86 mm.setStatus(DmaapObject_Status.INVALID);
// Second message: the whitelist update.
// NOTE(review): presumably this runs only when the create succeeded (an else is likely
// missing from the listing between original lines 86 and 88) — confirm.
88 prov.makeTopicConnection(central, dmaap.getBridgeAdminFqtn(), centralFqdn );
89 resp = prov.doPostMessage(mm.updateWhiteList());
90 if ( ! resp.is2xx()) {
91 errorLogger.error( DmaapbcLogMessageEnum.MM_PUBLISH_ERROR,"MR Bridge", Integer.toString(resp.getCode()), resp.getMessage());
92 mm.setStatus(DmaapObject_Status.INVALID);
// NOTE(review): presumably the else-branch of the check above — confirm.
94 mm.setStatus(DmaapObject_Status.VALID);
// NOTE(review): the comment below implies the loop exits after its first iteration;
// the statement doing so is not visible in this listing.
98 // we only want to send one message even if there are multiple central clusters
106 return mirrors.put( mm.getMmName(), mm);
108 public MirrorMaker getMirrorMaker( String part1, String part2 ) {
109 logger.info( "getMirrorMaker using " + part1 + " and " + part2 );
110 return mirrors.get(MirrorMaker.genKey(part1, part2));
112 public MirrorMaker getMirrorMaker( String key ) {
113 logger.info( "getMirrorMaker using " + key);
114 return mirrors.get(key);
117 /*public MirrorMaker updateMirrorMaker( MirrorMaker mm ) {
118 logger.info( "updateMirrorMaker");
119 return mirrors.put( mm.getMmName(), mm);
123 public void delMirrorMaker( MirrorMaker mm ) {
124 logger.info( "delMirrorMaker");
125 mirrors.remove(mm.getMmName());
128 // TODO: this should probably return sequential values or get replaced by the MM client API
129 // but it should be sufficient for initial 1610 development
130 public static String genTransactionId() {
131 RandomInteger ri = new RandomInteger(100000);
132 int randomInt = ri.next();
133 return Integer.toString(randomInt);
135 public List<String> getAllMirrorMakers() {
136 List<String> ret = new ArrayList<String>();
137 for( String key: mirrors.keySet()) {