Source code for LMIConnection

# Copyright (C) 2012-2013 Peter Hatina <phatina@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.

import os
import sys
import atexit
import pywbem
import readline
import urlparse

import OpenSSL.SSL

from LMIBaseClient import LMIBaseClient
from LMIShellClient import LMIShellClient
from LMINamespace import LMINamespaceRoot
from LMIInstance import LMIInstance
from LMIReturnValue import LMIReturnValue
from LMISubscription import LMISubscription

from LMIExceptions import LMIIndicationError
from LMIExceptions import LMINamespaceNotFound

from LMIUtil import lmi_raise_or_dump_exception
from LMIUtil import lmi_get_use_exceptions
from LMIUtil import lmi_set_use_exceptions

def __lmi_raw_input(prompt, use_echo=True):
    """
    Reads a string from the standard input.

    The prompt is written to stderr when stdout is not a terminal but stderr
    is; in every other case it is written to stdout. When ``use_echo`` is
    False, the terminal echo is switched off with ``stty -echo`` while the
    input is read and it is switched back on afterwards, no matter how the
    read ended.

    :param string prompt: input prompt
    :param bool use_echo: whether to echo the input on the command line
    :returns: input string; None, if end-of-file was reached
    :raises: :py:exc:`KeyboardInterrupt`, if the input was interrupted
    """
    # Imported here, because the module itself does not import logging.
    import logging

    if not sys.stdout.isatty() and sys.stderr.isatty():
        # read the input with prompt printed to stderr
        def get_input(prompt):
            sys.stderr.write(prompt)
            return raw_input()
        stream = sys.stderr
    else:
        # read the input with prompt printed to stdout
        # NOTE: raw_input uses stdout only if the readline module is imported
        get_input = raw_input
        stream = sys.stdout
    if not sys.stderr.isatty() and not sys.stdout.isatty():
        logging.getLogger(__name__).warning(
            'both stdout and stderr are detached from terminal,'
            ' using stdout for prompt')
    if not use_echo:
        os.system("stty -echo")
    try:
        try:
            result = get_input(prompt)
        finally:
            # Restore the echo on every exit path (success, EOF, Ctrl-C or
            # any other error), so the terminal is never left without echo.
            if not use_echo:
                os.system("stty echo")
    except EOFError:
        stream.write("\n")
        return None
    if not use_echo:
        # The newline typed by the user was not echoed; emit it ourselves.
        stream.write("\n")
    if result:
        # Remove the newest readline history entry -- presumably the line
        # that has just been entered (e.g. a password), so it is not kept.
        # NOTE(review): nothing is removed when the history holds a single
        # entry; confirm that this is intended.
        cur_hist_len = readline.get_current_history_length()
        if cur_hist_len > 1:
            readline.remove_history_item(cur_hist_len - 1)

    return result

