1 # ============LICENSE_START=======================================================
3 # ================================================================================
4 # Copyright (c) 2017 AT&T Intellectual Property. All rights reserved.
5 # ================================================================================
6 # Licensed under the Apache License, Version 2.0 (the "License");
7 # you may not use this file except in compliance with the License.
8 # You may obtain a copy of the License at
10 # http://www.apache.org/licenses/LICENSE-2.0
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 # ============LICENSE_END=========================================================
19 # ECOMP is a trademark and service mark of AT&T Intellectual Property.
21 # -*- coding: utf-8 -*-
23 Provides the mock catalog
29 from functools import partial
30 from datetime import datetime
34 from sqlalchemy import create_engine as create_engine_, event, and_, or_
35 from sqlalchemy.exc import IntegrityError
36 from sqlalchemy.orm import sessionmaker
37 from sqlalchemy.orm.exc import NoResultFound
38 from sqlalchemy_utils import database_exists, create_database, drop_database
40 from dcae_cli import _version as cli_version
41 from dcae_cli.catalog.mock.tables import Component, Format, FormatPair, Base
42 from dcae_cli.catalog.mock.schema import validate_component, validate_format, apply_defaults_docker_config
43 from dcae_cli.util import reraise_with_msg, get_app_dir
44 from dcae_cli.util.config import get_config, get_path_component_spec, \
46 from dcae_cli.util.logger import get_logger
47 from dcae_cli.util.docker_util import image_exists
48 from dcae_cli.catalog.exc import CatalogError, DuplicateEntry, MissingEntry, FrozenEntry
49 from dcae_cli.util.cdap_util import normalize_cdap_params
# Module-level logger used by the catalog code in this file.
logger = get_logger('Catalog')
def _get_component(session, name, version):
    '''Returns a single component ORM

    Parameters
    ----------
    session: SQLAlchemy session used to run the query
    name: Name of the component to look up
    version: Version of the component. When falsy, the first row for `name`
        ordered by version descending is selected instead.

    Returns
    -------
    The matching Component ORM

    Raises
    ------
    MissingEntry if the query yields no row
    '''
    # NOTE(review): the falsy-version branch is reconstructed from the
    # error-message logic below ("name:version" if version else name) and the
    # sibling get_format helper - confirm.
    if version:
        query = session.query(Component).filter(Component.name==name, Component.version==version)
    else:
        query = session.query(Component).filter(Component.name==name).order_by(Component.version.desc()).limit(1)

    try:
        return query.one()
    except NoResultFound:
        comp_msg = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Component '{}' was not found in the catalog".format(comp_msg))
def _get_docker_image_from_spec(spec):
    '''Returns the uri of the first "docker image" artifact listed in the spec

    Raises IndexError when the spec lists no artifact of type "docker image".
    '''
    images = [art["uri"] for art in spec["artifacts"] if art["type"] == "docker image"]
    # NOTE(review): returning the first match is reconstructed from how callers
    # treat the result as a single image name - confirm.
    return images[0]
def _get_docker_image(session, name, version):
    '''Returns the docker image name of a given component

    Looks the component up through _get_component (which raises MissingEntry
    when it is absent) and extracts the image from its stored spec.
    '''
    spec = _get_component(session, name, version).get_spec_as_dict()
    return _get_docker_image_from_spec(spec)
def _add_docker_component(session, user, spec, update, enforce_image=True):
    '''Adds/updates a docker component to the catalog

    Parameters
    ----------
    session: SQLAlchemy session the component is added to
    user: Owner recorded on the component
    spec: Component specification dict
    update: Passed through to build_generic_component
    enforce_image: When True, the image named in the spec must pass
        image_exists before anything is added

    Raises
    ------
    CatalogError if enforce_image is set and image_exists reports the image
    as absent
    '''
    image = _get_docker_image_from_spec(spec)

    if enforce_image and not image_exists(image):
        raise CatalogError("Specified image '{}' does not exist locally.".format(image))

    comp = build_generic_component(session, user, spec, update)
    # NOTE(review): the statement following the build was not recoverable from
    # the reviewed text; adding the component to the session is assumed - confirm.
    session.add(comp)
def _get_cdap_jar_from_spec(spec):
    '''Returns the uri of the first "jar" artifact listed in the spec

    Raises IndexError when the spec lists no artifact of type "jar".
    '''
    jars = [art["uri"] for art in spec["artifacts"] if art["type"] == "jar"]
    # NOTE(review): returning the first match is reconstructed by analogy with
    # the docker image helper - confirm.
    return jars[0]
