Add etcd python client 93/121093/2
authorkrishnaa96 <krishna.moorthy6@wipro.com>
Wed, 5 May 2021 06:33:32 +0000 (12:03 +0530)
committerkrishnaa96 <krishna.moorthy6@wipro.com>
Wed, 5 May 2021 08:19:20 +0000 (13:49 +0530)
- Add a new python client for etcd
- Add etcd credentials to SMS
- New conf group etcd_api, db_options

Issue-ID: OPTFRA-947
Signed-off-by: krishnaa96 <krishna.moorthy6@wipro.com>
Change-Id: I6b2214d2f2abc29c34613a5aeaad525b5f37a390

conductor.conf
conductor/conductor/common/etcd/__init__.py [new file with mode: 0644]
conductor/conductor/common/etcd/api.py [new file with mode: 0644]
conductor/conductor/common/etcd/utils.py [new file with mode: 0644]
conductor/conductor/common/sms.py
conductor/conductor/tests/unit/common/etcd/__init__.py [new file with mode: 0644]
conductor/conductor/tests/unit/common/etcd/test_utils.py [new file with mode: 0644]
conductor/requirements.txt
csit/scripts/has-properties/has.json

index e3d710c..6e0e8a1 100755 (executable)
@@ -417,6 +417,31 @@ certificate_authority_bundle_file = /usr/local/bin/AAF_RootCA.cer
 # default is false
 enable_https_mode = True
 
+
+[db_options]
+
+# db_backend to use
+db_backend = music
+
+# Use music mock api
+music_mock = False
+
+
+[etcd_api]
+
+# host/ip address of etcd server
+host = localhost
+
+# port of etcd server
+port = 2379
+
+# username for etcd authentication
+username = conductor
+
+# password for etcd authentication
+password = conductor
+
+
 [music_api]
 
 #
diff --git a/conductor/conductor/common/etcd/__init__.py b/conductor/conductor/common/etcd/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/conductor/conductor/common/etcd/api.py b/conductor/conductor/common/etcd/api.py
new file mode 100644 (file)
index 0000000..3170b8c
--- /dev/null
@@ -0,0 +1,178 @@
+#
+# -------------------------------------------------------------------------
+#   Copyright (C) 2021 Wipro Limited.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import copy
+import etcd3
+from grpc import RpcError
+import json
+from oslo_config import cfg
+
+from conductor.common.etcd.utils import EtcdClientException
+from conductor.common.etcd.utils import validate_schema
+
+
+CONF = cfg.CONF
+
+ETCD_API_OPTS = [
+    cfg.StrOpt('host',
+               default='localhost',
+               help='host/ip for etcd'),
+    cfg.StrOpt('port',
+               default='2379',
+               help='port for etcd'),
+    cfg.StrOpt('username',
+               default='root',
+               help='Username for authentication'),
+    cfg.StrOpt('password',
+               default='root',
+               help='Password for authentication'),
+]
+
+CONF.register_opts(ETCD_API_OPTS, group='etcd_api')
+
+
+class EtcdAPI(object):
+
+    def __init__(self):
+        self.host = CONF.etcd_api.host
+        self.port = CONF.etcd_api.port
+        self.user = CONF.etcd_api.username
+        self.password = CONF.etcd_api.password
+
+    def get_client(self):
+        try:
+            return etcd3.client(host=self.host, port=self.port,
+                                user=self.user, password=self.password,
+                                grpc_options={
+                                    'grpc.http2.true_binary': 1,
+                                    'grpc.http2.max_pings_without_data': 0,
+                                }.items())
+        except RpcError as rpc_error:
+            raise EtcdClientException("Failed to establish connection with ETCD. GRPC {}".format(rpc_error.code()))
+
+    def get_raw_value(self, key):
+        return self.get_client().get(key)[0]
+
+    def get_value(self, key):
+        raw_value = self.get_raw_value(key)
+        if raw_value:
+            return json.loads(raw_value)
+        return None
+
+    def get_values_prefix(self, key_prefix, filter_name=None, filter_value=None):
+        values = {kv[1].key.decode().lstrip(key_prefix): json.loads(kv[0])
+                  for kv in self.get_client().get_prefix(key_prefix)}
+        if not filter_name or not filter_value:
+            return values
+
+        return dict(filter(lambda x: x[1].get(filter_name) == filter_value, values.items()))
+
+    def validate_row(self, keyspace, table, values):
+        key = f'{keyspace}/{table}'
+        schema = json.loads(self.get_client().get(key)[0])
+        return validate_schema(values, schema)
+
+    def keyspace_create(self, keyspace):
+        self.get_client().put_if_not_exists(keyspace, "This key is a placeholder for the keyspace".encode())
+
+    def keyspace_delete(self, keyspace):
+        self.get_client().delete_prefix(keyspace)
+
+    def table_create(self, keyspace, table, schema):
+        table_key = '{}/{}'.format(keyspace, table)
+        table_value = json.dumps(schema)
+        self.get_client().put_if_not_exists(table_key, table_value)
+
+    def table_delete(self, keyspace, table):
+        table_key = '{}/{}'.format(keyspace, table)
+        self.get_client().delete_prefix(table_key)
+
+    def row_create(self, keyspace, table, pk_name, pk_value, values, atomic=False, conditional=True):
+        key = f'{keyspace}/{table}/{pk_value}'
+
+        values[pk_name] = pk_value
+        if self.validate_row(keyspace, table, values):
+            put_response = self.get_client().put(key, json.dumps(values))
+            return "SUCCESS" if put_response else "FAILURE"
+
+        return "FAILURE"
+
+    def row_update(self, keyspace, table, pk_name, pk_value, values, atomic=False, condition=True):
+        key = f'{keyspace}/{table}/{pk_value}'
+
+        values[pk_name] = pk_value
+        client = self.get_client()
+        if client.get(key) and self.validate_row(keyspace, table, values):
+            put_response = client.put(key, json.dumps(values))
+            return "SUCCESS" if put_response else "FAILURE"
+
+        return "FAILURE"
+
+    def row_read(self, keyspace, table, pk_name=None, pk_value=None):
+        schema = self.get_value(f'{keyspace}/{table}')
+        if pk_name and pk_value and schema["PRIMARY KEY"] == f'({pk_name})':
+            key = f'{keyspace}/{table}/{pk_value}'
+            return {pk_value: self.get_value(key)}
+
+        key_prefix = f'{keyspace}/{table}/'
+        return self.get_values_prefix(key_prefix, pk_name, pk_value)
+
+    def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
+        key = f'{keyspace}/{table}/{pk_value}'
+        self.get_client().delete(key)
+
+    def row_insert_by_condition(self, keyspace, table, pk_name, pk_value, values, exists_status):
+        key = f'{keyspace}/{table}/{pk_value}'
+        values[pk_name] = pk_value
+        exists_values = copy.deepcopy(values)
+        exists_values["status"] = exists_status
+        client = self.get_client()
+        client.transaction(
+            compare=[
+                client.transactions.version(key) > 0,
+            ],
+            success=[
+                client.transactions.put(key, json.dumps(exists_values))
+            ],
+            failure=[
+                client.transactions.put(key, json.dumps(values))
+            ]
+        )
+
+    def row_complex_field_update(self, keyspace, table, pk_name, pk_value, plan_id, updated_fields, values):
+        key = f'{keyspace}/{table}/{pk_value}'
+        value = self.get_value(key)
+        plans = value.get('plans')
+        plans.put(plan_id, updated_fields)
+        value.put('plans', plans)
+        self.get_client().put(key, value)
+
+    def index_create(self, keyspace, table, index):
+        # Since index is irrelevant in a KV store, this method will do nothing
+        pass
+
+    def lock_create(self, keyspace, table, pk_value):
+        lock_name = f'{keyspace}.{table}.{pk_value}'
+        return self.get_client().lock(lock_name)
+
+    def lock_acquire(self, lock):
+        return lock.acquire()
+
+    def lock_delete(self, lock):
+        return lock.release()
diff --git a/conductor/conductor/common/etcd/utils.py b/conductor/conductor/common/etcd/utils.py
new file mode 100644 (file)
index 0000000..f6cf46b
--- /dev/null
@@ -0,0 +1,32 @@
+#
+# -------------------------------------------------------------------------
+#   Copyright (C) 2021 Wipro Limited.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+
+def validate_schema(values, schema):
+    primary_key = schema.get("PRIMARY KEY").lstrip('(').rstrip(')')
+    if primary_key not in list(values.keys()):
+        return False
+    for key, value in values.items():
+        if key not in list(schema.keys()):
+            return False
+    return True
+
+
+class EtcdClientException(Exception):
+    pass
index 5a7c528..e889113 100644 (file)
@@ -24,6 +24,7 @@ from oslo_log import log
 
 import conductor.api.controllers.v1.plans
 from conductor.common import config_loader
