Attempt to stablize the synch.
Issue-ID: CCSDK-3683
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Change-Id: Ieda858e76082fd5224ac43f153e8967f871322d8
import java.util.List;
import org.json.JSONArray;
-import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
}
}
return Flux.fromIterable(arrayList);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Flux.error(ex);
}
JSONObject outputJson = new JSONObject(response);
JSONObject responseParams = outputJson.getJSONObject(OUTPUT);
return Mono.just(responseParams);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
JSONObject schemaObject = jsonObject.getJSONObject("policySchema");
String schemaString = schemaObject.toString();
return Mono.just(schemaString);
- } catch (JSONException ex) { // invalid json
+ } catch (Exception ex) { // invalid json
logger.debug("Invalid json {}", ex.getMessage());
return Mono.error(ex);
}
return restClient
.postWithAuthHeader(controllerUrl(rpcName), inputJsonString, this.controllerConfig.getUserName(),
this.controllerConfig.getPassword()) //
- .flatMap(this::extractResponseBody);
+ .flatMap(resp -> extractResponseBody(resp, ricUrl));
}
- private Mono<String> extractResponse(JSONObject responseOutput) {
+ private Mono<String> extractResponse(JSONObject responseOutput, String ricUrl) {
AdapterOutput output = gson.fromJson(responseOutput.toString(), AdapterOutput.class);
String body = output.body == null ? "" : output.body;
if (HttpStatus.valueOf(output.httpStatus).is2xxSuccessful()) {
return Mono.just(body);
} else {
- logger.debug("Error response: {} {}", output.httpStatus, body);
+ logger.debug("Error response: {} {}, from: {}", output.httpStatus, body, ricUrl);
byte[] responseBodyBytes = body.getBytes(StandardCharsets.UTF_8);
HttpStatus httpStatus = HttpStatus.valueOf(output.httpStatus);
WebClientResponseException responseException = new WebClientResponseException(httpStatus.value(),
}
}
- private Mono<String> extractResponseBody(String responseStr) {
+ private Mono<String> extractResponseBody(String responseStr, String ricUrl) {
return A1AdapterJsonHelper.getOutput(responseStr) //
- .flatMap(this::extractResponse);
+ .flatMap(responseOutput -> extractResponse(responseOutput, ricUrl));
}
private String controllerUrl(String rpcName) {
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClientException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
if (throwable instanceof WebClientResponseException) {
WebClientResponseException e = (WebClientResponseException) throwable;
return ErrorResponse.createMono(e.getResponseBodyAsString(), e.getStatusCode());
+ } else if (throwable instanceof WebClientException) {
+ WebClientException e = (WebClientException) throwable;
+ return ErrorResponse.createMono(e.getMessage(), HttpStatus.BAD_GATEWAY);
} else if (throwable instanceof RejectionException) {
RejectionException e = (RejectionException) throwable;
return ErrorResponse.createMono(e.getMessage(), e.getStatus());
private Flux<RicData> createTask() {
return Flux.fromIterable(rics.getRics()) //
.flatMap(this::createRicData) //
+ .onErrorResume(t -> Flux.empty()) //
.flatMap(this::checkOneRic, CONCURRENCY);
}
}
private Mono<RicData> createRicData(Ric ric) {
- return Mono.just(ric) //
- .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
+ return this.a1ClientFactory.createA1Client(ric) //
+ .doOnError(t -> logger.debug("Could not create A1 client for ric: {}, reason: {}", ric.id(),
+ t.getMessage())) //
.map(a1Client -> new RicData(ric, a1Client));
}
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.core.publisher.SignalType;
/**
* Synchronizes the content of a Near-RT RIC with the content in the repository.
.flatMapMany(client -> runSynchronization(ric, client)) //
.doOnError(t -> { //
logger.warn("Synchronization failure for ric: {}, reason: {}", ric.id(), t.getMessage()); //
- ric.setState(RicState.UNAVAILABLE); //
deletePoliciesIfNotRecreatable(t, ric);
}) //
.collectList() //
.flatMap(notUsed -> onSynchronizationComplete(ric)) //
- .onErrorResume(t -> Mono.just(ric));
+ .onErrorResume(t -> Mono.just(ric)) //
+ .doFinally(signal -> onFinally(signal, ric));
+ }
+
+ private void onFinally(SignalType signal, Ric ric) {
+ if (ric.getState().equals(RicState.SYNCHRONIZING)) {
+ logger.debug("Resetting ric state after failed synch, ric: {}, signal: {}", ric.id(), signal);
+ ric.setState(RicState.UNAVAILABLE); //
+ }
}
/**
import org.onap.ccsdk.oran.a1policymanagementservice.clients.SecurityContext;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.ApplicationConfig;
import org.onap.ccsdk.oran.a1policymanagementservice.configuration.RicConfig;
+import org.onap.ccsdk.oran.a1policymanagementservice.exceptions.ServiceException;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policies;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Policy;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.PolicyType;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Rics;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Service;
import org.onap.ccsdk.oran.a1policymanagementservice.repository.Services;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
assertThat(ric1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
+ @Test
+ void testConnectionError() {
+ setUpCreationOfA1Client();
+ simulateRicWithNoPolicyTypes();
+ policies.put(policy1);
+ WebClientRequestException exception =
+ new WebClientRequestException(new ServiceException("x"), null, null, null);
+ when(a1ClientMock.deleteAllPolicies()).thenReturn(Flux.error(exception));
+ RicSynchronizationTask synchronizerUnderTest = createTask();
+ ric1.setState(RicState.AVAILABLE);
+ synchronizerUnderTest.run(ric1);
+ await().untilAsserted(() -> RicState.UNAVAILABLE.equals(ric1.getState()));
+ }
+
@Test
void ricIdlePolicyTypeInRepo_thenSynchronizationWithReuseOfTypeFromRepoAndCorrectServiceNotified() {
rics.put(ric1);