Support Dmaap Message Util 49/134049/1
authorChuanyuChen <chenchuanyu@huawei.com>
Mon, 10 Apr 2023 02:37:47 +0000 (10:37 +0800)
committerChuanyuChen <chenchuanyu@huawei.com>
Mon, 10 Apr 2023 02:37:47 +0000 (10:37 +0800)
Support Dmaap Message Util
Issue-ID: USECASEUI-794

Signed-off-by: ChuanyuChen <chenchuanyu@huawei.com>
Change-Id: If54a17f72370667444fc87129d8fdc0958be8692

intentanalysis/pom.xml
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java [new file with mode: 0644]
intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java [new file with mode: 0644]
intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json [new file with mode: 0644]
intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json [new file with mode: 0644]

index ac21f4d..84f93aa 100644 (file)
             <artifactId>commons-io</artifactId>
             <version>2.7</version>
         </dependency>
+        <dependency>
+            <groupId>org.onap.dmaap.messagerouter.dmaapclient</groupId>
+            <artifactId>dmaapClient</artifactId>
+            <version>1.1.12</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>apache-log4j-extras</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.onap.dcaegen2.services.sdk.rest.services</groupId>
+            <artifactId>dmaap-client</artifactId>
+            <version>1.8.7</version>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicMonitor.java
new file mode 100644 (file)
index 0000000..4712d0c
--- /dev/null
@@ -0,0 +1,240 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ *  Copyright (C) 2022 Huawei Canada Limited.
+ *  Copyright (C) 2022 CTC, Inc.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonParser;
+import io.vavr.collection.List;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import lombok.Getter;
+import lombok.NonNull;
+import org.apache.commons.io.FileUtils;
+import org.apache.ibatis.io.Resources;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeResponse;
+import org.onap.usecaseui.intentanalysis.util.DmaapUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Mono;
+
+/**
+ * This is a Dmaap message-router topic monitor.
+ * It takes advantage of AT&T's dmaap-client's long-polling implementation, this monitor constantly fetch/refetch target msg topic.
+ * So that new msg can be notified almost immediately. This is the major different from previous implementation.
+ */
+public class MRTopicMonitor implements Runnable {
+
+    private final String name;
+    private volatile boolean running = false;
+    private static Logger logger = LoggerFactory.getLogger(MRTopicMonitor.class);
+    private static int DEFAULT_TIMEOUT_MS_FETCH = 15000;
+    private MRConsumerWrapper consumerWrapper;
+    private NotificationCallback callback;
+
+    /**
+     * Constructor
+     * @param name name of topic subscriber in config
+     * @param callback callbackfunction for received message
+     */
+    public MRTopicMonitor(String name, NotificationCallback callback){
+        this.name = name;
+        this.callback = callback;
+    }
+
+    /**
+     * Start the monitoring thread
+     */
+    public void start(){
+        logger.info("Starting Dmaap Bus Monitor");
+        try {
+            File configFile = Resources.getResourceAsFile("intentPolicy/modifycll.json");
+            String configBody = FileUtils.readFileToString(configFile, StandardCharsets.UTF_8);
+            JsonObject jsonObject = JsonParser.parseString(configBody).getAsJsonObject();
+            consumerWrapper = buildConsumerWrapper(jsonObject);
+            running = true;
+            Executor executor = Executors.newSingleThreadExecutor();
+            executor.execute(this);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+    }
+
+    /**
+     * Main loop that keep fetching and processing
+     */
+    @Override
+    public void run(){
+        while (running){
+            try {
+                logger.debug("Topic: {} getting new msg...", name);
+                List<JsonElement> dmaapMsgs = consumerWrapper.fetch();
+                for (JsonElement msg : dmaapMsgs){
+                    logger.debug("Received message: {}" +
+                            "\r\n and processing start", msg);
+                    process(msg.toString());
+                }
+            } catch (IOException | RuntimeException e){
+                logger.error("fetchMessage encountered error: {}", e);
+            }
+        }
+        logger.info("{}: exiting thread", this);
+    }
+
+    /**
+     * Stop the monitor
+     */
+    public void stop(){
+        logger.info("{}: exiting", this);
+        running = false;
+        this.consumerWrapper.close();
+        this.consumerWrapper = null;
+    }
+
+    private void process(String msg) {
+        try {
+            callback.activateCallBack(msg);
+        } catch (Exception e){
+            logger.error("process message encountered error: {}", e);
+        }
+    }
+
+    private List<JsonElement> fetch() throws IOException {
+        return this.consumerWrapper.fetch();
+    }
+
+    private MRConsumerWrapper buildConsumerWrapper(@NonNull JsonObject topicParamsJson )
+            throws IllegalArgumentException {
+        MRTopicParams topicParams = MRTopicParams.builder().buildFromConfigJson(topicParamsJson).build();
+        return new MRConsumerWrapper(topicParams);
+    }
+
+    /**
+     * Wrapper class of DmaapClient (package org.onap.dmaap.mr.client)
+     * A polling fashion dmaap  consumer, whose #fetch() sleep a certain time when connection fails, otherwise keep retryiny.
+     * It supports both https and http protocols.
+     */
+    private class MRConsumerWrapper {
+        /**
+         * Name of the "protocol" property.
+         */
+        protected static final String PROTOCOL_PROP = "Protocol";
+        /**
+         * Fetch timeout.
+         */
+        protected int fetchTimeout;
+
+        /**
+         * Time to sleep on a fetch failure.
+         */
+        @Getter
+        private final int sleepTime;
+
+        /**
+         * Counted down when {@link #close()} is invoked.
+         */
+        private final CountDownLatch closeCondition = new CountDownLatch(1);
+
+        protected MessageRouterSubscriber subscriber;
+        protected MessageRouterSubscribeRequest request;
+
+        /**
+         * Constructs the object.
+         *
+         * @param MRTopicParams parameters for the bus topic
+         */
+        protected MRConsumerWrapper(MRTopicParams MRTopicParams) {
+            this.fetchTimeout = MRTopicParams.getFetchTimeout();
+
+            if (this.fetchTimeout <= 0) {
+                this.sleepTime = DEFAULT_TIMEOUT_MS_FETCH;
+            } else {
+                // don't sleep too long, even if fetch timeout is large
+                this.sleepTime = Math.min(this.fetchTimeout, DEFAULT_TIMEOUT_MS_FETCH);
+            }
+
+            if (MRTopicParams.isTopicInvalid()) {
+                throw new IllegalArgumentException("No topic for DMaaP");
+            }
+
+            if (MRTopicParams.isServersInvalid()) {
+                throw new IllegalArgumentException("Must provide at least one host for HTTP AAF");
+            }
+
+            try{
+                this.subscriber = DmaapUtil.buildSubscriber();
+                this.request = DmaapUtil.buildSubscriberRequest("aai_subscriber", MRTopicParams.getTopic());
+
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Illegal MrConsumer parameters");
+            }
+
+        }
+
+        /**
+         * Try fetch new message. But backoff for some sleepTime when connection fails.
+         * @return
+         * @throws IOException
+         */
+        public List<JsonElement> fetch() throws IOException {
+            Mono<MessageRouterSubscribeResponse> responses = this.subscriber.get(this.request);
+            MessageRouterSubscribeResponse resp = responses.block();
+            List<JsonElement> list = resp.items();
+            return list;
+
+        }
+
+        /**
+         * Causes the thread to sleep; invoked after fetch() fails.  If the consumer is closed,
+         * or the thread is interrupted, then this will return immediately.
+         */
+        protected void sleepAfterFetchFailure() {
+            try {
+                logger.info("{}: backoff for {}ms", this, sleepTime);
+                if (this.closeCondition.await(this.sleepTime, TimeUnit.MILLISECONDS)) {
+                    logger.info("{}: closed while handling fetch error", this);
+                }
+
+            } catch (InterruptedException e) {
+                logger.warn("{}: interrupted while handling fetch error", this, e);
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        /**
+         * Close the dmaap client and this thread
+         */
+        public void close() {
+            this.closeCondition.countDown();
+        }
+    }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/MRTopicParams.java
new file mode 100644 (file)
index 0000000..7e2b3b6
--- /dev/null
@@ -0,0 +1,375 @@
+/*
+ * ============LICENSE_START=======================================================
+ * ONAP
+ * ================================================================================
+ * Copyright (C) 2018 Samsung Electronics Co., Ltd. All rights reserved.
+ * Copyright (C) 2018-2019, 2021 AT&T Intellectual Property. All rights reserved.
+ * Copyright (C) 2019 Nordix Foundation.
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * Copyright (C) 2022 CTC, Inc.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonObject;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Partially copied from Onap Policy
+ * policy/common/policy-endpoints/src/main/java/org/onap/policy/common/endpoints/event/comm/bus/internal/BusTopicParams.java
+ * Modified to fit this project.
+ * Member variables of this Params class are as follows.
+ *
+ * <p>servers DMaaP servers
+ * topic DMaaP Topic to be monitored
+ * apiKey DMaaP API Key (optional)
+ * apiSecret DMaaP API Secret (optional)
+ * consumerGroup DMaaP Reader Consumer Group
+ * consumerInstance DMaaP Reader Instance
+ * fetchTimeout DMaaP fetch timeout
+ * fetchLimit DMaaP fetch limit
+ * environment DME2 Environment
+ * aftEnvironment DME2 AFT Environment
+ * partner DME2 Partner
+ * latitude DME2 Latitude
+ * longitude DME2 Longitude
+ * additionalProps Additional properties to pass to DME2
+ * useHttps does connection use HTTPS?
+ * allowSelfSignedCerts are self-signed certificates allow
+ */
+@Getter
+@Setter
+public class MRTopicParams {
+
+    private int port;
+    private List<String> servers;
+    private Map<String, String> additionalProps;
+    private String topic;
+    private String effectiveTopic;
+    private String apiKey;
+    private String apiSecret;
+    private String consumerGroup;
+    private String consumerInstance;
+    private int fetchTimeout;
+    private int fetchLimit;
+    private boolean useHttps;
+    private boolean allowSelfSignedCerts;
+    private boolean managed;
+
+    private String userName;
+    private String password;
+    private String environment;
+    private String aftEnvironment;
+    private String partner;
+    private String latitude;
+    private String longitude;
+    private String partitionId;
+    private String clientName;
+    private String hostname;
+    private String basePath;
+    @Getter
+    private String serializationProvider;
+
+    public static TopicParamsBuilder builder() {
+        return new TopicParamsBuilder();
+    }
+
+    /**
+     * Methods to Check if the property is INVALID.
+     */
+
+    public boolean isEnvironmentInvalid() {
+        return StringUtils.isBlank(environment);
+    }
+
+    public boolean isAftEnvironmentInvalid() {
+        return StringUtils.isBlank(aftEnvironment);
+    }
+
+    public boolean isLatitudeInvalid() {
+        return StringUtils.isBlank(latitude);
+    }
+
+    public boolean isLongitudeInvalid() {
+        return StringUtils.isBlank(longitude);
+    }
+
+    public boolean isConsumerInstanceInvalid() {
+        return StringUtils.isBlank(consumerInstance);
+    }
+
+    public boolean isConsumerGroupInvalid() {
+        return StringUtils.isBlank(consumerGroup);
+    }
+
+    public boolean isClientNameInvalid() {
+        return StringUtils.isBlank(clientName);
+    }
+
+    public boolean isPartnerInvalid() {
+        return StringUtils.isBlank(partner);
+    }
+
+    public boolean isServersInvalid() {
+        return (servers == null || servers.isEmpty()
+                || (servers.size() == 1 && ("".equals(servers.get(0)))));
+    }
+
+    public boolean isTopicInvalid() {
+        return StringUtils.isBlank(topic);
+    }
+
+    public boolean isPartitionIdInvalid() {
+        return StringUtils.isBlank(partitionId);
+    }
+
+    public boolean isHostnameInvalid() {
+        return StringUtils.isBlank(hostname);
+    }
+
+    public boolean isPortInvalid() {
+        return  (port <= 0 || port >= 65535);
+    }
+
+    /**
+     * Methods to Check if the property is Valid.
+     */
+
+    public boolean isApiKeyValid() {
+        return StringUtils.isNotBlank(apiKey);
+    }
+
+    public boolean isApiSecretValid() {
+        return StringUtils.isNotBlank(apiSecret);
+    }
+
+    public boolean isUserNameValid() {
+        return StringUtils.isNotBlank(userName);
+    }
+
+    public boolean isPasswordValid() {
+        return StringUtils.isNotBlank(password);
+    }
+
+    public boolean isAdditionalPropsValid() {
+        return additionalProps != null;
+    }
+
+    @NoArgsConstructor(access = AccessLevel.PRIVATE)
+    public static class TopicParamsBuilder {
+
+        final MRTopicParams params = new MRTopicParams();
+
+        public TopicParamsBuilder servers(List<String> servers) {
+            this.params.servers = servers;
+            return this;
+        }
+
+        public TopicParamsBuilder topic(String topic) {
+            this.params.topic = topic;
+            return this;
+        }
+
+        public TopicParamsBuilder effectiveTopic(String effectiveTopic) {
+            this.params.effectiveTopic = effectiveTopic;
+            return this;
+        }
+
+        public TopicParamsBuilder apiKey(String apiKey) {
+            this.params.apiKey = apiKey;
+            return this;
+        }
+
+        public TopicParamsBuilder apiSecret(String apiSecret) {
+            this.params.apiSecret = apiSecret;
+            return this;
+        }
+
+        public TopicParamsBuilder consumerGroup(String consumerGroup) {
+            this.params.consumerGroup = consumerGroup;
+            return this;
+        }
+
+        public TopicParamsBuilder consumerInstance(String consumerInstance) {
+            this.params.consumerInstance = consumerInstance;
+            return this;
+        }
+
+        public TopicParamsBuilder fetchTimeout(int fetchTimeout) {
+            this.params.fetchTimeout = fetchTimeout;
+            return this;
+        }
+
+        public TopicParamsBuilder fetchLimit(int fetchLimit) {
+            this.params.fetchLimit = fetchLimit;
+            return this;
+        }
+
+        public TopicParamsBuilder useHttps(boolean useHttps) {
+            this.params.useHttps = useHttps;
+            return this;
+        }
+
+        public TopicParamsBuilder allowSelfSignedCerts(boolean allowSelfSignedCerts) {
+            this.params.allowSelfSignedCerts = allowSelfSignedCerts;
+            return this;
+        }
+
+        public TopicParamsBuilder userName(String userName) {
+            this.params.userName = userName;
+            return this;
+        }
+
+        public TopicParamsBuilder password(String password) {
+            this.params.password = password;
+            return this;
+        }
+
+        public TopicParamsBuilder environment(String environment) {
+            this.params.environment = environment;
+            return this;
+        }
+
+        public TopicParamsBuilder aftEnvironment(String aftEnvironment) {
+            this.params.aftEnvironment = aftEnvironment;
+            return this;
+        }
+
+        public TopicParamsBuilder partner(String partner) {
+            this.params.partner = partner;
+            return this;
+        }
+
+        public TopicParamsBuilder latitude(String latitude) {
+            this.params.latitude = latitude;
+            return this;
+        }
+
+        public TopicParamsBuilder longitude(String longitude) {
+            this.params.longitude = longitude;
+            return this;
+        }
+
+        public TopicParamsBuilder additionalProps(Map<String, String> additionalProps) {
+            this.params.additionalProps = additionalProps;
+            return this;
+        }
+
+        public TopicParamsBuilder partitionId(String partitionId) {
+            this.params.partitionId = partitionId;
+            return this;
+        }
+
+        public MRTopicParams build() {
+            return params;
+        }
+
+        public TopicParamsBuilder buildFromConfigJson(JsonObject jsonObject) {
+            String consumerGroup = null;
+            String consumerInstance = null;
+            String aafUsername = null;
+            String aafPassword = null;
+            List<String> servers = new ArrayList<>();
+            String topic = null;
+            boolean useHttps = false;
+            int fetchTimeout = -1;
+            int fetchLimit = -1;
+
+            if (jsonObject.has("consumer_group") && !jsonObject.get("consumer_group").isJsonNull()) {
+                consumerGroup = jsonObject.get("consumer_group").getAsString();
+            }
+            if (jsonObject.has("consumer_instance") && !jsonObject.get("consumer_instance").isJsonNull()) {
+                consumerInstance = jsonObject.get("consumer_instance").getAsString();
+            }
+            if (jsonObject.has("aaf_username") && !jsonObject.get("aaf_username").isJsonNull()) {
+                aafUsername = jsonObject.get("aaf_username").getAsString();
+            }
+            if (jsonObject.has("aaf_password") && !jsonObject.get("aaf_password").isJsonNull()) {
+                aafPassword = jsonObject.get("aaf_password").getAsString();
+            }
+            if (jsonObject.has("fetch_timeout") && !jsonObject.get("fetch_timeout").isJsonNull()) {
+                fetchTimeout = jsonObject.get("fetch_timeout").getAsInt();
+            }
+            if (jsonObject.has("fetch_limit") && !jsonObject.get("fetch_limit").isJsonNull()) {
+                fetchLimit = jsonObject.get("fetch_limit").getAsInt();
+            }
+            if (jsonObject.has("servers") && !jsonObject.get("servers").isJsonNull()) {
+                JsonArray jsonArray = jsonObject.get("servers").getAsJsonArray();
+                servers = new ArrayList<>();
+                for (int i=0, e=jsonArray.size(); i<e; i++){
+                    servers.add(jsonArray.get(i).getAsString());
+                }
+            }
+
+            String topicUrl = jsonObject.get("dmaap_info").getAsJsonObject().get("topic_url").getAsString();
+            if (topicUrl.startsWith("https")){
+                useHttps = true;
+            }
+            String[] pmTopicSplit = topicUrl.split("\\/");
+            topic = pmTopicSplit[pmTopicSplit.length - 1];
+
+            this.params.topic = topicUrl;
+            this.params.servers = servers;
+            this.params.consumerGroup = consumerGroup;
+            this.params.consumerInstance = consumerInstance;
+            this.params.password = aafPassword;
+            this.params.userName = aafUsername;
+            this.params.fetchTimeout = fetchTimeout;
+            this.params.fetchLimit = fetchLimit;
+            this.params.useHttps = useHttps;
+            return this;
+        }
+
+        public TopicParamsBuilder managed(boolean managed) {
+            this.params.managed = managed;
+            return this;
+        }
+
+        public TopicParamsBuilder hostname(String hostname) {
+            this.params.hostname = hostname;
+            return this;
+        }
+
+        public TopicParamsBuilder clientName(String clientName) {
+            this.params.clientName = clientName;
+            return this;
+        }
+
+        public TopicParamsBuilder port(int port) {
+            this.params.port = port;
+            return this;
+        }
+
+        public TopicParamsBuilder basePath(String basePath) {
+            this.params.basePath = basePath;
+            return this;
+        }
+
+        public TopicParamsBuilder serializationProvider(String serializationProvider) {
+            this.params.serializationProvider = serializationProvider;
+            return this;
+        }
+
+    }
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/adapters/dmaap/NotificationCallback.java
new file mode 100644 (file)
index 0000000..dc46485
--- /dev/null
@@ -0,0 +1,26 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.adapters.dmaap;
+
+/**
+ * Interface for DmaapNotificationCallback
+ *
+ */
+public interface NotificationCallback {
+
+    public abstract void activateCallBack(String msg);
+
+}
diff --git a/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java b/intentanalysis/src/main/java/org/onap/usecaseui/intentanalysis/util/DmaapUtil.java
new file mode 100644 (file)
index 0000000..c6cbfce
--- /dev/null
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2023 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onap.usecaseui.intentanalysis.util;
+
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.ImmutableMessageRouterSource;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSink;
+import org.onap.dcaegen2.services.sdk.model.streams.dmaap.MessageRouterSource;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.ContentType;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.DmaapClientFactory;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterPublisher;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.api.MessageRouterSubscriber;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.ImmutableMessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterPublishRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.MessageRouterSubscribeRequest;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableDmaapConnectionPoolConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.ImmutableMessageRouterSubscriberConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterPublisherConfig;
+import org.onap.dcaegen2.services.sdk.rest.services.dmaap.client.model.config.MessageRouterSubscriberConfig;
+
+public class DmaapUtil {
+    public static MessageRouterSubscriber buildSubscriber(){
+        MessageRouterSubscriberConfig connectionPoolConfiguration = ImmutableMessageRouterSubscriberConfig.builder()
+            .connectionPoolConfig(ImmutableDmaapConnectionPoolConfig.builder()
+                .connectionPool(16)
+                .maxIdleTime(10) //in seconds
+                .maxLifeTime(20) //in seconds
+                .build())
+            .build();
+
+        MessageRouterSubscriber cut = DmaapClientFactory.createMessageRouterSubscriber(connectionPoolConfiguration);
+        return cut;
+    }
+
+    public static MessageRouterSubscribeRequest buildSubscriberRequest(String name, String topicUrl){
+        MessageRouterSource sourceDefinition = ImmutableMessageRouterSource.builder()
+            .name(name)
+            .topicUrl(topicUrl)
+            .build();
+        MessageRouterSubscribeRequest request = ImmutableMessageRouterSubscribeRequest.builder()
+            .consumerGroup("1")
+            .consumerId("1")
+            .sourceDefinition(sourceDefinition)
+            .build();
+
+        return request;
+    }
+
+    public static MessageRouterPublisher buildPublisher(){
+        MessageRouterPublisher pub = DmaapClientFactory
+            .createMessageRouterPublisher(MessageRouterPublisherConfig.createDefault());
+        return pub;
+    }
+
+    public static MessageRouterPublishRequest buildPublisherRequest(String name, String topicUrl){
+        MessageRouterSink sinkDefinition = ImmutableMessageRouterSink.builder()
+            .name(name)
+            .topicUrl(topicUrl)
+            .build();
+        MessageRouterPublishRequest request = ImmutableMessageRouterPublishRequest.builder()
+            .sinkDefinition(sinkDefinition)
+            .contentType(ContentType.TEXT_PLAIN)
+            .build();
+        return request;
+    }
+}
diff --git a/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/dcae_dmaap_config.json
new file mode 100644 (file)
index 0000000..d22e843
--- /dev/null
@@ -0,0 +1,20 @@
+{
+    "dcae_subscriber":{
+        "type":"message_router",
+        "aaf_username": null,
+        "aaf_password": null,
+        "api_key" : null,
+        "api_secret" : null,
+        "servers" : ["message-router:3904"],
+        "consumer_group" : "intent_analysis_dcaeevent",
+        "consumer_instance" : "intent_analysis_dcaeevent_1",
+        "fetch_timeout" : 15000,
+        "fetch_limit" : 100,
+        "dmaap_info":{
+          "topic_url":"http://message-router:3904/events/INTENT-EVENT",
+          "client_role":"org.onap.uui.intentanalysisSub",
+          "location":"onap",
+          "client_id":"intent-analysis-1"
+        }
+      }
+}
diff --git a/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json b/intentanalysis/src/main/resources/dmaapConfig/policy_dmaap_config.json
new file mode 100644 (file)
index 0000000..0ec440f
--- /dev/null
@@ -0,0 +1,20 @@
+{
+    "policy_subscriber":{
+        "type":"message_router",
+        "aaf_username": null,
+        "aaf_password": null,
+        "api_key" : null,
+        "api_secret" : null,
+        "servers" : ["message-router:3904"],
+        "consumer_group" : "intent_analysis_policyevent",
+        "consumer_instance" : "intent_analysis_policyevent_1",
+        "fetch_timeout" : 15000,
+        "fetch_limit" : 100,
+        "dmaap_info":{
+          "topic_url":"http://message-router:3904/events/INTENT-EVENT",
+          "client_role":"org.onap.uui.intentanalysisSub",
+          "location":"onap",
+          "client_id":"intent-analysis-1"
+        }
+      }
+}