Create Message Handler and Activator for the test microservice 89/130189/4
authorFrancescoFioraEst <francesco.fiora@est.tech>
Thu, 4 Aug 2022 09:35:08 +0000 (10:35 +0100)
committerFrancescoFioraEst <francesco.fiora@est.tech>
Tue, 9 Aug 2022 13:24:28 +0000 (14:24 +0100)
Issue-ID: POLICY-4319
Change-Id: If096467ad717fdeaf70e6a9079c531a201e6cec7
Signed-off-by: FrancescoFioraEst <francesco.fiora@est.tech>
13 files changed:
models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementMessage.java
models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementMessageType.java [new file with mode: 0644]
models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementStatus.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java [new file with mode: 0644]
participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java [new file with mode: 0644]

index 2e73b2b..f14b2c4 100644 (file)
@@ -22,16 +22,20 @@ package org.onap.policy.clamp.models.acm.messages.dmaap.element;
 
 import java.time.Instant;
 import java.util.UUID;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
 import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
 
-@Data
-@NoArgsConstructor
-@AllArgsConstructor
+@Getter
+@Setter
+@ToString
 public class ElementMessage {
 
+    @Setter(AccessLevel.NONE)
+    private ElementMessageType messageType;
+
     private ToscaConceptIdentifier elementId;
 
     private String message;
@@ -43,4 +47,13 @@ public class ElementMessage {
      * current time.
      */
     private Instant timestamp = Instant.now();
+
+    /**
+     * Constructor for instantiating a element message class.
+     *
+     * @param messageType the message type
+     */
+    public ElementMessage(ElementMessageType messageType) {
+        this.messageType = messageType;
+    }
 }
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementMessageType.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementMessageType.java
new file mode 100644 (file)
index 0000000..9cefaf2
--- /dev/null
@@ -0,0 +1,25 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021-2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.dmaap.element;
+
+public enum ElementMessageType {
+    STATUS, ACK_MSG
+}
diff --git a/models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementStatus.java b/models/src/main/java/org/onap/policy/clamp/models/acm/messages/dmaap/element/ElementStatus.java
new file mode 100644 (file)
index 0000000..844ec8a
--- /dev/null
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2021-2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.models.acm.messages.dmaap.element;
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+@Getter
+@Setter
+@ToString(callSuper = true)
+public class ElementStatus extends ElementMessage {
+
+    public ElementStatus() {
+        super(ElementMessageType.STATUS);
+    }
+
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageActivator.java
new file mode 100644 (file)
index 0000000..ac3c72e
--- /dev/null
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.io.IOException;
+import java.util.List;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessageType;
+import org.onap.policy.common.endpoints.event.comm.TopicEndpointManager;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.TopicSource;
+import org.onap.policy.common.endpoints.listeners.MessageTypeDispatcher;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.utils.services.ServiceManagerContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.EventListener;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class activates the Kafka together with all its handlers.
+ */
+@Component
+public class MessageActivator extends ServiceManagerContainer implements AutoCloseable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessageActivator.class);
+
+    private static final String[] MSG_TYPE_NAMES = { "messageType" };
+
+    // Topics from which the AC element receives and sends messages
+    private List<TopicSink> topicSinks;
+    private List<TopicSource> topicSources;
+
+    private final MessageListener listener;
+    private final MessagePublisher publisher;
+
+    private MessageTypeDispatcher msgDispatcher;
+
+    /**
+     * Constructor.
+     *
+     * @param listener  MessageListener
+     * @param publisher MessagePublisher
+     */
+    public MessageActivator(MessageListener listener, MessagePublisher publisher) {
+        super();
+        this.listener = listener;
+        this.publisher = publisher;
+        msgDispatcher = new MessageTypeDispatcher(MSG_TYPE_NAMES);
+    }
+
+    /**
+     * Activate publisher and listener messages.
+     *
+     * @param parameters TopicParameterGroup
+     */
+    public void activate(final TopicParameterGroup parameters) {
+        topicSinks = TopicEndpointManager.getManager().addTopicSinks(parameters.getTopicSinks());
+        topicSources = TopicEndpointManager.getManager().addTopicSources(parameters.getTopicSources());
+
+        // @formatter:off
+        addAction("Topic endpoint management",
+            () -> TopicEndpointManager.getManager().start(),
+            () -> TopicEndpointManager.getManager().shutdown());
+
+        addAction("Message Publisher",
+            () -> publisher.active(topicSinks), publisher::stop);
+
+
+        addAction("Message Listener",
+            () -> msgDispatcher.register(ElementMessageType.STATUS.name(), listener),
+            () -> msgDispatcher.unregister(ElementMessageType.STATUS.name()));
+
+        addAction("Topic Message Dispatcher", this::registerMsgDispatcher, this::unregisterMsgDispatcher);
+        // @formatter:on
+
+        start();
+        LOGGER.info("Kafka configuration initialised successfully");
+    }
+
+    /**
+     * Handle ContextClosedEvent.
+     *
+     * @param ctxClosedEvent ContextClosedEvent
+     */
+    @EventListener
+    public void handleContextClosedEvent(ContextClosedEvent ctxClosedEvent) {
+        deactivate();
+    }
+
+    /**
+     * Deactivate publisher and listener messages.
+     */
+    public void deactivate() {
+        if (isAlive()) {
+            stop();
+        }
+    }
+
+    /**
+     * Registers the dispatcher with the topic source(s).
+     */
+    private void registerMsgDispatcher() {
+        for (final TopicSource source : topicSources) {
+            source.register(msgDispatcher);
+        }
+    }
+
+    /**
+     * Unregisters the dispatcher from the topic source(s).
+     */
+    private void unregisterMsgDispatcher() {
+        for (final TopicSource source : topicSources) {
+            source.unregister(msgDispatcher);
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (isAlive()) {
+            super.shutdown();
+            LOGGER.info("Kafka configuration is uninitialised.");
+        }
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageHandler.java
new file mode 100644 (file)
index 0000000..540c133
--- /dev/null
@@ -0,0 +1,115 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.ws.rs.core.Response;
+import lombok.Getter;
+import lombok.NonNull;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.acm.element.service.ElementService;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageHandler {
+
+    private ElementType elementType;
+    private ToscaConceptIdentifier elementId;
+
+    private Map<ElementType, ElementService> map = new HashMap<>();
+
+    @Getter
+    private List<ElementMessage> messages = new ArrayList<>();
+
+    /**
+     * Constructor.
+     *
+     * @param acElement       AcElement
+     * @param elementServices ElementService list
+     */
+    public MessageHandler(AcElement acElement, List<ElementService> elementServices) {
+        elementId = acElement.getElementId();
+        elementServices.stream().forEach(elementService -> map.put(elementService.getType(), elementService));
+    }
+
+    /**
+     * Active Element Service.
+     *
+     * @param elementConfig ElementConfig
+     */
+    public void active(@NonNull ElementConfig elementConfig) {
+        this.elementType = elementConfig.getElementType();
+        getActiveService().active(elementConfig);
+    }
+
+    /**
+     * Update configuration.
+     *
+     * @param elementConfig ElementConfig
+     */
+    public void update(@NonNull ElementConfig elementConfig) {
+        if (elementType == null) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!");
+        }
+        if (!elementType.equals(elementConfig.getElementType())) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "wrong ElementType!");
+        }
+        getActiveService().update(elementConfig);
+    }
+
+    /**
+     * Get Active Service.
+     *
+     * @return ElementService
+     */
+    public ElementService getActiveService() {
+        if (elementType == null) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementType not defined!");
+        }
+        var service = map.get(elementType);
+        if (service == null) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "ElementService not found!");
+        }
+        return service;
+    }
+
+    public void handleMessage(ElementMessage message) {
+        messages.add(message);
+        getActiveService().handleMessage(message);
+    }
+
+    public boolean appliesTo(final ToscaConceptIdentifier elementId) {
+        return this.elementId.equals(elementId);
+    }
+
+    public void deactivateElement() {
+        getActiveService().deactivate();
+        elementType = null;
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessageListener.java
new file mode 100644 (file)
index 0000000..9fe4f06
--- /dev/null
@@ -0,0 +1,45 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.common.endpoints.event.comm.Topic.CommInfrastructure;
+import org.onap.policy.common.endpoints.listeners.ScoListener;
+import org.onap.policy.common.utils.coder.StandardCoderObject;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessageListener extends ScoListener<ElementStatus> {
+
+    private final MessageHandler handler;
+
+    public MessageListener(MessageHandler handler) {
+        super(ElementStatus.class);
+        this.handler = handler;
+    }
+
+    @Override
+    public void onTopicEvent(CommInfrastructure infra, String topic, StandardCoderObject sco, ElementStatus message) {
+        if (handler.appliesTo(message.getElementId())) {
+            handler.handleMessage(message);
+        }
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/handler/MessagePublisher.java
new file mode 100644 (file)
index 0000000..ef0a72f
--- /dev/null
@@ -0,0 +1,71 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.handler;
+
+import java.util.List;
+import javax.ws.rs.core.Response;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.common.endpoints.event.comm.TopicSink;
+import org.onap.policy.common.endpoints.event.comm.client.TopicSinkClient;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MessagePublisher {
+    private static final Logger LOGGER = LoggerFactory.getLogger(MessagePublisher.class);
+    private static final String NOT_ACTIVE_TEXT = "Not Active!";
+
+    private boolean active = false;
+    private TopicSinkClient topicSinkClient;
+
+    /**
+     * Constructor for instantiating MessagePublisher.
+     *
+     * @param topicSinks the topic sinks
+     */
+    public void active(List<TopicSink> topicSinks) {
+        if (topicSinks.size() != 1) {
+            throw new IllegalArgumentException("Configuration unsupported, Topic sinks greater than 1");
+        }
+        this.topicSinkClient = new TopicSinkClient(topicSinks.get(0));
+        active = true;
+    }
+
+    /**
+     * Method to send message.
+     *
+     * @param msg the acknowledgement message
+     */
+    public void publishMsg(final ElementMessage msg) {
+        if (!active) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, NOT_ACTIVE_TEXT);
+        }
+
+        topicSinkClient.send(msg);
+        LOGGER.debug("Sent message {}", msg);
+    }
+
+    public void stop() {
+        active = false;
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/AbstractElementService.java
new file mode 100644 (file)
index 0000000..7c28c4b
--- /dev/null
@@ -0,0 +1,47 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+
+public abstract class AbstractElementService implements ElementService {
+
+    @Override
+    public void update(ElementConfig elementConfig) {
+        // Not needs
+    }
+
+    @Override
+    public void handleMessage(ElementMessage message) {
+        // Not needs
+    }
+
+    @Override
+    public void active(ElementConfig elementConfig) {
+        // Not needs
+    }
+
+    @Override
+    public void deactivate() {
+        // Not needs
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/BridgeService.java
new file mode 100644 (file)
index 0000000..18f3a6f
--- /dev/null
@@ -0,0 +1,65 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.acm.element.handler.MessagePublisher;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Service;
+
+/**
+ * Bridge Service.
+ */
+@Service
+public class BridgeService extends AbstractElementService {
+
+    private final MessagePublisher messagePublisher;
+    private ToscaConceptIdentifier receiver;
+    private ToscaConceptIdentifier elementId;
+
+    public BridgeService(MessagePublisher messagePublisher, AcElement acElement) {
+        this.messagePublisher = messagePublisher;
+        this.elementId = acElement.getElementId();
+    }
+
+    @Override
+    public ElementType getType() {
+        return ElementType.BRIDGE;
+    }
+
+    @Override
+    public void handleMessage(ElementMessage messageFrom) {
+        var messageTo = new ElementStatus();
+        messageTo.setElementId(receiver);
+        // Add Tracking
+        messageTo.setMessage(messageFrom.getMessage() + ", bridge: " + elementId);
+        messagePublisher.publishMsg(messageTo);
+    }
+
+    @Override
+    public void active(ElementConfig elementConfig) {
+        receiver = elementConfig.getElementId();
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ConfigService.java
new file mode 100644 (file)
index 0000000..f542be2
--- /dev/null
@@ -0,0 +1,105 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import java.util.List;
+import javax.ws.rs.core.Response;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import org.onap.policy.clamp.acm.element.handler.MessageActivator;
+import org.onap.policy.clamp.acm.element.handler.MessageHandler;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.common.endpoints.parameters.TopicParameterGroup;
+import org.onap.policy.common.endpoints.parameters.TopicParameters;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+@Service
+@RequiredArgsConstructor
+public class ConfigService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConfigService.class);
+
+    private ElementConfig elementConfig = new ElementConfig();
+
+    private final MessageHandler handler;
+    private final MessageActivator messageActivator;
+
+    /**
+     * Activate messages and service and create the element configuration.
+     *
+     * @param elementConfig the configuration
+     */
+    public void activateElement(@NonNull ElementConfig elementConfig) {
+        var topicParameters = new TopicParameters();
+        topicParameters.setTopic(elementConfig.getTopicParameterGroup().getTopic());
+        topicParameters.setServers(List.of(elementConfig.getTopicParameterGroup().getServer()));
+        topicParameters.setFetchTimeout(elementConfig.getTopicParameterGroup().getFetchTimeout());
+        topicParameters.setTopicCommInfrastructure(elementConfig.getTopicParameterGroup().getTopicCommInfrastructure());
+        topicParameters.setUseHttps(elementConfig.getTopicParameterGroup().isUseHttps());
+
+        var parameters = new TopicParameterGroup();
+        parameters.setTopicSinks(List.of(topicParameters));
+        parameters.setTopicSources(List.of(topicParameters));
+
+        if (!parameters.isValid()) {
+            throw new PfModelRuntimeException(Response.Status.BAD_REQUEST,
+                    "Validation failed for topic parameter group. Kafka config not activated");
+        }
+
+        if (messageActivator.isAlive()) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT,
+                    "Service Manager already running, cannot add Topic endpoint management");
+        }
+
+        handler.active(elementConfig);
+        messageActivator.activate(parameters);
+        this.elementConfig = elementConfig;
+
+        LOGGER.info("Messages and service activated");
+    }
+
+    /**
+     * Fetch element configuration.
+     *
+     * @return element configuration present
+     */
+    public ElementConfig getElementConfig() {
+        return elementConfig;
+    }
+
+    /**
+     * Deactivate messages and service and delete the element config.
+     */
+    public void deleteConfig() {
+        handler.deactivateElement();
+        messageActivator.deactivate();
+        elementConfig = new ElementConfig();
+        LOGGER.info("Messages and service deactivated");
+    }
+
+    public List<ElementMessage> getMessages() {
+        return handler.getMessages();
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/ElementService.java
new file mode 100644 (file)
index 0000000..00b4d8a
--- /dev/null
@@ -0,0 +1,38 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementMessage;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+
+public interface ElementService {
+
+    ElementType getType();
+
+    public void active(ElementConfig elementConfig);
+
+    public void update(ElementConfig elementConfig);
+
+    void handleMessage(ElementMessage message);
+
+    void deactivate();
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/SinkService.java
new file mode 100644 (file)
index 0000000..61c1c78
--- /dev/null
@@ -0,0 +1,36 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.springframework.stereotype.Service;
+
+/**
+ * Sink Service.
+ */
+@Service
+public class SinkService extends AbstractElementService {
+
+    @Override
+    public ElementType getType() {
+        return ElementType.SINK;
+    }
+}
diff --git a/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java b/participant/participant-impl/participant-impl-acelement/src/main/java/org/onap/policy/clamp/acm/element/service/StarterService.java
new file mode 100644 (file)
index 0000000..589397d
--- /dev/null
@@ -0,0 +1,110 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2022 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.policy.clamp.acm.element.service;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.ws.rs.core.Response;
+import org.onap.policy.clamp.acm.element.handler.MessagePublisher;
+import org.onap.policy.clamp.acm.element.main.parameters.AcElement;
+import org.onap.policy.clamp.models.acm.messages.dmaap.element.ElementStatus;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementConfig;
+import org.onap.policy.clamp.models.acm.messages.rest.element.ElementType;
+import org.onap.policy.models.base.PfModelRuntimeException;
+import org.onap.policy.models.tosca.authorative.concepts.ToscaConceptIdentifier;
+import org.springframework.stereotype.Service;
+
+/**
+ * Starter Service.
+ */
+@Service
+public class StarterService extends AbstractElementService implements AutoCloseable {
+
+    private ScheduledThreadPoolExecutor timerPool;
+    private ScheduledFuture<?> future;
+    private ToscaConceptIdentifier receiver;
+    private ToscaConceptIdentifier elementId;
+
+    private final MessagePublisher messagePublisher;
+
+    public StarterService(MessagePublisher messagePublisher, AcElement acElement) {
+        this.messagePublisher = messagePublisher;
+        this.elementId = acElement.getElementId();
+    }
+
+    @Override
+    public ElementType getType() {
+        return ElementType.STARTER;
+    }
+
+    /**
+     * Deactivate Scheduled ThreadPool Executor.
+     */
+    @Override
+    public void deactivate() {
+        if (timerPool != null) {
+            if (future != null) {
+                future.cancel(true);
+            }
+            timerPool.shutdown();
+            timerPool = null;
+        }
+    }
+
+    @Override
+    public void active(ElementConfig elementConfig) {
+        if (timerPool != null) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService alredy actived!");
+        }
+        receiver = elementConfig.getElementId();
+
+        timerPool = new ScheduledThreadPoolExecutor(1);
+        timerPool.setRemoveOnCancelPolicy(true);
+        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
+                elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+    }
+
+    private void sendMessage() {
+        var messasge = new ElementStatus();
+        messasge.setElementId(receiver);
+        // Add Tracking
+        messasge.setMessage("starter: " + elementId);
+        messagePublisher.publishMsg(messasge);
+    }
+
+    @Override
+    public void update(ElementConfig elementConfig) {
+        if (timerPool == null) {
+            throw new PfModelRuntimeException(Response.Status.CONFLICT, "StarterService not actived!");
+        }
+        if (future != null) {
+            future.cancel(true);
+        }
+        future = timerPool.scheduleAtFixedRate(this::sendMessage, elementConfig.getTimerSec(),
+                elementConfig.getTimerSec(), TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public void close() throws Exception {
+        deactivate();
+    }
+}