--- /dev/null
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
+ */
+
+package org.onap.so.bpmn.infrastructure.pnf;
+
+import org.springframework.context.ApplicationEvent;
+
+public class PnfNotificationEvent extends ApplicationEvent {
+
+ private String pnfCorrelationId;
+
+ /**
+ * Create a new ApplicationEvent.
+ *
+ * @param source the object on which the event initially occurred (never {@code null})
+ */
+ public PnfNotificationEvent(Object source, String pnfCorrelationId) {
+ super(source);
+ this.pnfCorrelationId = pnfCorrelationId;
+ }
+
+ public String getPnfCorrelationId() {
+ return pnfCorrelationId;
+ }
+}
import org.camunda.bpm.engine.RuntimeService;
import org.camunda.bpm.engine.delegate.DelegateExecution;
import org.camunda.bpm.engine.delegate.JavaDelegate;
+import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
import org.onap.so.bpmn.infrastructure.pnf.dmaap.DmaapClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
@Component
-public class InformDmaapClient implements JavaDelegate {
+public class InformDmaapClient implements JavaDelegate, ApplicationListener<PnfNotificationEvent> {
+ private Logger logger = LoggerFactory.getLogger(getClass());
private DmaapClient dmaapClient;
+ private DelegateExecution execution;
@Override
public void execute(DelegateExecution execution) {
RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
dmaapClient.registerForUpdate(pnfCorrelationId, () -> runtimeService.createMessageCorrelation("WorkflowMessage")
.processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult());
+ this.execution = execution;
}
@Autowired
public void setDmaapClient(DmaapClient dmaapClient) {
this.dmaapClient = dmaapClient;
}
+
+ @Override
+ public void onApplicationEvent(PnfNotificationEvent event) {
+ logger.info("Received application event for pnfCorrelationId: {}", event.getPnfCorrelationId());
+ RuntimeService runtimeService = execution.getProcessEngineServices().getRuntimeService();
+ runtimeService.createMessageCorrelation("WorkflowMessage")
+ .processInstanceBusinessKey(execution.getProcessBusinessKey()).correlateWithResult();
+ }
}
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
+import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;
private volatile ScheduledThreadPoolExecutor executor;
private volatile boolean dmaapThreadListenerIsRunning;
+ private ApplicationEventPublisher applicationEventPublisher;
+
@Autowired
- public PnfEventReadyDmaapClient(Environment env) {
+ public PnfEventReadyDmaapClient(Environment env, ApplicationEventPublisher applicationEventPublisher) {
+ this.applicationEventPublisher = applicationEventPublisher;
httpClient = HttpClientBuilder.create().build();
pnfCorrelationIdToThreadMap = new ConcurrentHashMap<>();
topicListenerDelayInSeconds = env.getProperty("pnf.dmaap.topicListenerDelayInSeconds", Integer.class);
}
private void informAboutPnfReadyIfPnfCorrelationIdFound(String pnfCorrelationId) {
- Runnable runnable = unregister(pnfCorrelationId);
- if (runnable != null) {
- logger.debug("dmaap listener gets pnf ready event for pnfCorrelationId: {}", pnfCorrelationId);
- // runnable.run();
- runnable = null;
- }
+ unregister(pnfCorrelationId);
+ PnfNotificationEvent pnfNotificationEvent = new PnfNotificationEvent(this, pnfCorrelationId);
+ applicationEventPublisher.publishEvent(pnfNotificationEvent);
}
}
-
}
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
+import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
public class InformDmaapClientTest {
+
@Before
public void setUp() throws Exception {
informDmaapClient = new InformDmaapClient();
inOrder.verify(messageCorrelationBuilder).correlateWithResult();
}
+ @Test
+ public void onApplicationEvent_validPnfNotificationEvent_expectedOutput() {
+
+ PnfNotificationEvent pnfNotificationEvent = new PnfNotificationEvent(this, "testPnfCorrelationId");
+
+ informDmaapClient.execute(delegateExecution);
+ informDmaapClient.onApplicationEvent(pnfNotificationEvent);
+
+ InOrder inOrder = inOrder(messageCorrelationBuilder);
+ inOrder.verify(messageCorrelationBuilder).processInstanceBusinessKey("testBusinessKey");
+ inOrder.verify(messageCorrelationBuilder).correlateWithResult();
+ }
+
private DelegateExecution mockDelegateExecution() {
DelegateExecution delegateExecution = mock(DelegateExecution.class);
when(delegateExecution.getVariable(eq(ExecutionVariableNames.PNF_CORRELATION_ID)))
import static org.junit.Assert.*;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnitRunner;
+import org.onap.so.bpmn.infrastructure.pnf.PnfNotificationEvent;
import org.onap.so.bpmn.infrastructure.pnf.dmaap.PnfEventReadyDmaapClient.DmaapTopicListenerThread;
+import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.env.Environment;
@RunWith(MockitoJUnitRunner.class)
private Runnable threadMockToNotifyCamundaFlow;
private ScheduledThreadPoolExecutor executorMock;
+ @Mock
+ private ApplicationEventPublisher applicationEventPublisher;
+
@Before
public void init() throws NoSuchFieldException, IllegalAccessException {
when(env.getProperty(eq("pnf.dmaap.port"), eq(Integer.class))).thenReturn(PORT);
when(env.getProperty(eq("pnf.dmaap.consumerGroup"))).thenReturn(CONSUMER_GROUP);
when(env.getProperty(eq("pnf.dmaap.topicListenerDelayInSeconds"), eq(Integer.class)))
.thenReturn(TOPIC_LISTENER_DELAY_IN_SECONDS);
- testedObject = new PnfEventReadyDmaapClient(env);
+ testedObject = new PnfEventReadyDmaapClient(env, applicationEventPublisher);
testedObjectInnerClassThread = testedObject.new DmaapTopicListenerThread();
httpClientMock = mock(HttpClient.class);
threadMockToNotifyCamundaFlow = mock(Runnable.class);
assertEquals(captor1.getValue().getURI().getPath(),
"/" + URI_PATH_PREFIX + "/" + EVENT_TOPIC_TEST + "/" + CONSUMER_GROUP + "/" + CONSUMER_ID + "");
- // verify(threadMockToNotifyCamundaFlow).run();
+ /**
+ * Two PNF returned from HTTP request.
+ */
+ verify(applicationEventPublisher, times(2)).publishEvent(any(PnfNotificationEvent.class));
verify(executorMock).shutdown();
}
-/*
+/*-
+ * ============LICENSE_START=======================================================
+ * Copyright (C) 2019 Nordix Foundation.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * ============LICENSE_START======================================================= Copyright (C) 2019 Nordix
- * Foundation. ================================================================================ Licensed under the
- * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may
- * obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
- * either express or implied. See the License for the specific language governing permissions and limitations under the
- * License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
- * SPDX-License-Identifier: Apache-2.0 ============LICENSE_END=========================================================
+ * SPDX-License-Identifier: Apache-2.0
+ * ============LICENSE_END=========================================================
*/
package org.onap.so.bpmn.infrastructure.process;
import static org.assertj.core.api.Assertions.fail;
import static org.camunda.bpm.engine.test.assertions.bpmn.BpmnAwareAssertions.assertThat;
import com.google.protobuf.Struct;
-import com.google.protobuf.Value;
import java.io.IOException;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import org.camunda.bpm.engine.runtime.Execution;
import org.camunda.bpm.engine.runtime.ProcessInstance;
if (!execution.isSuspended() && !execution.isEnded()) {
try {
+
runtimeService.signal(execution.getId());
} catch (Exception e) {
logger.info(e.getMessage(), e);