+import conductor.common.etcd.api
 import conductor.common.music.api
 from conductor.common.utils import cipherUtils
 import conductor.data.plugins.inventory_provider.aai
@@ -119,6 +120,8 @@ def load_secrets():
     config.set_override('aaf_conductor_user', secret_dict['aaf_api']['aaf_conductor_user'], 'aaf_api')
     config.set_override('username', secret_dict['sdc']['username'], 'sdc')
     config.set_override('password', decrypt_pass(secret_dict['sdc']['password']), 'sdc')
+    config.set_override('username', secret_dict['etcd_api']['username'], 'etcd_api')
+    config.set_override('password', decrypt_pass(secret_dict['etcd_api']['password']), 'etcd_api')
 
 
 def decrypt_pass(passwd):
diff --git a/conductor/conductor/tests/unit/common/etcd/__init__.py b/conductor/conductor/tests/unit/common/etcd/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/conductor/conductor/tests/unit/common/etcd/test_utils.py b/conductor/conductor/tests/unit/common/etcd/test_utils.py
new file mode 100644 (file)
index 0000000..f6e03c2
--- /dev/null
@@ -0,0 +1,43 @@
+#
+# -------------------------------------------------------------------------
+#   Copyright (C) 2021 Wipro Limited.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# -------------------------------------------------------------------------
+#
+
+import time
+import unittest
+from conductor.common.etcd.utils import validate_schema
+from conductor.common.models.plan import Plan
+
+
+class TestUtils(unittest.TestCase):
+
+    def test_validate_schema(self):
+        schema = Plan.schema()
+        plan = {
+            "id": "12345",
+            "status": "template",
+            "created": time.time(),
+            "updated": time.time(),
+            "name": "sample",
+            "timeout": 60
+        }
+        self.assertTrue(validate_schema(plan, schema))
+        plan["abc"] = "xyz"
+        self.assertFalse(validate_schema(plan, schema))
+        plan.pop("abc")
+        plan.pop("id")
+        self.assertFalse(validate_schema(plan, schema))
index 3cf31cc..8aa18ff 100644 (file)
@@ -29,3 +29,4 @@ prometheus-client>=0.3.1
 pycryptodomex==3.10.1
 jsonschema>=3.2.0
 tosca-parser>=2.2.0
+etcd3==0.12.0
index 2074c6c..472bb5b 100644 (file)
                     "password": "demo123456!",
                     "aaf_conductor_user": "oof@oof.onap.org"
                 }
+            },
+            {
+                "name": "etcd_api",
+                "values": {
+                    "username": "conductor",
+                    "password": "conductor"
+                }
             }
         ]
     }