def connect(uri, username="", password="", **kwargs):
    """
    Creates a connection object with provided URI and credentials.

    When the caller is root, the URI points to the local machine, the
    Tog-Pegasus Unix socket exists and no credentials were given, a Unix
    socket connection is tried first. If that is not applicable or its
    credentials check fails, a WBEM connection is created instead.

    :param string uri: URI of the CIMOM
    :param string username: account, under which, the CIM calls will be
        performed
    :param string password: user's password
    :param bool interactive: flag indicating, if the LMIShell client is
        running in the interactive mode; default value is False.
    :param bool use_cache: flag indicating, if the LMIShell client should use
        cache for :py:class:`CIMClass` objects. This saves lots of
        communication, if there are :func:`EnumerateInstances` and
        :func:`EnumerateClasses` intrinsic methods often issued. Default value
        is True.
    :param string key_file: path to x509 key file; default value is None
    :param string cert_file: path to x509 cert file; default value is None
    :param bool verify_server_cert: flag indicating, whether a server side
        certificate needs to be verified, if SSL used; default value is True.
    :param string prompt_prefix: username and password prompt prefix in case
        the user is asked for credentials. Default value is empty string.
    :returns: :py:class:`LMIConnection` object or None, if LMIShell does not
        use exceptions
    :raises: :py:exc:`pywbem.AuthError`

    **NOTE:** If interactive is set to True, LMIShell will:

    * prompt for username and password, if missing and connection via
      Unix socket can not be established.
    * use pager for the output of: :py:meth:`.LMIInstance.doc`,
      :py:meth:`.LMIClass.doc`, :py:meth:`.LMIInstance.tomof` and
      :py:meth:`.LMIMethod.tomof`

    **Usage:** :ref:`startup_connection`.
    """
    # Consume the recognised options; whatever is left over is unknown.
    interactive = kwargs.pop("interactive", False)
    use_cache = kwargs.pop("use_cache", True)
    key_file = kwargs.pop("key_file", None)
    cert_file = kwargs.pop("cert_file", None)
    verify_server_cert = kwargs.pop("verify_server_cert", True)
    prompt_prefix = kwargs.pop("prompt_prefix", "")
    if kwargs:
        unknown = ", ".join(kwargs.keys())
        raise TypeError(
            "connect() got an unexpected keyword arguments: %s" % unknown)

    # A bare host name has no netloc part; use the URI itself in that case.
    host = urlparse.urlparse(uri).netloc or uri
    unix_socket_applicable = (
        os.getuid() == 0
        and host in ("localhost", "127.0.0.1", "::1")
        and os.path.exists("/var/run/tog-pegasus/cimxml.socket")
        and not username
        and not password
    )
    if unix_socket_applicable:
        local_connection = LMIConnection(
            uri, None, None,
            interactive=interactive,
            use_cache=use_cache,
            conn_type=LMIBaseClient.CONN_TYPE_PEGASUS_UDS,
            verify_server_cert=verify_server_cert)
        if local_connection.verify_credentials():
            return local_connection
        # Otherwise fall through to the regular WBEM connection.

    if interactive and not key_file and not cert_file:
        # Ask only for the credentials that were not passed in.
        try:
            if not username:
                username = __lmi_raw_input(prompt_prefix+"username: ", True)
            if not password:
                password = __lmi_raw_input(prompt_prefix+"password: ", False)
        except KeyboardInterrupt:
            sys.stdout.write("\n")
            return None

    wbem_connection = LMIConnection(
        uri, username, password,
        interactive=interactive,
        use_cache=use_cache,
        conn_type=LMIBaseClient.CONN_TYPE_WBEM,
        key_file=key_file,
        cert_file=cert_file,
        verify_server_cert=verify_server_cert)
    if not wbem_connection.verify_credentials():
        return None
    return wbem_connection
class LMIConnection(object):
    """
    Class representing a connection object. Each desired connection to separate
    CIMOM should have its own connection object created. This class provides an
    entry point to the namespace/classes/instances/methods hierarchy present in
    the LMIShell.

    :param string uri: URI of the CIMOM
    :param string username: account, under which, the CIM calls will be
        performed
    :param string password: user's password
    :param bool interactive: flag indicating, if the LMIShell client is
        running in the interactive mode; default value is False.
    :param bool use_cache: flag indicating, if the LMIShell client should use
        cache for CIMClass objects. This saves lots of communication, if there
        are :func:`EnumerateInstances` and :func:`EnumerateClasses` intrinsic
        methods often issued. Default value is True.
    :param conn_type: type of connection; can be of 2 values:

        * :py:attr:`LMIBaseClient.CONN_TYPE_WBEM` -- WBEM connection,
        * :py:attr:`LMIBaseClient.CONN_TYPE_PEGASUS_UDS` -- applicable only
          for Tog-Pegasus CIMOM, it uses Unix socket for the connection;
          default value is :py:attr:`LMIBaseClient.CONN_TYPE_WBEM`

    :param string key_file: path to x509 key file; default value is None
    :param string cert_file: path to x509 cert file; default value is None
    :param bool verify_server_cert: flag indicating, whether a server side
        certificate needs to be verified, if SSL used; default value is True

    **NOTE:** If interactive is set to True, LMIShell will:

    * prompt for username and password, if missing and connection via
      Unix socket can not be established.
    * use pager for the output of: :py:meth:`.LMIInstance.doc`,
      :py:meth:`.LMIClass.doc`, :py:meth:`.LMIInstance.tomof` and
      :py:meth:`.LMIMethod.tomof`
    """
    def __init__(self, uri, username="", password="", **kwargs):
        # Set remaining arguments
        interactive = kwargs.pop("interactive", False)
        use_cache = kwargs.pop("use_cache", True)
        conn_type = kwargs.pop("conn_type", LMIBaseClient.CONN_TYPE_WBEM)
        key_file = kwargs.pop("key_file", None)
        cert_file = kwargs.pop("cert_file", None)
        verify_server_cert = kwargs.pop("verify_server_cert", True)
        if kwargs:
            raise TypeError(
                "__init__() got an unexpected keyword arguments: %s"
                % ", ".join(kwargs.keys()))

        self._client = LMIShellClient(
            uri, username, password,
            interactive=interactive,
            use_cache=use_cache,
            conn_type=conn_type,
            key_file=key_file,
            cert_file=cert_file,
            verify_server_cert=verify_server_cert)
        # Maps indication name -> LMISubscription created by this connection.
        self._indications = {}

        # Register LMIConnection.__unsubscribe_all_indications() to be called
        # at LMIShell's exit.
        atexit.register(self.__unsubscribe_all_indications)

    def __repr__(self):
        """
        :returns: pretty string for the object.
        """
        return "%s(URI='%s', user='%s'...)" % (
            self.__class__.__name__, self._client.uri, self._client.username)

    @property
    def uri(self):
        """
        :returns: URI of the CIMOM
        :rtype: string
        """
        return self._client.uri

    @property
    def namespaces(self):
        """
        :returns: list of all available namespaces

        **Usage:** :ref:`namespaces_available_namespaces`.
        """
        return ["root"]

    @property
    def root(self):
        """
        :returns: :py:class:`.LMINamespaceRoot` object for *root* namespace
        """
        return LMINamespaceRoot(self)

    def print_namespaces(self):
        """
        Prints out all available namespaces.
        """
        sys.stdout.write("root\n")

    def get_namespace(self, namespace):
        """
        Walks the namespace path component by component, starting at the
        top-level namespace attribute of this connection.

        :param string namespace: namespace path (eg. `root/cimv2`)
        :returns: :py:class:`LMINamespace` object
        :raises: :py:exc:`.LMINamespaceNotFound`
        """
        components = namespace.split("/")
        top_level = components.pop(0)
        if top_level not in self.namespaces:
            raise LMINamespaceNotFound(top_level)
        current = getattr(self, top_level)
        for component in components:
            current = getattr(current, component)
        return current

    def clear_cache(self):
        """
        Clears the cache.
        """
        self._client._cache.clear()

    def use_cache(self, active=True):
        """
        Sets a bool flag, which defines, if the LMIShell should use a cache.

        :param bool active: whether the LMIShell's cache should be used
        """
        self._client._cache.active = active

    def verify_credentials(self):
        """
        Verifies credentials by performing a "dummy" :func:`GetClass` call on
        "SomeNonExistingClass". Provided credentials are OK, if the LMIShell
        obtains :py:exc:`pywbem.CIMError` exception with the flag
        ``CIM_ERR_NOT_FOUND`` set. Otherwise, the shell should receive
        :py:exc:`pywbem.AuthError`.

        :returns: True if provided credentials are OK; False otherwise
        """
        try:
            # Exceptions are switched on for the probe and the previous
            # setting is put back afterwards, whatever the outcome.
            use_exceptions = lmi_get_use_exceptions()
            lmi_set_use_exceptions(True)
            try:
                self._client._get_class("SomeNonExistingClass")
            finally:
                lmi_set_use_exceptions(use_exceptions)
        except pywbem.cim_operations.CIMError as e:
            if e.args[0] == pywbem.cim_constants.CIM_ERR_NOT_FOUND:
                return True
            lmi_raise_or_dump_exception(e)
        except (pywbem.cim_http.AuthError, OpenSSL.SSL.Error) as e:
            lmi_raise_or_dump_exception(e)
        return False

    def _indication_failure(self, errorstr):
        """
        Reports an indication error through
        :func:`lmi_raise_or_dump_exception` and builds the failure result.

        :param string errorstr: error description
        :returns: :py:class:`LMIReturnValue` object with ``rval`` set to False
            and ``errorstr`` set to the given description
        """
        lmi_raise_or_dump_exception(LMIIndicationError(errorstr))
        return LMIReturnValue(rval=False, errorstr=errorstr)

    def subscribe_indication(self, **kwargs):
        """
        Subscribes to an indication. Indication is formed by 3 objects, where 2
        of them (filter and handler) can be provided, if the LMIShell should
        not create those 2 by itself.

        Mandatory parameters are checked before any object is created on the
        CIMOM, so a missing parameter does not leave a partially created
        indication behind.

        **NOTE:** Currently the call registers :py:mod:`atexit` hook, which
        auto-deletes all subscribed indications by the LMIShell.

        :param dictionary kwargs: parameters for the indication subscription

            * **Filter** (*LMIInstance*) -- if provided, the
              :py:class:`LMIInstance` object will be used instead of creating
              a new one; a :py:class:`pywbem.CIMInstance` is accepted as well;
              **optional**
            * **Handler** (*LMIInstance*) -- if provided, the
              :py:class:`LMIInstance` object will be used instead of creating
              a new one; a :py:class:`pywbem.CIMInstance` is accepted as well;
              **optional**
            * **Query** (*string*) -- string containing a query for the
              indications filtering; required, if no Filter is provided
            * **QueryLanguage** (*string*) -- query language; eg. *WQL*, or
              *DMTF:CQL*. This parameter is optional, default value is
              *DMTF:CQL*.
            * **Name** (*string*) -- indication name; required
            * **CreationNamespace** (*string*) -- creation namespace. This
              parameter is optional, default value is *root/interop*.
            * **SubscriptionCreationClassName** (*string*) -- subscription
              object class name. This parameter is optional, default value is
              *CIM_IndicationSubscription*.
            * **Permanent** (*bool*) -- whether to preserve the created
              subscription on LMIShell's quit. Default value is False.
            * **FilterCreationClassName** (*string*) -- creation class name of
              the filter object. This parameter is optional, default value is
              *CIM_IndicationFilter*.
            * **FilterSystemCreationClassName** (*string*) -- system creation
              class name of the filter object. This parameter is optional,
              default value is *CIM_ComputerSystem*.
            * **FilterSourceNamespace** (*string*) -- local namespace where
              the indications originate. This parameter is optional, default
              value is *root/cimv2*.
            * **HandlerCreationClassName** (*string*) -- creation class name
              of the handler object. This parameter is optional, default value
              is *CIM_IndicationHandlerCIMXML*.
            * **HandlerSystemCreationClassName** (*string*) -- system creation
              name of the handler object. This parameter is optional, default
              value is *CIM_ComputerSystem*.
            * **Destination** (*string*) -- destination URI, where the
              indications should be delivered; required, if no Handler is
              provided

        :returns: :py:class:`LMIReturnValue` object with ``rval`` set to True,
            if indication was subscribed; False otherwise. If an error occurs,
            ``errorstr`` is set to appropriate error string.
        """
        indication_namespace = kwargs.get("CreationNamespace", "root/interop")
        cim_filter_provided = "Filter" in kwargs
        cim_handler_provided = "Handler" in kwargs

        # Resolve caller-supplied objects first; this needs no CIMOM call.
        cim_filter = None
        if cim_filter_provided:
            filt = kwargs["Filter"]
            if isinstance(filt, LMIInstance):
                cim_filter = filt._cim_instance
            elif isinstance(filt, pywbem.CIMInstance):
                cim_filter = filt
            else:
                return self._indication_failure(
                    "Filter argument accepts instances of CIMInstance or LMIInstance")

        cim_handler = None
        if cim_handler_provided:
            handler = kwargs["Handler"]
            if isinstance(handler, pywbem.CIMInstance):
                # Accepted for consistency with the Filter argument.
                cim_handler = handler
            else:
                cim_handler = handler._cim_instance

        # Validate mandatory parameters up front. Previously a missing one
        # was noticed only after the filter (or even the subscription) had
        # been created, and those objects were then never cleaned up.
        required = []
        if not cim_filter_provided:
            required.extend(("Query", "Name"))
        if not cim_handler_provided:
            required.append("Destination")
        required.append("Name")
        for param in required:
            if param not in kwargs:
                return self._indication_failure(
                    "Not all necessary parameters provided, missing: %s"
                    % repr(param))

        try:
            if not cim_filter_provided:
                cim_filter_props = {
                    "CreationClassName": kwargs.get(
                        "FilterCreationClassName",
                        "CIM_IndicationFilter"),
                    "SystemCreationClassName": kwargs.get(
                        "FilterSystemCreationClassName",
                        "CIM_ComputerSystem"),
                    "SourceNamespace": kwargs.get(
                        "FilterSourceNamespace",
                        "root/cimv2"),
                    "SystemName": self._client.uri,
                    "Query": kwargs["Query"],
                    "QueryLanguage": kwargs.get(
                        "QueryLanguage",
                        LMIBaseClient.QUERY_LANG_CQL),
                    "Name": kwargs["Name"] + "-filter",
                }
                (cim_filter, _, errorstr) = self._client._create_instance(
                    cim_filter_props["CreationClassName"],
                    indication_namespace,
                    cim_filter_props)
                if not cim_filter:
                    return self._indication_failure(errorstr)

            if not cim_handler_provided:
                cim_handler_props = {
                    "CreationClassName": kwargs.get(
                        "HandlerCreationClassName",
                        "CIM_IndicationHandlerCIMXML"),
                    "SystemCreationClassName": kwargs.get(
                        "HandlerSystemCreationClassName",
                        "CIM_ComputerSystem"),
                    "SystemName": self._client.uri,
                    "Destination": kwargs["Destination"] + "/" + kwargs["Name"],
                    "Name": kwargs["Name"] + "-handler",
                }
                (cim_handler, _, errorstr) = self._client._create_instance(
                    cim_handler_props["CreationClassName"],
                    indication_namespace,
                    cim_handler_props)
                if not cim_handler:
                    # Roll back the filter, but only if we created it.
                    if not cim_filter_provided:
                        self._client._delete_instance(cim_filter.path)
                    return self._indication_failure(errorstr)

            cim_subscription_props = {
                "Filter": cim_filter.path,
                "Handler": cim_handler.path,
            }
            (cim_subscription, _, errorstr) = self._client._create_instance(
                kwargs.get(
                    "SubscriptionCreationClassName",
                    "CIM_IndicationSubscription"),
                indication_namespace,
                cim_subscription_props)
            if not cim_subscription:
                # Roll back whatever this call created itself.
                if not cim_filter_provided:
                    self._client._delete_instance(cim_filter.path)
                if not cim_handler_provided:
                    self._client._delete_instance(cim_handler.path)
                return self._indication_failure(errorstr)

            # XXX: Should we auto-delete all the indications?
            permanent = kwargs.get("Permanent", False)
            # The bool next to filter/handler is True, when the object was
            # created by this call (i.e. it was not supplied by the caller).
            self._indications[kwargs["Name"]] = LMISubscription(
                self._client,
                (cim_filter, not cim_filter_provided),
                (cim_handler, not cim_handler_provided),
                cim_subscription,
                permanent)
        except KeyError as e:
            # Kept from the original implementation: a KeyError escaping from
            # the calls above is still reported as a failed subscription.
            return self._indication_failure(
                "Not all necessary parameters provided, missing: %s" % e)
        return LMIReturnValue(rval=True)

    def unsubscribe_indication(self, name):
        """
        Unsubscribes an indication.

        :param string name: indication name
        :returns: :py:class:`LMIReturnValue` object with ``rval`` set to True,
            if unsubscribed; False otherwise
        """
        if name not in self._indications:
            return self._indication_failure("No such indication")
        indication = self._indications.pop(name)
        indication.delete()
        return LMIReturnValue(rval=True)

    def __unsubscribe_all_indications(self):
        """
        Unsubscribes all the indications, which were not marked as Permanent.
        The list of known indications is emptied afterwards.
        """
        # An explicit loop is used, since the calls are made for their side
        # effect only; a lazy map() would not perform them at all.
        for subscription in self._indications.values():
            if not subscription.permanent:
                subscription.delete()
        self._indications = {}

    def unsubscribe_all_indications(self):
        """
        Unsubscribes all the indications. This call ignores *Permanent* flag,
        which may be provided in
        :py:meth:`.LMIConnection.subscribe_indication`, and deletes all the
        subscribed indications.
        """
        for subscription in self._indications.values():
            subscription.delete()
        self._indications = {}

    def print_subscribed_indications(self):
        """
        Prints out all the subscribed indications.
        """
        for name in self._indications:
            sys.stdout.write("%s\n" % name)

    def subscribed_indications(self):
        """
        :returns: list of all the subscribed indications
        """
        return list(self._indications)