def _add_cdap_component(session, user, spec, update):
    '''Adds/updates a cdap component to the catalog

    The component ORM is produced by build_generic_component from the given
    spec and owner.
    '''
    comp = build_generic_component(session, user, spec, update)
    # NOTE(review): the statement following the build was not recoverable from
    # the reviewed text; adding the component to the session is assumed - confirm.
    session.add(comp)
@contextlib.contextmanager
def SessionTransaction(engine):
    '''Provides a transactional scope around a series of operations

    Yields a session bound to `engine`. The session is committed when the
    managed block finishes without raising, rolled back when it raises, and
    closed in every case.

    An IntegrityError is first handed to _raise_if_duplicate, which may raise
    DuplicateEntry instead; otherwise the IntegrityError is re-raised.
    '''
    # NOTE(review): only the sessionmaker call and the IntegrityError handler
    # were visible; the commit/rollback/close flow follows the usual SQLAlchemy
    # transactional-scope recipe - confirm.
    Session = sessionmaker(engine)
    # Create the session before entering the try block so that the cleanup
    # clauses below never reference an unbound name.
    session = Session()
    try:
        yield session
        session.commit()
    except IntegrityError as e:
        session.rollback()
        _raise_if_duplicate(str(engine.url), e)
        raise
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
# Shared exception instance raised whenever a uniqueness violation is detected.
_dup_e = DuplicateEntry('Entry already exists. Try using the --update flag.')


def _raise_if_duplicate(url, e):
    '''Raises if the exception relates to duplicate entries

    Parameters
    ----------
    url: Database url as a string; used to pick the backend-specific check
    e: The IntegrityError whose `orig` attribute holds the driver-level error

    Raises
    ------
    DuplicateEntry if the driver error is recognized as a uniqueness violation.
    Returns None otherwise so the caller can decide what to do with `e`.
    '''
    # NOTE(review): the sqlite branch header and both raise statements are
    # reconstructed from the visible `elif 'postgres' in url` and the
    # function's docstring - confirm.
    if 'sqlite' in url:
        if 'UNIQUE' in e.orig.args[0].upper():
            raise _dup_e
    elif 'postgres' in url:
        # e.orig is of type psycopg2.IntegrityError that has
        # pgcode which uses the following:
        #
        # https://www.postgresql.org/docs/current/static/errcodes-appendix.html#ERRCODES-TABLE
        #
        # 23505 means "unique_violation"
        if e.orig.pgcode == "23505":
            raise _dup_e
def create_engine(base, db_name=None, purge_existing=False, db_url=None):
    '''Returns an initialized database engine

    Parameters
    ----------
    base: Declarative base whose metadata is created on the engine
    db_name: Optional file name of a sqlite db located in the app dir
    purge_existing: When True and the database already exists, it is dropped
        and created again
    db_url: Optional full database url. Takes precedence over `db_name`.

    Returns
    -------
    The configured SQLAlchemy engine with all tables of `base` created
    '''
    # NOTE(review): the branch conditions and the create/purge statements are
    # reconstructed from the surviving comments and the imported
    # database_exists/create_database/drop_database helpers - confirm.
    if db_url is not None:
        # a full db url is the most explicit input and should be used over other inputs if provided
        url = db_url
    elif db_name is not None:
        # if only a db name is given, interpret as a sqlite db in the app dir. this maintains backwards compat with existing tests.
        db_path = os.path.join(get_app_dir(), db_name)
        url = ''.join(('sqlite:///', db_path))
    else:
        # no url or db name indicates we want to use the tool's configured db
        config = get_config()
        url = config['db_url']

    if not database_exists(url):
        create_database(url)
    elif purge_existing:
        drop_database(url)
        create_database(url)

    engine = create_engine_(url)
    _configure_engine(engine)
    base.metadata.create_all(engine)
    return engine
def _configure_engine(engine):
    '''Performs additional db-specific configurations

    For engines whose url contains "sqlite", registers a 'connect' listener
    that issues `pragma foreign_keys=ON` on every new connection. Engines with
    any other url are left untouched.
    '''
    if 'sqlite' not in str(engine.url):
        return

    def _enable_foreign_keys(conn, record):
        conn.execute('pragma foreign_keys=ON')

    event.listen(engine, 'connect', _enable_foreign_keys)
def get_format(session, name, version):
    '''Returns a single data format ORM

    Parameters
    ----------
    session: SQLAlchemy session used to run the query
    name: Name of the data format to look up
    version: Version of the data format. When falsy, the first row for `name`
        ordered by version descending is selected instead.

    Returns
    -------
    The matching Format ORM

    Raises
    ------
    MissingEntry if the query yields no row
    '''
    # NOTE(review): the falsy-version branch and the `query.one()` call are
    # reconstructed from the error-message logic below - confirm.
    if version:
        query = session.query(Format).filter(Format.name==name, Format.version==version)
    else:
        query = session.query(Format).filter(Format.name==name).order_by(Format.version.desc()).limit(1)

    try:
        return query.one()
    except NoResultFound:
        msg = "{}:{}".format(name, version) if version else name
        raise MissingEntry("Data format '{}' was not found in the catalog.".format(msg))
def _create_format_tuple(entry):
    '''Create tuple to identify format

    Returns (format name, version) read from the 'format' and 'version' keys
    of the given spec entry.
    '''
    name, version = entry['format'], entry['version']
    return (name, version)
def _get_format_pair(session, req_name, req_version, resp_name, resp_version, create=True):
    '''Returns a single data format pair ORM

    Parameters
    ----------
    session: SQLAlchemy session used to run the queries
    req_name, req_version: Identify the request data format
    resp_name, resp_version: Identify the response data format
    create: When True, a missing pair is created and added to the session.
        When False, a missing pair raises MissingEntry.

    Raises
    ------
    MissingEntry if either data format is unknown (raised by get_format) or if
    the pair does not exist and `create` is False
    '''
    req = get_format(session, req_name, req_version)
    resp = get_format(session, resp_name, resp_version)

    query = session.query(FormatPair).filter(and_(FormatPair.req == req, FormatPair.resp == resp))

    # NOTE(review): the `query.one()` call, the `create` check and the
    # add/return of the new pair are reconstructed from the visible handler,
    # error message and FormatPair construction - confirm.
    try:
        return query.one()
    except NoResultFound:
        if not create:
            raise MissingEntry("Data format pair with request '{}:{}' and response '{}:{}' was not found in the catalog.".format(req.name, req.version, resp.name, resp.version))

        pair = FormatPair(req=req, resp=resp)
        session.add(pair)
        return pair
def _create_format_pair_tuple(entry):
    '''Create tuple to identify format pair

    Returns (request name, request version, response name, response version)
    read from the 'request' and 'response' sub-dicts of the given spec entry.
    '''
    request, response = entry['request'], entry['response']
    return (request['format'], request['version'],
            response['format'], response['version'])
def _get_unique_format_things(create_tuple, get_func, entries):
    '''Get unique format things (formats, format pairs, ..)

    Parameters
    ----------
    create_tuple: Function that has the signature dict->tuple
    get_func: Function that has the signature *tuple->orm
    entries: list of dicts that have data format details that come from
        streams.publishes, streams.subscribes, services.calls, services.provides

    Returns
    -------
    List with one `get_func` result per distinct tuple. The order of the
    results is unspecified because the tuples are de-duplicated through a set.
    '''
    unique_keys = set(create_tuple(entry) for entry in entries)
    return [get_func(*key) for key in unique_keys]
def verify_component(session, name, version):
    '''Returns the orm name and version of a given component

    The lookup goes through _get_component, so MissingEntry is raised when the
    component is not in the catalog.
    '''
    comp = _get_component(session, name, version)
    return comp.name, comp.version
def get_component_type(session, name, version):
    '''Returns the component_type of a given component

    The lookup goes through _get_component, so MissingEntry is raised when the
    component is not in the catalog.
    '''
    comp = _get_component(session, name, version)
    return comp.component_type
def get_component_spec(session, name, version):
    '''Returns the spec dict of a given component

    The spec is stored on the component as a JSON string and decoded here.
    MissingEntry is raised by _get_component when the component is absent.
    '''
    comp = _get_component(session, name, version)
    return json.loads(comp.spec)
def get_format_spec(session, name, version):
    '''Returns the spec dict of a given data format

    The spec is stored on the data format as a JSON string and decoded here.
    MissingEntry is raised by get_format when the data format is absent.
    '''
    data_format = get_format(session, name, version)
    return json.loads(data_format.spec)
def build_generic_component(session, user, spec, update):
    '''Builds, adds, and returns a generic component ORM. Does not commit changes.

    Parameters
    ----------
    session: SQLAlchemy session used for lookups and for adding a new component
    user: Stored as the component's owner
    spec: Component specification dict. Its 'self' section supplies the
        component attributes and the whole spec is stored as a JSON string.
    update: When True the existing component (name and version taken from the
        spec) is fetched and modified; when False a new component is created.

    Returns
    -------
    The populated Component ORM

    Raises
    ------
    FrozenEntry if `update` is set and the existing component is published
    MissingEntry if `update` is set and the component does not exist. A
        MissingEntry raised while resolving data formats is passed to
        reraise_with_msg together with the section being traversed.
    '''
    attrs = spec['self'].copy()
    attrs['spec'] = json.dumps(spec)

    # TODO: This should really come from the spec too
    attrs['owner'] = user

    # grab existing or create a new component
    name, version = attrs['name'], attrs['version']
    # NOTE(review): the if/else around the lookup and the creation of a new
    # Component are reconstructed by analogy with add_format - confirm.
    if update:
        comp = _get_component(session, name, version)
        if comp.is_published():
            raise FrozenEntry("Component '{}:{}' has been pushed and cannot be updated".format(name, version))
    else:
        comp = Component()
        session.add(comp)

    # REVIEW: Inject these parameters as function arguments instead of
    # looking them up here?
    # WATCH: This has to be done here before the code below because there is a
    # commit somewhere below and since these fields are not nullable, you'll get a
    # failure otherwise.
    comp.cli_version = cli_version.__version__
    comp.schema_path = get_path_component_spec()

    for attr, val in six.iteritems(attrs):
        setattr(comp, attr, val)

    # update relationships
    get_format_local = partial(get_format, session)
    get_unique_formats = partial(_get_unique_format_things, _create_format_tuple,
            get_format_local)

    try:
        comp.publishes = get_unique_formats(spec['streams']['publishes'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "publishes"')

    try:
        comp.subscribes = get_unique_formats(spec['streams']['subscribes'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "subscribes"')

    get_format_pairs = partial(_get_format_pair, session)
    get_unique_format_pairs = partial(_get_unique_format_things,
            _create_format_pair_tuple, get_format_pairs)

    try:
        comp.provides = get_unique_format_pairs(spec['services']['provides'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "provides"')

    try:
        comp.calls = get_unique_format_pairs(spec['services']['calls'])
    except MissingEntry as e:
        reraise_with_msg(e, 'Add failed while traversing "calls"')

    return comp
def add_format(session, spec, user, update):
    '''Helper function which adds a data format to the catalog

    Parameters
    ----------
    session: SQLAlchemy session used for the lookup and for adding a new format
    spec: Data format specification dict. Its 'self' section supplies the
        attributes and the whole spec is stored as a JSON string.
    user: Stored as the data format's owner
    update: When True the existing data format is fetched and modified; when
        False a new Format is created and added to the session.

    Returns
    -------
    The populated Format ORM

    Raises
    ------
    FrozenEntry if `update` is set and the existing data format is published
    MissingEntry if `update` is set and the data format does not exist
    '''
    attrs = spec['self'].copy()
    attrs['spec'] = json.dumps(spec)
    name, version = attrs['name'], attrs['version']

    # TODO: This should really come from the spec too
    attrs['owner'] = user

    # NOTE(review): the if/else headers are reconstructed from the two visible
    # branches (lookup + frozen check versus Format() + session.add) - confirm.
    if update:
        data_format = get_format(session, name, version)
        if data_format.is_published():
            raise FrozenEntry("Data format {}:{} has been pushed and cannot be updated".format(name, version))
    else:
        data_format = Format()
        session.add(data_format)

    for attr, val in six.iteritems(attrs):
        setattr(data_format, attr, val)

    # REVIEW: Inject these parameters as function arguments instead of
    # looking them up here?
    data_format.cli_version = cli_version.__version__
    data_format.schema_path = get_path_data_format()

    # NOTE(review): returning the ORM is assumed (mirrors
    # build_generic_component); callers visible in this file ignore it.
    return data_format
def _filter_neighbors(session, neighbors=None):
    '''Returns a Component query filtered by available neighbors

    Parameters
    ----------
    session: SQLAlchemy session the query is built on
    neighbors: Optional iterable of (name, version) pairs. None means no
        restriction; otherwise only components matching one of the pairs are
        selected.
    '''
    query = session.query(Component)
    if neighbors is None:
        return query

    # Build the clauses eagerly and unpack them: or_() is documented to take
    # its clauses as positional arguments.
    clauses = [and_(Component.name==n, Component.version==v) for n, v in neighbors]
    return query.filter(or_(*clauses))
def get_subscribers(session, orm, neighbors=None):
    '''Returns a list of component ORMs which subscribe to the specified format

    `orm` is the data format ORM to match against Component.subscribes;
    `neighbors` optionally restricts the candidates (see _filter_neighbors).
    '''
    candidates = _filter_neighbors(session, neighbors)
    subscribers = candidates.filter(Component.subscribes.contains(orm))
    return subscribers.all()
def get_providers(session, orm, neighbors=None):
    '''Returns a list of component ORMs which provide the specified format pair

    `orm` is the format pair ORM to match against Component.provides;
    `neighbors` optionally restricts the candidates (see _filter_neighbors).
    '''
    candidates = _filter_neighbors(session, neighbors)
    providers = candidates.filter(Component.provides.contains(orm))
    return providers.all()
def _match_pub(entries, orms):
    '''Aligns the publishes orms with spec entries to get the config key

    Parameters
    ----------
    entries: Spec entries from streams.publishes
    orms: Data format ORMs exposing `name` and `version`

    Yields
    ------
    (config_key, orm) for every entry whose type contains "http". Raises
    KeyError if such an entry has no ORM with the same format name and version.
    '''
    lookup = {(orm.name, orm.version): orm for orm in orms}
    for entry in entries:
        # NOTE(review): skipping entries whose type does not contain "http" is
        # reconstructed; the body of this condition was not recoverable - confirm.
        if "http" not in entry["type"]:
            continue

        key = (entry['format'], entry['version'])
        yield entry['config_key'], lookup[key]
def _match_call(entries, orms):
    '''Aligns the calls orms with spec entries to get the config key

    Parameters
    ----------
    entries: Spec entries from services.calls
    orms: Format pair ORMs exposing `req` and `resp`, each with `name` and
        `version`

    Yields
    ------
    (config_key, orm) for every entry. Raises KeyError if an entry has no ORM
    with the same request/response format names and versions.
    '''
    lookup = {}
    for orm in orms:
        lookup[(orm.req.name, orm.req.version, orm.resp.name, orm.resp.version)] = orm

    for entry in entries:
        request, response = entry['request'], entry['response']
        key = (request['format'], request['version'], response['format'], response['version'])
        yield entry['config_key'], lookup[key]
def get_discovery(get_params_func, session, name, version, neighbors=None):
    '''Returns the parameters and interface map for a given component and considering its neighbors

    Parameters
    ----------
    get_params_func: Function spec dict -> parameters, applied to the stored spec
    session: SQLAlchemy session used for the lookups
    name, version: Identify the component
    neighbors: Optional iterable of (name, version) pairs restricting which
        components may appear in the interface map

    Returns
    -------
    Tuple (params, interfaces) where interfaces maps a config key to a list of
    (name, version) of the other components that subscribe to the published
    format or provide the called format pair. The component itself is excluded.
    '''
    comp = _get_component(session, name, version)
    spec = json.loads(comp.spec)

    # NOTE(review): the initialization of the map was not recoverable from the
    # reviewed text; an empty dict is assumed - confirm.
    interfaces = dict()

    for key, orm in _match_pub(spec['streams']['publishes'], comp.publishes):
        interfaces[key] = [(c.name, c.version) for c in get_subscribers(session, orm, neighbors) if c is not comp]

    for key, orm in _match_call(spec['services']['calls'], comp.calls):
        interfaces[key] = [(c.name, c.version) for c in get_providers(session, orm, neighbors) if c is not comp]

    params = get_params_func(spec)
    return params, interfaces
def _docker_params_from_spec(spec):
    '''Returns a dict of parameter name to parameter value from spec['parameters']'''
    return {param['name']: param['value'] for param in spec['parameters']}


# get_discovery specialized per component type: cdap parameters are produced
# by normalize_cdap_params, docker parameters are read from the spec as-is.
_get_discovery_for_cdap = partial(get_discovery, normalize_cdap_params)
_get_discovery_for_docker = partial(get_discovery, _docker_params_from_spec)
def _get_discovery_for_dmaap(get_component_spec_func, name, version):
    """Get all config keys that are for dmaap streams

    Parameters
    ----------
    get_component_spec_func: Function (name, version) -> spec dict
    name, version: Passed to `get_component_spec_func`

    Returns
    -------
    Tuple of message router config keys list, data router config keys list
    """
    spec = get_component_spec_func(name, version)

    streams = spec["streams"]
    # Publishes come first, then subscribes; that order carries over to the
    # returned key lists.
    all_streams = streams.get("publishes", []) + streams.get("subscribes", [])

    # Both spellings of each stream type are accepted.
    mr_types = ("message router", "message_router")
    dr_types = ("data router", "data_router")

    mr_keys = [stream["config_key"] for stream in all_streams if stream["type"] in mr_types]
    dr_keys = [stream["config_key"] for stream in all_streams if stream["type"] in dr_types]
    return mr_keys, dr_keys
def _filter_latest(orms):
    '''Filters and yields only (name, version, *) orm tuples with the highest version

    Tuples are grouped by their first element and, per group, the tuple with
    the largest second element is yielded. Groups come out sorted by name.
    '''
    def by_name(item):
        return item[0]

    def by_version(item):
        return item[1]

    # itertools.groupby requires the input to be sorted
    for _, group in itertools.groupby(sorted(orms, key=by_name), by_name):
        yield max(group, key=by_version)
def list_components(session, user, only_published, subscribes=None, publishes=None,
        provides=None, calls=None, latest=True):
    """Get list of components

    Parameters
    ----------
    session: SQLAlchemy session used for the queries
    user: When truthy, only components owned by this user are listed
    only_published: When truthy, only components with a publish time are listed
    subscribes, publishes: Optional sequences of (name, version) data formats
    provides, calls: Optional sequences of
        ((request name, request version), (response name, response version))
    latest: When truthy, only the highest version per component name is kept

    Components matching ANY of the given format filters are selected.

    Returns
    -------
    List of component orms as dicts
    """
    # NOTE(review): the guards around each filter group and around the
    # user/only_published/latest steps are reconstructed from the parameter
    # defaults and the two visible query variants - confirm.
    filters = list()
    if subscribes:
        filters.extend(Component.subscribes.contains(get_format(session, n, v)) for n, v in subscribes)
    if publishes:
        filters.extend(Component.publishes.contains(get_format(session, n, v)) for n, v in publishes)
    if provides:
        filters.extend(Component.provides.contains(_get_format_pair(session, reqn, reqv, respn, respv, create=False))
                for (reqn, reqv), (respn, respv) in provides)
    if calls:
        filters.extend(Component.calls.contains(_get_format_pair(session, reqn, reqv, respn, respv, create=False))
                for (reqn, reqv), (respn, respv) in calls)

    if filters:
        query = session.query(Component).filter(or_(*filters))
    else:
        query = session.query(Component)

    if user:
        query = query.filter(Component.owner==user)
    if only_published:
        # "!= None" is intentional: this builds a SQL expression on the column,
        # so an identity test cannot be used here.
        query = query.filter(Component.when_published!=None)

    orms = ((orm.name, orm.version, orm.component_type, orm) for orm in query)

    if latest:
        orms = _filter_latest(orms)

    return [orm.__dict__ for _, _, _, orm in orms]
def _list_formats(session, user, only_published, latest=True):
    """Get list of data formats

    Parameters
    ----------
    session: SQLAlchemy session used for the query
    user: When truthy, only data formats owned by this user are listed
    only_published: When truthy, only data formats with a publish time are listed
    latest: When truthy, only the highest version per data format name is kept

    Returns
    -------
    List of data format orms as dicts
    """
    query = session.query(Format).order_by(Format.modified.desc())

    # NOTE(review): the guards around the three optional steps below are
    # reconstructed from the parameter names - confirm.
    if user:
        query = query.filter(Format.owner==user)
    if only_published:
        # "!= None" is intentional: this builds a SQL expression on the column,
        # so an identity test cannot be used here.
        query = query.filter(Format.when_published!=None)

    orms = [(orm.name, orm.version, orm) for orm in query]

    if latest:
        orms = _filter_latest(orms)
    return [orm.__dict__ for _, _, orm in orms]
def build_config_keys_map(spec):
    """Build config keys map

    Parameters
    ----------
    spec: Component specification dict with streams.subscribes,
        streams.publishes and services.calls lists

    Returns
    -------
    Dict where each item:

        <config_key>: { "group": <grouping>, "type": <http|message_router|data_router> }

    where grouping includes "streams_publishes", "streams_subscribes", "services_calls".
    Items in the "services_calls" group carry no "type". When the same config
    key occurs in several groups, calls win over publishes, which win over
    subscribes.
    """
    keys_map = {}

    # subscribing as http doesn't have config key
    for s in spec["streams"]["subscribes"]:
        if "config_key" in s:
            keys_map[s["config_key"]] = {"group": "streams_subscribes", "type": s["type"]}

    for s in spec["streams"]["publishes"]:
        keys_map[s["config_key"]] = {"group": "streams_publishes", "type": s["type"]}

    for s in spec["services"]["calls"]:
        keys_map[s["config_key"]] = {"group": "services_calls"}

    return keys_map
def get_data_router_subscriber_route(spec, config_key):
    """Get route by config key for data router subscriber

    Utility method that parses the component spec

    Parameters
    ----------
    spec: Component specification dict
    config_key: Config key of the data router subscriber to look for

    Raises
    ------
    MissingEntry if streams.subscribes holds no data router entry with the
    given config key
    """
    for s in spec["streams"].get("subscribes", []):
        if s["type"] in ["data_router", "data router"] \
                and s["config_key"] == config_key:
            # NOTE(review): the returned key is assumed to be "route" based on
            # the function name; the statement was not recoverable - confirm.
            return s["route"]

    raise MissingEntry("No data router subscriber for {0}".format(config_key))
class MockCatalog(object):
    '''Catalog of components and data formats stored through a SQLAlchemy engine

    Every public method opens its own SessionTransaction on `self.engine`.
    '''

    def __init__(self, purge_existing=False, enforce_image=True, db_name=None, engine=None, db_url=None):
        '''Sets up the catalog

        An explicitly passed `engine` is used as-is; otherwise one is created
        from `db_name`, `purge_existing` and `db_url` via create_engine.
        `enforce_image` is forwarded when docker components are added.
        '''
        if engine is None:
            engine = create_engine(Base, db_name=db_name, purge_existing=purge_existing, db_url=db_url)
        self.engine = engine
        self.enforce_image = enforce_image

    def add_component(self, user, spec, update=False):
        '''Validates component specification and adds component to the mock catalog

        Raises CatalogError when the spec's component type is neither "cdap"
        nor "docker".
        '''
        validate_component(spec)

        component_type = spec["self"]["component_type"]

        with SessionTransaction(self.engine) as session:
            if component_type == "cdap":
                _add_cdap_component(session, user, spec, update)
            elif component_type == "docker":
                _add_docker_component(session, user, spec, update,
                        enforce_image=self.enforce_image)
            else:
                raise CatalogError("Unknown component type: {0}".format(component_type))

    def get_docker_image(self, name, version):
        '''Returns the docker image name associated with this component'''
        with SessionTransaction(self.engine) as session:
            return _get_docker_image(session, name, version)

    def get_docker(self, name, version):
        '''Returns a tuple (image, docker_config, spec) for this docker component'''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            # NOTE: Defaults are being applied for docker config here at read
            # time. Not completely sure that this is the correct approach. The
            # benefit is that defaults can be changed without altering the stored
            # specs. It's a nice layering.
            docker_config = apply_defaults_docker_config(spec["auxilary"])
            return _get_docker_image_from_spec(spec), docker_config, spec

    def get_docker_config(self, name, version):
        '''Returns only the docker config part of get_docker'''
        _, docker_config, _ = self.get_docker(name, version)
        return docker_config

    def get_cdap(self, name, version):
        '''Returns a tuple representing this cdap component

        Returns
        -------
        tuple(jar, config, spec)
            jar: string
                URL where the CDAP jar is located.
            config: dict
                A dictionary loaded from the CDAP JSON configuration file.
            spec: dict
                The dcae-cli component specification file.
        '''
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, name, version)
            spec = comp.get_spec_as_dict()
            cdap_config = spec["auxilary"]
            return _get_cdap_jar_from_spec(spec), cdap_config, spec

    def get_component_type(self, name, version):
        '''Returns the component type associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_type(session, name, version)

    def get_component_spec(self, name, version):
        '''Returns the spec dict associated with this component'''
        with SessionTransaction(self.engine) as session:
            return get_component_spec(session, name, version)

    def get_format_spec(self, name, version):
        '''Returns the spec dict associated with this data format'''
        with SessionTransaction(self.engine) as session:
            return get_format_spec(session, name, version)

    def add_format(self, spec, user, update=False):
        '''Validates data format specification and adds data format to the mock catalog'''
        validate_format(spec)
        with SessionTransaction(self.engine) as session:
            add_format(session, spec, user, update)

    def get_discovery_for_cdap(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_cdap(session, name, version, neighbors)

    def get_discovery_for_docker(self, name, version, neighbors=None):
        '''Returns the parameters and interface map for a given component and considering its neighbors'''
        with SessionTransaction(self.engine) as session:
            return _get_discovery_for_docker(session, name, version, neighbors)

    def get_discovery_for_dmaap(self, name, version):
        '''Returns the message router and data router config key lists of a stored component'''
        with SessionTransaction(self.engine) as session:
            get_component_spec_func = partial(get_component_spec, session)
            return _get_discovery_for_dmaap(get_component_spec_func, name, version)

    def get_discovery_from_spec(self, user, target_spec, neighbors=None):
        '''Get pieces to generate configuration for the given target spec

        This function is used to obtain the pieces needed to generate
        the application configuration json: parameters map, interfaces map, dmaap
        map. Where the input is a provided specification that hasn't been added to
        the catalog - prospective specs - which includes a component that doesn't
        exist or a new version of an existing spec.

        Returns
        -------
        Tuple of three elements:

        - Dict of parameter name to parameter value
        - Dict of "config_key" to list of (component.name, component.version)
          known as "interface_map"
        - Tuple of lists of "config_key" the first for message router the second
          for data router known as "dmaap_map"
        '''
        validate_component(target_spec)

        with SessionTransaction(self.engine) as session:
            # The following approach was taken in order to:
            # 1. Re-use existing functionality e.g. implement fast
            # 2. In order to make ORM-specific queries, I need the entire ORM
            # in SQLAlchemy meaning I cannot do arbitrary DataFormatPair queries

            name = target_spec["self"]["name"]
            version = target_spec["self"]["version"]

            # NOTE(review): the try/except, flush and rollback statements are
            # reconstructed from the surviving comments - confirm.
            try:
                # Build a component with update to True first because you may
                # want to run this for an existing component
                build_generic_component(session, user, target_spec, True)
            except MissingEntry:
                # Since it doesn't exist already, build a new component
                build_generic_component(session, user, target_spec, False)

            # This is needed so that subsequent queries will "see" the component
            session.flush()

            ctype = target_spec["self"]["component_type"]

            if ctype == "cdap":
                params, interface_map = _get_discovery_for_cdap(session, name,
                        version, neighbors)
            elif ctype == "docker":
                params, interface_map = _get_discovery_for_docker(session, name,
                        version, neighbors)
            else:
                # Fail with the same error add_component uses instead of
                # running into unbound local names further down.
                raise CatalogError("Unknown component type: {0}".format(ctype))

            # Don't want to commit these changes so rollback.
            session.rollback()

        # Use the target spec as the source to compile the config keys from
        dmaap_config_keys = _get_discovery_for_dmaap(
                lambda name, version: target_spec, name, version)

        return params, interface_map, dmaap_config_keys

    def verify_component(self, name, version):
        '''Returns the component's name and version if it exists and raises an exception otherwise'''
        with SessionTransaction(self.engine) as session:
            return verify_component(session, name, version)

    def list_components(self, subscribes=None, publishes=None, provides=None,
            calls=None, latest=True, user=None, only_published=False):
        '''Returns a list of components, as dicts, which match the specified filter sequences'''
        with SessionTransaction(self.engine) as session:
            return list_components(session, user, only_published, subscribes,
                    publishes, provides, calls, latest)

    def list_formats(self, latest=True, user=None, only_published=False):
        """Get list of data formats

        Returns
        -------
        List of data formats as dicts
        """
        with SessionTransaction(self.engine) as session:
            return _list_formats(session, user, only_published, latest)

    def get_format(self, name, version):
        """Get data format

        Throws MissingEntry exception if no matches found.

        Returns
        -------
        Dict representation of data format
        """
        with SessionTransaction(self.engine) as session:
            return get_format(session, name, version).__dict__

    def _publish(self, get_func, user, name, version):
        """Publish data format

        Parameters
        ----------
        get_func: Function that takes a session, name, version and outputs a data
            object either Component or Format

        Returns
        -------
        True upon success else False
        """
        # TODO: To make function composeable, it should take in the data format
        # object
        # NOTE(review): the branching below is reconstructed from the visible
        # log messages; the original statements were only partly recoverable.
        # The getters used in this class raise MissingEntry rather than
        # returning a falsy value, so the "not found" branch is only reached
        # for getters that return one - confirm the intended handling.
        with SessionTransaction(self.engine) as session:
            obj = get_func(session, name, version)

            if not obj:
                logger.error("Component or data format not found: {0}, {1}".format(name, version))
                return False

            if obj.owner != user:
                logger.error("Not authorized to modify component or data format")
                return False
            elif obj.when_published:
                logger.warn("Component or data format has already been published")
                return False
            else:
                obj.when_published = datetime.utcnow()

        return True

    def publish_format(self, user, name, version):
        """Publish data format

        Returns
        -------
        True upon success else False
        """
        return self._publish(get_format, user, name, version)

    def get_unpublished_formats(self, comp_name, comp_version):
        """Get unpublished formats for given component

        Returns
        -------
        List of unique data format name, version pairs
        """
        with SessionTransaction(self.engine) as session:
            comp = _get_component(session, comp_name, comp_version)

            # Collect into a fresh list so the component's relationship
            # collections are never modified.
            dfs = list(comp.publishes) + list(comp.subscribes)
            dfs += [p.req for p in comp.provides]
            dfs += [p.resp for p in comp.provides]
            dfs += [c.req for c in comp.calls]
            dfs += [c.resp for c in comp.calls]

            formats = [(df.name, df.version) for df in dfs if df.when_published is None]
            return list(set(formats))

    def publish_component(self, user, name, version):
        """Publish component

        Returns
        -------
        True upon success else False
        """
        return self._publish(_get_component, user, name, version)