*** Settings ***
Documentation Integration tests for PRH.
... PRH receive events from DMaaP and produce or not PNF_READY notification depends on required fields in received event.
- Suite Setup Run keywords Create Headers AND Create sessions
+ Suite Setup Run keywords Create Headers AND Create sessions AND Set default PRH CBS config
Test Teardown Reset Simulators
Test Timeout 2 minutes
- Library resources/PrhLibrary.py
Resource resources/prh_sessions.robot
Resource resources/prh_library.robot
+ Resource resources/prh_config_library.robot
*** Variables ***
${TEST_CASES_DIR} %{WORKSPACE}/tests/dcaegen2/prh-testcases/assets
Should send PNF ready when logical link exists and replace it in AAI
[Documentation] PRH gets event from DMaaP with an attachment point, PNF is related to a logical link in AAI
[Tags] PRH Attachment point
- [Template] Verify PNF ready sent and old logical link removed from AAI
+ [Template] Verify PNF ready sent and old logical link replaced in AAI
${TEST_CASES_DIR}/pnf-with-existing-logical-link
${TEST_CASES_DIR}/pnf-with-different-logical-link
AAI_RESOURCE_NOT_FOUND = b'{}'
pnf_entries = {}
-logical_links = {}
patched_pnf = AAI_RESOURCE_NOT_FOUND
created_logical_link = AAI_RESOURCE_NOT_FOUND
service_instance = AAI_RESOURCE_NOT_FOUND
if re.search('/setup/patched_pnf', self.path):
httpServerLib.set_response_200_ok(self, payload = patched_pnf)
logger.debug('AAISetup GET /setup/patched_pnf -> 200 OK')
- elif re.search('/verify/created_logical_link', self.path):
+ elif re.search('/verify/logical-link', self.path):
httpServerLib.set_response_200_ok(self, payload = created_logical_link)
- logger.debug('AAISetup GET /setup/created_logical_link -> 200 OK')
- elif re.search('/verify/logical-links', self.path):
- httpServerLib.set_response_200_ok(self, payload= json.dumps(logical_links).encode('utf-8'))
- elif re.search('/verify/logical-link/bbs-link', self.path):
- link_name = basename(self.path)
- if link_name in logical_links:
- httpServerLib.set_response_200_ok(self, payload = logical_links[link_name])
+ logger.debug('AAISetup GET /verify/logical_link -> 200 OK')
else:
httpServerLib.set_response_404_not_found(self)
logger.info('AAISetup GET ' + self.path + ' -> 404 Not found')
if logical_link_name == None:
raise Exception("Invalid logical link entry, could not extract `link-name`")
- global logical_link
- logical_links[logical_link_name] = logical_link_payload
+ global created_logical_link
+ created_logical_link = logical_link_payload
- logical_link = json.loads(logical_link_payload)
httpServerLib.set_response_200_ok(self)
logger.debug('AAISetup PUT /setup/add_logical_link -> 200 OK')
logger.info('AAIHandler GET /aai/v12/network/pnfs/pnf/' + pnf_name + ' -> 404 Not found, actual entries: ' + str(pnf_entries.keys()))
elif re.search('/aai/v12/network/logical-links/logical-link/[^/]*$', self.path):
logical_link_name = basename(self.path)
- if logical_link_name in logical_links:
- httpServerLib.set_response_200_ok(self, payload = logical_links[logical_link_name])
+ if json.loads(created_logical_link).get("link-name") == logical_link_name:
+ httpServerLib.set_response_200_ok(self, payload = created_logical_link)
logger.debug('AAIHandler GET /aai/v12/network/logical-links/logical-link/' + logical_link_name + ' -> 200 OK')
else:
httpServerLib.set_response_404_not_found(self)
- logger.info('AAIHandler GET /aai/v12/network/logical-links/logical-link/' + logical_link_name + ' -> 404 Not found, actual entries: ' + str(logical_links.keys()))
+ logger.info('AAIHandler GET /aai/v12/network/logical-links/logical-link/' + logical_link_name + ' -> 404 Not found, actual link: ' + created_logical_link)
elif re.search('aai/v12/network/pnfs/pnf/business/customers/customer/Demonstration/service-subscriptions/service-subscription/vFW/service-instances/service-instance/bbs_service', self.path):
httpServerLib.set_response_200_ok(self, payload = json.dumps(service_instance).encode('utf-8'))
logger.debug('AAIHandler GET aai/v12/network/pnfs/pnf/business/customers/customer/Demonstration/service-subscriptions/service-subscription/vFW/service-instances/service-instance/bbs_service -> 200 OK')
httpServerLib.set_response_200_ok(self)
logical_link_name = re.search('.+?(?=\?)', basename(self.path)).group(0)
- del logical_links[logical_link_name]
+ global created_logical_link
+ if json.loads(created_logical_link).get("link-name") == logical_link_name:
+ created_logical_link = AAI_RESOURCE_NOT_FOUND
+
logger.debug('AAIHandler DELETE /aai/v12/network/logical-links/logical-link/' + logical_link_name + ' -> 200 OK')
else:
httpServerLib.set_response_404_not_found(self)
def _main_(handler_class=AAIHandler, protocol="HTTP/1.0"):
handler_class.protocol_version = protocol
+ httpServerLib.start_http_endpoint(3333, AAIHandler)
httpServerLib.start_https_endpoint(3334, AAIHandler, keyfile="certs/aai.key", certfile="certs/aai.crt", ca_certs="certs/root.crt")
httpServerLib.start_http_endpoint(3335, AAISetup)
while 1: