move plugins from from ccsdk to dcaegen2
[dcaegen2/platform/plugins.git] / pgaas / pgaas / pgaas_plugin.py
1 # org.onap.dcaegen2
2 # ============LICENSE_START====================================================
3 # =============================================================================
4 # Copyright (c) 2017-2020 AT&T Intellectual Property. All rights reserved.
5 # Copyright (c) 2020 Pantheon.tech. All rights reserved.
6 # =============================================================================
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
10 #
11 #      http://www.apache.org/licenses/LICENSE-2.0
12 #
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ============LICENSE_END======================================================
19
20 """
21 PostgreSQL plugin to manage passwords
22 """
23
24 from __future__ import print_function
25 import sys
26 import os
27 import re
28 import json
29 import hashlib
30 import socket
31 import traceback
32 import base64
33 import binascii
34 import collections
35 try:
36   from urllib.parse import quote
37 except ImportError:
38   from urllib import quote
39
40 from cloudify import ctx
41 from cloudify.decorators import operation
42 from cloudify.exceptions import NonRecoverableError
43 from cloudify.exceptions import RecoverableError
44
45 try:
46     import psycopg2
47 except ImportError:
48     # FIXME: any users of this plugin installing its dependencies in nonstandard
49     # directories should set up PYTHONPATH accordingly, outside the program code
50     SYSPATH = sys.path
51     sys.path = list(SYSPATH)
52     sys.path.append('/usr/lib64/python2.7/site-packages')
53     import psycopg2
54     sys.path = SYSPATH
55
56 from pgaas.logginginterface import debug, info, warn, error
57
58
59 """
60   To set up a cluster:
61   - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml
62   - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
63   sharedsshkey_pgrs:
64     type: dcae.nodes.ssh.keypair
65   pgaas_cluster:
66     type: dcae.nodes.pgaas.cluster
67     properties:
68       writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
69       readerfqdn: { get_input: k8s_pgaas_instance_fqdn }
70       # OR:
71       # writerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '-write.', { get_input: location_domain } ] }
72       # readerfqdn: { concat: [ { get_input: location_prefix }, '-', { get_input: pgaas_cluster_name }, '.', { get_input: location_domain } ] }
73     relationships:
74       - type: dcae.relationships.pgaas_cluster_uses_sshkeypair
75         target: sharedsshkey_pgrs
76
77   To reference an existing cluster:
78   - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
79   pgaas_cluster:
80     type: dcae.nodes.pgaas.cluster
81     properties:
82       writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
83       # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
84       #                             { get_input: pgaas_cluster_name }, '-write.',
85       #                             { get_input: location_domain } ] }
86       # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
87       use_existing: true
88
89   To initialize an existing server to be managed by pgaas_plugin::
90   - https://$NEXUS/repository/raw/type_files/sshkeyshare/sshkey_types.yaml
91   - https://$NEXUS/repository/raw/type_files/pgaas_types.yaml
92   pgaas_cluster:
93     type: dcae.nodes.pgaas.cluster
94     properties:
95       writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
96       readerfqdn: { get_input: k8s_pgaas_instance_fqdn }
97       # OR:
98       # writerfqdn: { concat: [ { get_input: location_prefix }, '-',
99       #                         { get_input: pgaas_cluster_name }, '-write.',
100       #                         { get_input: location_domain } ] }
101       # readerfqdn: { concat: [ { get_input: location_prefix }, '-',
102       #                         { get_input: pgaas_cluster_name }, '.',
103       #                         { get_input: location_domain } ] }
104       initialpassword: { get_input: currentpassword }
105     relationships:
106       - type: dcae.relationships.pgaas_cluster_uses_sshkeypair
107         target: sharedsshkey_pgrs
108
109   - { get_attribute: [ pgaas_cluster, public ] }
110   - { get_attribute: [ pgaas_cluster, base64private ] }
111   # - { get_attribute: [ pgaas_cluster, postgrespswd ] }
112
113
114   To set up a database:
115   - http://$NEXUS/raw/type_files/pgaas_types.yaml
116   pgaasdbtest:
117     type: dcae.nodes.pgaas.database
118     properties:
119       writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
120       # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
121       #                             { get_input: pgaas_cluster_name }, '-write.',
122       #                             { get_input: location_domain } ] }
123       # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
124       name: { get_input: database_name }
125
126   To reference an existing database:
127   - http://$NEXUS/raw/type_files/pgaas_types.yaml
128   $CLUSTER_$DBNAME:
129     type: dcae.nodes.pgaas.database
130     properties:
131       writerfqdn: { get_input: k8s_pgaas_instance_fqdn }
132       # OR: writerfqdn: { concat: [ { get_input: location_prefix }, '-',
133       #                             { get_input: pgaas_cluster_name }, '-write.',
134       #                             { get_input: location_domain } ] }
135       # OR: writerfqdn: { get_property: [ dns_pgrs_rw, fqdn ] }
136       name: { get_input: database_name }
137       use_existing: true
138
139   $CLUSTER_$DBNAME_admin_host:
140     description: Hostname for $CLUSTER $DBNAME database
141     value: { get_attribute: [ $CLUSTER_$DBNAME, admin, host ] }
142   $CLUSTER_$DBNAME_admin_user:
143     description: Admin Username for $CLUSTER $DBNAME database
144     value: { get_attribute: [ $CLUSTER_$DBNAME, admin, user ] }
145   $CLUSTER_$DBNAME_admin_password:
146     description: Admin Password for $CLUSTER $DBNAME database
147     value: { get_attribute: [ $CLUSTER_$DBNAME, admin, password ] }
148   $CLUSTER_$DBNAME_user_host:
149     description: Hostname for $CLUSTER $DBNAME database
150     value: { get_attribute: [ $CLUSTER_$DBNAME, user, host ] }
151   $CLUSTER_$DBNAME_user_user:
152     description: User Username for $CLUSTER $DBNAME database
153     value: { get_attribute: [ $CLUSTER_$DBNAME, user, user ] }
154   $CLUSTER_$DBNAME_user_password:
155     description: User Password for $CLUSTER $DBNAME database
156     value: { get_attribute: [ $CLUSTER_$DBNAME, user, password ] }
157   $CLUSTER_$DBNAME_viewer_host:
158     description: Hostname for $CLUSTER $DBNAME database
159     value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, host ] }
160   $CLUSTER_$DBNAME_viewer_user:
161     description: Viewer Username for $CLUSTER $DBNAME database
162     value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, user ] }
163   $CLUSTER_$DBNAME_viewer_password:
164     description: Viewer Password for $CLUSTER $DBNAME database
165     value: { get_attribute: [ $CLUSTER_$DBNAME, viewer, password ] }
166
167 """
168
169 OPT_MANAGER_RESOURCES_PGAAS = "/opt/manager/resources/pgaas"
170
171 # pylint: disable=invalid-name
172 def setOptManagerResources(o): # pylint: disable=global-statement
173   """
174   Overrides the default locations of /opt/managers/resources
175   """
176   # pylint: disable=global-statement
177   global OPT_MANAGER_RESOURCES_PGAAS
178   OPT_MANAGER_RESOURCES_PGAAS = "{}/pgaas".format(o)
179
180 def safestr(s):
181   """
182   returns a safely printable version of the string
183   """
184   return quote(str(s), '')
185
186 def raiseRecoverableError(msg):
187   """
188   Print a warning message and raise a RecoverableError exception.
189   This is a handy endpoint to add other extended debugging calls.
190   """
191   warn(msg)
192   raise RecoverableError(msg)
193
194 def raiseNonRecoverableError(msg):
195   """
196   Print an error message and raise a NonRecoverableError exception.
197   This is a handy endpoint to add other extended debugging calls.
198   """
199   error(msg)
200   raise NonRecoverableError(msg)
201
202 def dbexecute(crx, cmd, args=None):
203   """
204   executes the SQL statement
205   Prints the entire command for debugging purposes
206   """
207   debug("executing {}".format(cmd))
208   crx.execute(cmd, args)
209
210
211 def dbexecute_trunc_print(crx, cmd, args=None):
212   """
213   executes the SQL statement.
214   Will print only the first 30 characters in the command
215   Use this function if you are executing an SQL cmd with a password
216   """
217   debug("executing {}".format(cmd[:30]))
218   crx.execute(cmd, args)
219
220
221 def waithp(host, port):
222   """
223   do a test connection to a host and port
224   """
225   debug("waithp({0},{1})".format(safestr(host), safestr(port)))
226   sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
227   try:
228     sock.connect((host, int(port)))
229   except: # pylint: disable=bare-except
230     a, b, c = sys.exc_info()
231     traceback.print_exception(a, b, c)
232     sock.close()
233     raiseRecoverableError('Server at {0}:{1} is not ready'.format(safestr(host), safestr(port)))
234   sock.close()
235
236 def doconn(desc):
237   """
238   open an SQL connection to the PG server
239   """
240   debug("doconn({},{},{})".format(desc['host'], desc['user'], desc['database']))
241   # debug("doconn({},{},{},{})".format(desc['host'], desc['user'], desc['database'], desc['password']))
242   ret = psycopg2.connect(**desc)
243   ret.autocommit = True
244   return ret
245
246 def hostportion(hostport):
247   """
248   return the host portion of a fqdn:port or IPv4:port or [IPv6]:port
249   """
250   ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport)
251   ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport)
252   if ipv4re:
253     return ipv4re.group(1)
254   if ipv6re:
255     return ipv6re.group(1)
256   raiseNonRecoverableError("invalid hostport: {}".format(hostport))
257
258 def portportion(hostport):
259   """
260   Return the port portion of a fqdn:port or IPv4:port or [IPv6]:port.
261   If port is not present, return 5432.
262   """
263   ipv6re = re.match(r"^[[]([^]]+)[]](:(\d+))?", hostport)
264   ipv4re = re.match(r"^([^:]+)(:(\d+))?", hostport)
265   if ipv4re:
266     return ipv4re.group(3) if ipv4re.group(3) else '5432'
267   if ipv6re:
268     return ipv6re.group(3) if ipv6re.group(3) else '5432'
269   raiseNonRecoverableError("invalid hostport: {}".format(hostport))
270
271 def rootdesc(data, dbname, initialpassword=None):
272   """
273   return the postgres connection information
274   """
275   debug("rootdesc(..data..,{0})".format(safestr(dbname)))
276   # pylint: disable=bad-continuation
277   return {
278     'database': dbname,
279     'host': hostportion(data['rw']),
280     'port': portportion(data['rw']),
281     'user': 'postgres',
282     'password': initialpassword if initialpassword else getpass(data, 'postgres', data['rw'], 'postgres')
283   }
284
285 def rootconn(data, dbname='postgres', initialpassword=None):
286   """
287   connect to a given server as postgres,
288   connecting to the specified database
289   """
290   debug("rootconn(..data..,{0})".format(safestr(dbname)))
291   return doconn(rootdesc(data, dbname, initialpassword))
292
293 def onedesc(data, dbname, role, access):
294   """
295   return the connection information for a given user and dbname on a cluster
296   """
297   user = '{0}_{1}'.format(dbname, role)
298   # pylint: disable=bad-continuation
299   return {
300     'database': dbname,
301     'host': hostportion(data[access]),
302     'port': portportion(data[access]),
303     'user': user,
304     'password': getpass(data, user, data['rw'], dbname)
305   }
306
307 def dbdescs(data, dbname):
308   """
309   return the entire set of information for a specific server/database
310   """
311   # pylint: disable=bad-continuation
312   return {
313     'admin': onedesc(data, dbname, 'admin', 'rw'),
314     'user': onedesc(data, dbname, 'user', 'rw'),
315     'viewer': onedesc(data, dbname, 'viewer', 'ro')
316   }
317
318 def getpass(data, ident, hostport, dbname):
319   """
320   generate the password for a given user on a specific server
321   """
322   m = hashlib.sha256()
323   m.update(ident.encode())
324
325   # mix in the seed (the last line) for that database, if one exists
326   hostport = hostport.lower()
327   dbname = dbname.lower()
328   hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport, dbname)
329   try:
330     lastLine = ''
331     with open(hostPortDbname, "r") as fp:
332       for line in fp:
333         lastLine = line
334     m.update(lastLine.encode())
335   except IOError:
336     pass
337
338   m.update(base64.b64decode(data['data']))
339   return m.hexdigest()
340
341 def find_related_nodes(reltype, inst=None):
342   """
343   extract the related_nodes information from the context
344   for a specific relationship
345   """
346   if inst is None:
347     inst = ctx.instance
348   ret = []
349   for rel in inst.relationships:
350     if reltype in rel.type_hierarchy:
351       ret.append(rel.target)
352   return ret
353
354 def chkfqdn(fqdn):
355   """
356   verify that a FQDN is valid
357   """
358   if fqdn is None:
359     return False
360   hp = hostportion(fqdn)
361   # not needed right now: pp = portportion(fqdn)
362   # TODO need to augment this for IPv6 addresses
363   return re.match('^[a-zA-Z0-9_-]+(\\.[a-zA-Z0-9_-]+)+$', hp) is not None
364
365 def chkdbname(dbname):
366   """
367   verify that a database name is valid
368   """
369   ret = re.match('[a-zA-Z][a-zA-Z0-9]{0,43}', dbname) is not None and dbname != 'postgres'
370   if not ret:
371     warn("Invalid dbname: {0}".format(safestr(dbname)))
372   return ret
373
374 def get_valid_domains():
375   """
376   Return a list of the valid names, suitable for inclusion in an error message.
377   """
378   msg = ''
379   import glob
380   validDomains = []
381   for f in glob.glob('{}/*'.format(OPT_MANAGER_RESOURCES_PGAAS)):
382     try:
383       with open(f, "r") as fp:
384         try:
385           tmpdata = json.load(fp)
386           if 'pubkey' in tmpdata:
387             validDomains.append(os.path.basename(f))
388         except: # pylint: disable=bare-except
389           pass
390     except: # pylint: disable=bare-except
391       pass
392   if len(validDomains) == 0:
393     msg += '\nNo valid PostgreSQL cluster information was found'
394   else:
395     msg += '\nThese are the valid PostgreSQL cluster domains found on this manager:'
396     for v in validDomains:
397       msg += '\n\t"{}"'.format(v)
398   return msg
399
400 def get_existing_clusterinfo(wfqdn, rfqdn, related):
401   """
402   Retrieve all of the information specific to an existing cluster.
403   """
404   if rfqdn != '':
405     raiseNonRecoverableError('Read-only FQDN must not be specified when using an existing cluster, fqdn={0}'.format(safestr(rfqdn)))
406   if len(related) != 0:
407     raiseNonRecoverableError('Cluster SSH keypair must not be specified when using an existing cluster')
408   try:
409     fn = '{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower())
410     with open(fn, 'r') as f:
411       data = json.load(f)
412       data['rw'] = wfqdn
413       return data
414   except Exception as e: # pylint: disable=broad-except
415     warn("Error: {0}".format(e))
416     msg = 'Cluster must be deployed when using an existing cluster.\nCheck your domain name: fqdn={0}\nerr={1}'.format(safestr(wfqdn), e)
417     if not os.path.isdir(OPT_MANAGER_RESOURCES_PGAAS):
418       msg += '\nThe directory {} does not exist. No PostgreSQL clusters have been deployed on this manager.'.format(OPT_MANAGER_RESOURCES_PGAAS)
419     else:
420       msg += get_valid_domains()
421     # warn("Stack: {0}".format(traceback.format_exc()))
422     raiseNonRecoverableError(msg)
423
424 def getclusterinfo(wfqdn, reuse, rfqdn, initialpassword, related):
425   """
426   Retrieve all of the information specific to a cluster.
427   if reuse, retrieve it
428   else create and store it
429   """
430   # debug("getclusterinfo({}, {}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn), safestr(initialpassword)))
431   debug("getclusterinfo({}, {}, {}, ..related..)".format(safestr(wfqdn), safestr(reuse), safestr(rfqdn)))
432   if not chkfqdn(wfqdn):
433     raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn)))
434   if reuse:
435     return get_existing_clusterinfo(wfqdn, rfqdn, related)
436
437   if rfqdn == '':
438     rfqdn = wfqdn
439   elif not chkfqdn(rfqdn):
440     raiseNonRecoverableError('Invalid FQDN specified for read-only access, fqdn={0}'.format(safestr(rfqdn)))
441   if len(related) != 1:
442     raiseNonRecoverableError('Cluster SSH keypair must be specified using a dcae.relationships.pgaas_cluster_uses_sshkeypair ' +
443                              'relationship to a dcae.nodes.sshkeypair node')
444   data = {'ro': rfqdn, 'pubkey': related[0].instance.runtime_properties['public'],
445           'data': related[0].instance.runtime_properties['base64private'], 'hash': 'sha256'}
446   os.umask(0o77)
447   try:
448     os.makedirs('{0}'.format(OPT_MANAGER_RESOURCES_PGAAS))
449   except: # pylint: disable=bare-except
450     pass
451   try:
452     with open('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn.lower()), 'w') as f:
453       f.write(json.dumps(data))
454   except Exception as e: # pylint: disable=broad-except
455     warn("Error: {0}".format(e))
456     warn("Stack: {0}".format(traceback.format_exc()))
457     raiseNonRecoverableError('Cannot write cluster information to {0}: fqdn={1}, err={2}'.format(OPT_MANAGER_RESOURCES_PGAAS, safestr(wfqdn), e))
458   data['rw'] = wfqdn
459   if initialpassword:
460     with rootconn(data, initialpassword=initialpassword) as conn:
461       crr = conn.cursor()
462       dbexecute_trunc_print(crr, "ALTER USER postgres WITH PASSWORD %s", (getpass(data, 'postgres', wfqdn, 'postgres'),))
463       crr.close()
464   return data
465
466 @operation
467 def add_pgaas_cluster(**kwargs): # pylint: disable=unused-argument
468   """
469   dcae.nodes.pgaas.cluster:
470   Record key generation data for cluster
471   """
472   try:
473     warn("add_pgaas_cluster() invoked")
474     data = getclusterinfo(ctx.node.properties['writerfqdn'],
475                           ctx.node.properties['use_existing'],
476                           ctx.node.properties['readerfqdn'],
477                           ctx.node.properties['initialpassword'],
478                           find_related_nodes('dcae.relationships.pgaas_cluster_uses_sshkeypair'))
479     ctx.instance.runtime_properties['public'] = data['pubkey']
480     ctx.instance.runtime_properties['base64private'] = data['data']
481     ctx.instance.runtime_properties['postgrespswd'] = getpass(data, 'postgres', ctx.node.properties['writerfqdn'], 'postgres')
482     warn('All done')
483   except Exception as e: # pylint: disable=broad-except
484     ctx.logger.warn("Error: {0}".format(e))
485     ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
486     raise e
487
488 @operation
489 def rm_pgaas_cluster(**kwargs): # pylint: disable=unused-argument
490   """
491   dcae.nodes.pgaas.cluster:
492   Remove key generation data for cluster
493   """
494   try:
495     warn("rm_pgaas_cluster()")
496     wfqdn = ctx.node.properties['writerfqdn']
497     if chkfqdn(wfqdn) and not ctx.node.properties['use_existing']:
498       os.remove('{0}/{1}'.format(OPT_MANAGER_RESOURCES_PGAAS, wfqdn))
499     warn('All done')
500   except Exception as e: # pylint: disable=broad-except
501     ctx.logger.warn("Error: {0}".format(e))
502     ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
503     raise e
504
505 def dbgetinfo(refctx):
506   """
507   Get the data associated with a database.
508   Make sure the connection exists.
509   """
510   wfqdn = refctx.node.properties['writerfqdn']
511   related = find_related_nodes('dcae.relationships.database_runson_pgaas_cluster', refctx.instance)
512   if wfqdn == '':
513     if len(related) != 1:
514       raiseNonRecoverableError('Database Cluster must be specified using exactly one dcae.relationships.database_runson_pgaas_cluster relationship ' +
515                                'to a dcae.nodes.pgaas.cluster node when writerfqdn is not specified')
516     wfqdn = related[0].node.properties['writerfqdn']
517   return dbgetinfo_for_update(wfqdn)
518
519 def dbgetinfo_for_update(wfqdn):
520   """
521   Get the data associated with a database.
522   Make sure the connection exists.
523   """
524
525   if not chkfqdn(wfqdn):
526     raiseNonRecoverableError('Invalid FQDN specified for admin/read-write access, fqdn={0}'.format(safestr(wfqdn)))
527   ret = getclusterinfo(wfqdn, True, '', '', [])
528   waithp(hostportion(wfqdn), portportion(wfqdn))
529   return ret
530
531 @operation
532 def create_database(**kwargs):
533   """
534   dcae.nodes.pgaas.database:
535   Create a database on a cluster
536   """
537   try:
538     debug("create_database() invoked")
539     dbname = ctx.node.properties['name']
540     warn("create_database({0})".format(safestr(dbname)))
541     if not chkdbname(dbname):
542       raiseNonRecoverableError('Unacceptable or missing database name: {0}'.format(safestr(dbname)))
543     debug('create_database(): dbname checked out')
544     dbinfo = dbgetinfo(ctx)
545     debug('Got db server info')
546     descs = dbdescs(dbinfo, dbname)
547     ctx.instance.runtime_properties['admin'] = descs['admin']
548     ctx.instance.runtime_properties['user'] = descs['user']
549     ctx.instance.runtime_properties['viewer'] = descs['viewer']
550     with rootconn(dbinfo) as conn:
551       crx = conn.cursor()
552       dbexecute(crx, 'SELECT datname FROM pg_database WHERE datistemplate = false')
553       existingdbs = [x[0] for x in crx]
554       if ctx.node.properties['use_existing']:
555         if dbname not in existingdbs:
556           raiseNonRecoverableError('use_existing specified but database does not exist, dbname={0}'.format(safestr(dbname)))
557         return
558       dbexecute(crx, 'SELECT rolname FROM pg_roles')
559       existingroles = [x[0] for x in crx]
560       admu = descs['admin']['user']
561       usru = descs['user']['user']
562       vwru = descs['viewer']['user']
563       cusr = '{0}_common_user_role'.format(dbname)
564       cvwr = '{0}_common_viewer_role'.format(dbname)
565       schm = '{0}_db_common'.format(dbname)
566       if admu not in existingroles:
567         dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(admu), (descs['admin']['password'],))
568       if usru not in existingroles:
569         dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(usru), (descs['user']['password'],))
570       if vwru not in existingroles:
571         dbexecute_trunc_print(crx, 'CREATE USER {0} WITH PASSWORD %s'.format(vwru), (descs['viewer']['password'],))
572       if cusr not in existingroles:
573         dbexecute(crx, 'CREATE ROLE {0}'.format(cusr))
574       if cvwr not in existingroles:
575         dbexecute(crx, 'CREATE ROLE {0}'.format(cvwr))
576       if dbname not in existingdbs:
577         dbexecute(crx, 'CREATE DATABASE {0} WITH OWNER {1}'.format(dbname, admu))
578       crx.close()
579     with rootconn(dbinfo, dbname) as dbconn:
580       crz = dbconn.cursor()
581       for r in [cusr, cvwr, usru, vwru]:
582         dbexecute(crz, 'REVOKE ALL ON DATABASE {0} FROM {1}'.format(dbname, r))
583       dbexecute(crz, 'GRANT {0} TO {1}'.format(cvwr, cusr))
584       dbexecute(crz, 'GRANT {0} TO {1}'.format(cusr, admu))
585       dbexecute(crz, 'GRANT CONNECT ON DATABASE {0} TO {1}'.format(dbname, cvwr))
586       dbexecute(crz, 'CREATE SCHEMA IF NOT EXISTS {0} AUTHORIZATION {1}'.format(schm, admu))
587       for r in [admu, cusr, cvwr, usru, vwru]:
588         dbexecute(crz, 'ALTER ROLE {0} IN DATABASE {1} SET search_path = public, {2}'.format(r, dbname, schm))
589       dbexecute(crz, 'GRANT USAGE ON SCHEMA {0} to {1}'.format(schm, cvwr))
590       dbexecute(crz, 'GRANT CREATE ON SCHEMA {0} to {1}'.format(schm, admu))
591       dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT SELECT ON TABLES TO {1}'.format(admu, cvwr))
592       dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT INSERT, UPDATE, DELETE, TRUNCATE ON TABLES TO {1}'.format(admu, cusr))
593       dbexecute(crz, 'ALTER DEFAULT PRIVILEGES FOR ROLE {0} GRANT USAGE, SELECT, UPDATE ON SEQUENCES TO {1}'.format(admu, cusr))
594       dbexecute(crz, 'GRANT TEMP ON DATABASE {0} TO {1}'.format(dbname, cusr))
595       dbexecute(crz, 'GRANT {0} to {1}'.format(cusr, usru))
596       dbexecute(crz, 'GRANT {0} to {1}'.format(cvwr, vwru))
597       crz.close()
598     warn('All done')
599   except Exception as e: # pylint: disable=broad-except
600     ctx.logger.warn("Error: {0}".format(e))
601     ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
602     raise e
603
604 @operation
605 def delete_database(**kwargs): # pylint: disable=unused-argument
606   """
607   dcae.nodes.pgaas.database:
608   Delete a database from a cluster
609   """
610   try:
611     debug("delete_database() invoked")
612     dbname = ctx.node.properties['name']
613     warn("delete_database({0})".format(safestr(dbname)))
614     if not chkdbname(dbname):
615       return
616     debug('delete_database(): dbname checked out')
617     if ctx.node.properties['use_existing']:
618       return
619     debug('delete_database(): !use_existing')
620     dbinfo = dbgetinfo(ctx)
621     debug('Got db server info')
622     with rootconn(dbinfo) as conn:
623       crx = conn.cursor()
624       admu = ctx.instance.runtime_properties['admin']['user']
625       usru = ctx.instance.runtime_properties['user']['user']
626       vwru = ctx.instance.runtime_properties['viewer']['user']
627       cusr = '{0}_common_user_role'.format(dbname)
628       cvwr = '{0}_common_viewer_role'.format(dbname)
629       dbexecute(crx, 'DROP DATABASE IF EXISTS {0}'.format(dbname))
630       for r in [usru, vwru, admu, cusr, cvwr]:
631         dbexecute(crx, 'DROP ROLE IF EXISTS {0}'.format(r))
632     warn('All gone')
633   except Exception as e: # pylint: disable=broad-except
634     ctx.logger.warn("Error: {0}".format(e))
635     ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
636     raise e
637
638 #############################################################
639 # function: update_database                                 #
640 # Purpose: Called as a workflow to change the database      #
641 #          passwords for all the users                      #
642 #                                                           #
643 # Invoked via:                                              #
644 # cfy executions start -d <deployment-id> update_db_passwd  #
645 #                                                           #
646 # Assumptions:                                              #
647 # 1) pgaas_types.yaml must define a work flow e.g.          #
648 #    workflows:                                             #
649 #      update_db_passwd :                                   #
650 #        mapping : pgaas.pgaas.pgaas_plugin.update_database #
651 # 2) DB Blueprint: node_template must have properties:      #
652 #     writerfqdn & name (of DB)                             #
653 #############################################################
654 # pylint: disable=unused-argument
655 @operation
656 def update_database(refctx, **kwargs):
657   """
658   dcae.nodes.pgaas.database:
659   Update the password for a database from a cluster
660   refctx is auto injected into the function when called as a workflow
661   """
662   try:
663     debug("update_database() invoked")
664
665     ################################################
666     # Verify refctx contains the <nodes> attribute.   #
667     # The workflow context might not be consistent #
668     # across different cloudify versions           #
669     ################################################
670     if not hasattr(refctx, 'nodes'):
671       raiseNonRecoverableError('workflow context does not contain attribute=<nodes>. dir(refctx)={}'.format(dir(refctx)))
672
673     ############################################
674     # Verify that refctx.nodes is iterable        #
675     ############################################
676     if not isinstance(refctx.nodes, collections.Iterable):
677       raiseNonRecoverableError("refctx.nodes is not an iterable. Type={}".format(type(refctx.nodes)))
678
679     ctx_node = None
680     ##############################################
681     # Iterate through the nodes until we find    #
682     # one with the properties we are looking for #
683     ##############################################
684     for i in refctx.nodes:
685
686       ############################################
687       # Safeguard: If a given node doesn't have  #
688       #            properties then skip it.      #
689       # Don't cause an exception since the nodes #
690       # entry we are searching might still exist #
691       ############################################
692       if not hasattr(i, 'properties'):
693         warn('Encountered a ctx node that does not have attr=<properties>. dir={}'.format(dir(i)))
694         continue
695
696       debug("ctx node has the following Properties: {}".format(list(i.properties.keys())))
697
698       if ('name' in i.properties) and ('writerfqdn' in i.properties):
699         ctx_node = i
700         break
701
702
703     ###############################################
704     # If none of the nodes have properties:       #
705     # <name> and <writerfqdn> then fatal error    #
706     ###############################################
707     if not ctx_node:
708       raiseNonRecoverableError('Either <name> or <writerfqdn> is not found in refctx.nodes.properties.')
709
710     debug("name is {}".format(ctx_node.properties['name']))
711     debug("host is {}".format(ctx_node.properties['writerfqdn']))
712
713     dbname = ctx_node.properties['name']
714     debug("update_database({0})".format(safestr(dbname)))
715
716     ###########################
717     # dbname must be valid    #
718     ###########################
719     if not chkdbname(dbname):
720       raiseNonRecoverableError('dbname is null')
721
722
723     hostport = ctx_node.properties['writerfqdn']
724     debug('update_database(): wfqdn={}'.format(hostport))
725     dbinfo = dbgetinfo_for_update(hostport)
726
727     #debug('Got db server info={}'.format(dbinfo))
728
729     hostPortDbname = '{0}/{1}:{2}'.format(OPT_MANAGER_RESOURCES_PGAAS, hostport.lower(), dbname.lower())
730
731     debug('update_database(): hostPortDbname={}'.format(hostPortDbname))
732     try:
733       appended = False
734       with open(hostPortDbname, "a") as fp:
735         with open("/dev/urandom", "rb") as rp:
736           b = rp.read(16)
737           print(binascii.hexlify(b).decode('utf-8'), file=fp)
738           appended = True
739       if not appended:
740         ctx.logger.warn("Error: the password for {} {} was not successfully changed".format(hostport, dbname))
741     except Exception as e: # pylint: disable=broad-except
742       ctx.logger.warn("Error: {0}".format(e))
743       ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
744       raise e
745
746     descs = dbdescs(dbinfo, dbname)
747
748     ##########################################
749     # Verify we have expected keys           #
750     # <admin>, <user>, and <viewer> as well  #
751     # as "sub-key" <user>                    #
752     ##########################################
753
754     if not isinstance(descs, dict):
755       raiseNonRecoverableError('db descs has unexpected type=<{}> was expected type dict'.format(type(descs)))
756
757     for key in ("admin", "user", "viewer"):
758       if key not in descs:
759         raiseNonRecoverableError('db descs does not contain key=<{}>. Keys found for descs are: {}'.format(key, list(descs.keys())))
760       if 'user' not in descs[key]:
761         raiseNonRecoverableError('db descs[{}] does not contain key=<user>. Keys found for descs[{}] are: {}'.format(key, key, list(descs[key].keys())))
762
763
764     with rootconn(dbinfo) as conn:
765       crx = conn.cursor()
766
767       admu = descs['admin']['user']
768       usru = descs['user']['user']
769       vwru = descs['viewer']['user']
770
771       for r in [usru, vwru, admu]:
772         dbexecute_trunc_print(crx, "ALTER USER {} WITH PASSWORD '{}'".format(r, getpass(dbinfo, r, hostport, dbname)))
773         #debug("user={} password={}".format(r, getpass(dbinfo, r, hostport, dbname)))
774
775     warn('All users updated for database {}'.format(dbname))
776   except Exception as e: # pylint: disable=broad-except
777     ctx.logger.warn("Error: {0}".format(e))
778     ctx.logger.warn("Stack: {0}".format(traceback.format_exc()))
779     raise e