2 # -------------------------------------------------------------------------
3 # Copyright (c) 2015-2017 AT&T Intellectual Property
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
17 # -------------------------------------------------------------------------
20 """Music Data Store API"""
27 from oslo_config import cfg
28 from oslo_log import log
30 from conductor.common import rest
31 from conductor.common.utils import basic_auth_util
32 from conductor.i18n import _LE, _LI # pylint: disable=W0212
33 from conductor.common.utils import cipherUtils
35 LOG = log.getLogger(__name__)
42 cfg.StrOpt('server_url',
43 default='http://controller:8080/MUSIC/rest/v2',
44 help='Base URL for Music REST API without a trailing slash.'),
45 cfg.ListOpt('hostnames',
46 deprecated_for_removal=True,
47 deprecated_reason='Use server_url instead',
48 help='List of hostnames (round-robin access)'),
50 deprecated_for_removal=True,
51 deprecated_reason='Use server_url instead',
54 deprecated_for_removal=True,
55 deprecated_reason='Use server_url instead',
57 cfg.FloatOpt('connect_timeout',
59 help='Socket connection timeout'),
60 cfg.FloatOpt('read_timeout',
62 help='Socket read timeout'),
63 cfg.IntOpt('lock_timeout',
66 cfg.IntOpt('replication_factor',
68 help='Replication factor'),
72 cfg.StrOpt('music_topology',
73 default='SimpleStrategy'),
74 #TODO(larry); make the config inputs more generic
75 cfg.StrOpt('first_datacenter_name',
76 help='Name of the first data center'),
77 cfg.IntOpt('first_datacenter_replicas',
78 help='Number of replicas in first data center'),
79 cfg.StrOpt('second_datacenter_name',
80 help='Name of the second data center'),
81 cfg.IntOpt('second_datacenter_replicas',
82 help='Number of replicas in second data center'),
83 cfg.StrOpt('third_datacenter_name',
84 help='Name of the third data center'),
85 cfg.IntOpt('third_datacenter_replicas',
86 help='Number of replicas in third data center'),
87 cfg.BoolOpt('music_new_version', help='new or old version'),
88 cfg.BoolOpt('enable_https_mode', help='enable HTTPs mode for music connection'),
89 cfg.StrOpt('music_version', help='for version'),
90 cfg.StrOpt('aafuser', help='username value that used for creating basic authorization header'),
91 cfg.StrOpt('aafpass', help='password value that used for creating basic authorization header'),
92 cfg.StrOpt('aafns', help='AAF namespace field used in MUSIC request header'),
93 cfg.StrOpt('certificate_authority_bundle_file',
94 default='certificate_authority_bundle.pem',
95 help='Certificate Authority Bundle file in pem format. '
96 'Must contain the appropriate trust chain for the '
100 CONF.register_opts(MUSIC_API_OPTS, group='music_api')
103 class MusicAPI(object):
104 """Wrapper for Music API"""
106 lock_ids = None # Cache of lock ids, indexed by name
107 lock_timeout = None # Maximum time in seconds to acquire a lock
109 rest = None # API Endpoint
110 replication_factor = None # Number of Music nodes to replicate across
116 # set the urllib log level to ERROR
117 logging.getLogger('urllib3').setLevel(logging.ERROR)
119 LOG.info(_LI("Initializing Music API"))
120 server_url = CONF.music_api.server_url.rstrip('/')
122 # host/port/path are deprecated and should not be used anymore.
123 # Defaults removed from oslo_config to give more incentive.
125 # No more round robin either. Just take the first entry.
126 host = next(iter(CONF.music_api.hostnames or []), 'controller')
127 port = CONF.music_api.port or 8080
128 path = CONF.music_api.path or '/MUSIC/rest'
129 version = CONF.version
130 server_url = 'http://{}:{}/{}'.format(
131 host, port, version, path.rstrip('/').lstrip('/'))
134 'server_url': server_url,
135 'log_debug': CONF.music_api.debug,
136 'connect_timeout': CONF.music_api.connect_timeout,
137 'read_timeout': CONF.music_api.read_timeout,
139 self.rest = rest.REST(**kwargs)
141 music_pwd = cipherUtils.AESCipher.get_instance().decrypt(CONF.music_api.aafpass)
142 # Set one parameter for connection mode
143 # Currently depend on music version
144 if CONF.music_api.enable_https_mode:
145 self.rest.server_url = 'https://{}:{}/{}'.format(
146 host, port, version, path.rstrip('/').lstrip('/'))
147 self.rest.session.verify = CONF.music_api.certificate_authority_bundle_file
149 if CONF.music_api.music_new_version:
150 music_version = CONF.music_api.music_version.split(".")
152 self.rest.session.headers['content-type'] = 'application/json'
153 self.rest.session.headers['X-minorVersion'] = music_version[1]
154 self.rest.session.headers['X-patchVersion'] = music_version[2]
155 self.rest.session.headers['ns'] = CONF.music_api.aafns
156 self.rest.session.headers['userId'] = CONF.music_api.aafuser
157 self.rest.session.headers['password'] = music_pwd
158 self.rest.session.headers['Authorization'] = str(basic_auth_util.encode(CONF.music_api.aafuser,
163 # TODO(jdandrea): Allow override at creation time.
164 self.lock_timeout = CONF.music_api.lock_timeout
165 self.replication_factor = CONF.music_api.replication_factor
166 self.music_topology = CONF.music_api.music_topology
168 # TODO(larry) make the code more generic
169 self.first_datacenter_name = CONF.music_api.first_datacenter_name
170 self.first_datacenter_replicas = CONF.music_api.first_datacenter_replicas
171 self.second_datacenter_name = CONF.music_api.second_datacenter_name
172 self.second_datacenter_replicas = CONF.music_api.second_datacenter_replicas
173 self.third_datacenter_name = CONF.music_api.third_datacenter_name
174 self.third_datacenter_replicas = CONF.music_api.third_datacenter_replicas
180 if type(self.lock_ids) is dict:
181 for lock_name in list(self.lock_ids.keys()): # Python 3 Conversion -- dict object to list object
182 self.lock_delete(lock_name)
185 def _row_url_path(keyspace, table, pk_name, pk_value):
186 """Returns a Music-compliant row URL path."""
187 path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
188 'keyspace': keyspace,
192 if pk_name and pk_value:
193 path += '?%s=%s' % (pk_name, pk_value)
197 def _lock_name_generate(keyspace, table, pk_value):
198 """Generate a lock name."""
200 # The Music API dictates that the lock name must be of the form
201 # keyspace.table.primary_key
202 lock_name = '%(keyspace)s.%(table)s.%(primary_key)s' % {
203 'keyspace': keyspace,
205 'primary_key': pk_value,
209 def _lock_id_create(self, lock_name):
210 """Returns the lock id. Use for acquiring and releasing."""
212 path = '/locks/create/%s' % lock_name
213 response = self.rest.request(method='post',
214 content_type='text/plain', path=path)
216 if response and response.ok:
217 lock_id = response.text
220 def _lock_id_acquire(self, lock_id):
222 """Acquire a lock by id. Returns True if successful."""
223 path = '/locks/acquire/%s' % lock_id
224 response = self.rest.request(method='get',
225 content_type='text/plain', path=path)
227 if response and response.ok:
228 status = (response.text.lower() == 'true')
231 def _lock_id_release(self, lock_id):
232 """Release a lock by id. Returns True if successful."""
233 path = '/locks/release/%s' % lock_id
234 response = self.rest.request(method='delete',
235 content_type='text/plain', path=path)
236 return response and response.ok
238 def payload_init(self, keyspace=None, table=None,
239 pk_value=None, atomic=False, condition=None):
240 """Initialize payload for Music requests.
242 Supports atomic operations,
243 Returns a payload of data and lock_name (if any).
246 # lock_name = self.lock_create(keyspace, table, pk_value)
250 #lock_id = self.lock_ids.get(lock_name)
259 data['conditions'] = condition
261 #, 'lock_name': lock_name
262 return {'data': data}
264 def payload_delete(self, payload):
265 """Delete payload for Music requests. Cleans up atomic operations."""
267 # Doesn't actually delete the payload.
268 # We just delete the lock inside of it!
269 # This way payload_init/payload_delete is paired up neatly.
270 lock_name = payload.get('lock_name')
272 self.lock_delete(lock_name)
274 def keyspace_create(self, keyspace):
276 """Creates a keyspace."""
277 payload = self.payload_init()
278 data = payload.get('data')
279 data['durabilityOfWrites'] = True
282 'class': self.music_topology,
285 if self.music_topology == 'SimpleStrategy':
286 replication_info['replication_factor'] = self.replication_factor
287 elif self.music_topology == 'NetworkTopologyStrategy':
288 if self.first_datacenter_name and self.first_datacenter_replicas:
289 replication_info[self.first_datacenter_name] = self.first_datacenter_replicas
290 if self.second_datacenter_name and self.second_datacenter_replicas:
291 replication_info[self.second_datacenter_name] = self.second_datacenter_replicas
292 if self.third_datacenter_name and self.third_datacenter_replicas:
293 replication_info[self.third_datacenter_name] = self.third_datacenter_replicas
295 data['replicationInfo'] = replication_info
297 path = '/keyspaces/%s' % keyspace
298 if CONF.music_api.debug:
299 LOG.debug("Creating keyspace {}".format(keyspace))
300 response = self.rest.request(method='post', path=path, data=data)
302 if response and CONF.music_api.music_new_version:
303 result = response.json().get('result')
306 return response and response.ok
308 def keyspace_delete(self, keyspace):
309 """Drops a keyspace."""
310 payload = self.payload_init()
311 data = payload.get('data')
313 path = '/keyspaces/%s' % keyspace
314 if CONF.music_api.debug:
315 LOG.debug("Deleting keyspace {}".format(keyspace))
316 response = self.rest.request(method='delete', path=path, data=data)
317 return response and response.ok
319 def lock_create(self, keyspace, table, pk_value):
320 """Create and acquire a lock. Returns a lock name."""
322 # Generate the lock name, then create/acquire the lock id.
323 lock_name = self._lock_name_generate(keyspace, table, pk_value)
324 if CONF.music_api.debug:
325 LOG.debug("Creating lock {}".format(lock_name))
326 lock_id = self._lock_id_create(lock_name)
327 time_now = time.time()
328 while not self._lock_id_acquire(lock_id):
329 if time.time() - time_now > self.lock_timeout:
331 _LE('Lock id acquire timeout: %s') % lock_name)
333 # Cache the lock name/id.
334 self.lock_ids[lock_name] = lock_id
337 def lock_release(self, lock_name):
338 """Release lock by name. Returns True if successful"""
340 # No need to delete the lock. lock_create() will not complain
341 # if a lock with the same name is created later.
342 if CONF.music_api.debug:
343 LOG.debug("Releasing lock {}".format(lock_name))
345 return self._lock_id_release(self.lock_ids.get(lock_name))
347 def lock_delete(self, lock_name):
348 """Delete a lock by name. Returns True if successful."""
349 path = '/locks/delete/%s' % lock_name
350 if CONF.music_api.debug:
351 LOG.debug("Deleting lock {}".format(lock_name))
352 response = self.rest.request(content_type='text/plain',
353 method='delete', path=path)
354 if response and response.ok:
355 del self.lock_ids[lock_name]
356 return response and response.ok
358 def row_create(self, keyspace, table, # pylint: disable=R0913
359 pk_name, pk_value, values, atomic=False, conditional=False):
361 payload = self.payload_init(keyspace, table, pk_value, atomic)
362 data = payload.get('data')
363 data['values'] = values
365 path = '/keyspaces/%(keyspace)s/tables/%(table)s/rows' % {
366 'keyspace': keyspace,
369 if CONF.music_api.debug:
370 LOG.debug("Creating row with pk_value {} in table "
371 "{}, keyspace {}".format(pk_value, table, keyspace))
372 response = self.rest.request(method='post', path=path, data=data)
373 self.payload_delete(payload)
374 return response and response.ok
376 def row_update(self, keyspace, table, # pylint: disable=R0913
377 pk_name, pk_value, values, atomic=False, condition=None):
379 payload = self.payload_init(keyspace, table, pk_value, atomic, condition)
380 data = payload.get('data')
381 data['values'] = values
383 path = self._row_url_path(keyspace, table, pk_name, pk_value)
384 if CONF.music_api.debug:
385 LOG.debug("Updating row with pk_value {} in table "
386 "{}, keyspace {}".format(pk_value, table, keyspace))
387 response = self.rest.request(method='put', path=path, data=data)
388 #self.payload_delete(payload)
389 if response is not None and CONF.music_api.music_new_version:
390 response_json = json.loads(response.content)
391 response_status = response_json.get("status")
392 return response_status
394 return response and response.ok and response.content
396 def row_read(self, keyspace, table, pk_name=None, pk_value=None):
397 """Read one or more rows. Not atomic."""
398 path = self._row_url_path(keyspace, table, pk_name, pk_value)
399 if CONF.music_api.debug:
400 LOG.debug("Reading row with pk_value {} from table "
401 "{}, keyspace {}".format(pk_value, table, keyspace))
402 response = self.rest.request(path=path)
404 if response is not None and CONF.music_api.music_new_version:
405 result = response.json().get('result') or {}
408 return response and response.json()
410 def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
412 payload = self.payload_init(keyspace, table, pk_value, atomic)
413 data = payload.get('data')
415 path = self._row_url_path(keyspace, table, pk_name, pk_value)
416 if CONF.music_api.debug:
417 LOG.debug("Deleting row with pk_value {} from table "
418 "{}, keyspace {}".format(pk_value, table, keyspace))
419 response = self.rest.request(method='delete', path=path, data=data)
420 self.payload_delete(payload)
421 return response and response.ok
423 def row_insert_by_condition(self, keyspace, table, pk_name, pk_value, values, exists_status):
425 """Insert a row with certain condition."""
426 # Get the plan id from plans field
427 plan_id = next(iter(values.get('plans')))
429 # If id does not exist in order_locks table, insert the 'values_when_id_non_exist'
430 values_when_id_non_exist = values.get('plans')[plan_id]
432 # If id exists in order_locks table, insert the 'values_when_id_exist'
433 values_when_id_exist = copy.deepcopy(values_when_id_non_exist)
434 values_when_id_exist['status'] = exists_status
436 # Common values for new MUSIC api
437 common_values = copy.deepcopy(values_when_id_non_exist)
438 common_values.pop('status', None)
440 if (CONF.music_api.music_new_version):
441 # Conditional Insert request body sends to new version of MUSIC (2.5.5 and lator)
443 "primaryKey": pk_name,
444 "primaryKeyValue": pk_value,
446 "casscadeColumnName": "plans",
449 "is_spinup_completed": values.get('is_spinup_completed')
451 "casscadeColumnData": {
453 "value": common_values
457 "status": values_when_id_exist.get('status')
460 "status": values_when_id_non_exist.get('status')
467 "primaryKey": pk_name,
468 "primaryKeyValue": pk_value,
469 "cascadeColumnKey": plan_id,
470 "cascadeColumnName": "plans",
473 "is_spinup_completed": values.get('is_spinup_completed')
475 "nonExistsCondition": {
476 "value": values_when_id_non_exist
479 "value": values_when_id_exist
483 #conditional/update/keyspaces/conductor_order_locks/tables/order_locks
484 path = '/conditional/insert/keyspaces/%(keyspace)s/tables/%(table)s' % {
485 'keyspace': keyspace,
488 response = self.rest.request(method='post', path=path, data=data)
491 def index_create(self, keyspace, table, index):
493 """Create indexes for a particular table"""
495 path = '/keyspaces/%(keyspace)s/tables/%(table)s/index/%(field)s' % {
496 'keyspace': keyspace,
500 response = self.rest.request(method='post', path=path)
503 def row_complex_field_update(self, keyspace, table, pk_name, pk_value, plan_id, updated_fields, values):
505 if (CONF.music_api.music_new_version):
506 # new version of MUSIC
508 "primaryKey": pk_name,
509 "primaryKeyValue": pk_value,
510 "casscadeColumnName": "plans",
511 "tableValues": values,
512 "casscadeColumnData": {
514 "value": updated_fields
519 "primaryKey": pk_name,
520 "primaryKeyValue": pk_value,
521 "cascadeColumnName": "plans",
523 "updateStatus": updated_fields,
527 path = '/conditional/update/keyspaces/%(keyspace)s/tables/%(table)s' % {
528 'keyspace': keyspace,
531 response = self.rest.request(method='put', path=path, data=data)
532 LOG.debug("Updated the order {} status to {} for conflict_id {} in "
533 "order_locks table, response from MUSIC {}".format(plan_id, updated_fields, pk_value, response))
534 return response and response.ok
538 def _table_path_generate(keyspace, table):
539 path = '/keyspaces/%(keyspace)s/tables/%(table)s/' % {
540 'keyspace': keyspace,
545 def table_create(self, keyspace, table, schema):
546 """Creates a table."""
547 payload = self.payload_init()
548 data = payload.get('data')
549 data['fields'] = schema
551 path = self._table_path_generate(keyspace, table)
552 if CONF.music_api.debug:
553 LOG.debug("Creating table {}, keyspace {}".format(table, keyspace))
554 response = self.rest.request(method='post', path=path, data=data)
555 return response and response.ok
557 def table_delete(self, keyspace, table):
558 """Creates a table."""
559 payload = self.payload_init()
560 data = payload.get('data')
562 path = self._table_path_generate(keyspace, table)
563 if CONF.music_api.debug:
564 LOG.debug("Deleting table {}, keyspace {}".format(table, keyspace))
565 response = self.rest.request(method='delete', path=path, data=data)
566 return response and response.ok
569 """Returns version string."""
571 if CONF.music_api.debug:
572 LOG.debug("Requesting version info")
573 response = self.rest.request(method='get',
574 content_type='text/plain', path=path)
575 return response and response.text
578 class MockAPI(object):
579 """Wrapper for Music API"""
581 # Mock state for Music
588 LOG.info(_LI("Initializing Music Mock API"))
592 self.music['keyspaces'] = {}
597 def _keyspaces(self):
598 return self.music.get('keyspaces')
600 def _set_keyspace(self, keyspace):
601 self._keyspaces[keyspace] = {}
603 def _unset_keyspace(self, keyspace):
604 self._keyspaces.pop(keyspace)
606 def _set_table(self, keyspace, table):
607 self._keyspaces[keyspace][table] = {}
609 def _set_index(self, keyspace, table):
610 self._keyspaces[keyspace][table] = {}
612 def _unset_table(self, keyspace, table):
613 self._keyspaces[keyspace].pop(table)
615 def _get_row(self, keyspace, table, key=None):
618 for row_key, row in self._keyspaces[keyspace][table].items():
619 if not key or key == row_key:
621 rows['row {}'.format(row_num)] = copy.deepcopy(row)
624 def _set_row(self, keyspace, table, key, row):
625 self._keyspaces[keyspace][table][key] = row
627 def _unset_row(self, keyspace, table, row):
628 self._keyspaces[keyspace][table].pop(row)
630 def keyspace_create(self, keyspace):
631 """Creates a keyspace."""
632 if CONF.music_api.debug:
633 LOG.debug("Creating keyspace {}".format(keyspace))
634 self._set_keyspace(keyspace)
637 def keyspace_delete(self, keyspace):
638 """Drops a keyspace."""
639 if CONF.music_api.debug:
640 LOG.debug("Deleting keyspace {}".format(keyspace))
641 self._unset_keyspace(keyspace)
644 def row_create(self, keyspace, table, # pylint: disable=R0913
645 pk_name, pk_value, values, atomic=False):
647 if CONF.music_api.debug:
648 LOG.debug("Creating row with pk_value {} in table "
649 "{}, keyspace {}".format(pk_value, table, keyspace))
650 self._set_row(keyspace, table, pk_value, values)
653 def row_update(self, keyspace, table, # pylint: disable=R0913
654 pk_name, pk_value, values, atomic=False):
656 if CONF.music_api.debug:
657 LOG.debug("Updating row with pk_value {} in table "
658 "{}, keyspace {}".format(pk_value, table, keyspace))
659 self._set_row(keyspace, table, pk_value, values)
662 def row_read(self, keyspace, table, pk_name=None, pk_value=None):
663 """Read one or more rows. Not atomic."""
664 if CONF.music_api.debug:
665 LOG.debug("Reading row with pk_value {} from table "
666 "{}, keyspace {}".format(pk_value, table, keyspace))
667 values = self._get_row(keyspace, table, pk_value)
670 def row_delete(self, keyspace, table, pk_name, pk_value, atomic=False):
672 if CONF.music_api.debug:
673 LOG.debug("Deleting row with pk_value {} from table "
674 "{}, keyspace {}".format(pk_value, table, keyspace))
675 self._unset_row(keyspace, table, pk_value)
678 def table_create(self, keyspace, table, schema):
679 """Creates a table."""
680 if CONF.music_api.debug:
681 LOG.debug("Creating table {}, keyspace {}".format(table, keyspace))
682 self._set_table(keyspace, table)
685 def index_create(self, keyspace, table, index=None):
686 """Creates a index."""
687 if CONF.music_api.debug:
688 LOG.debug("Creating index {}, keyspace {}".format(table, keyspace))
689 self._set_index(keyspace, table)
692 def table_delete(self, keyspace, table):
693 """Creates a table."""
694 if CONF.music_api.debug:
695 LOG.debug("Deleting table {}, keyspace {}".format(table, keyspace))
696 self._unset_table(keyspace, table)
700 """Returns version string."""
701 if CONF.music_api.debug:
702 LOG.debug("Requesting version info")
707 """Wrapper for Music and Music Mock API"""
709 # FIXME(jdandrea): Follow more formal practices for defining/using mocks
710 if CONF.music_api.mock: