# Copyright (C) 2012-2013 Peter Hatina <phatina@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
import pywbem
import signal
import socket
import urlparse
import threading
import collections
from LMIBaseObject import LMIWrapperBaseObject
from LMIBaseClient import LMIBaseClient
from LMIObjectFactory import LMIObjectFactory
from LMIFormatter import LMIMofFormatter
from LMIReturnValue import LMIReturnValue
from LMIConstantValues import LMIConstantValuesParamProp
from LMIConstantValues import LMIConstantValuesMethodReturnType
from LMIIndicationListener import LMIIndicationListener
from LMIJob import lmi_is_job_finished
from LMIJob import lmi_is_job_completed
from LMIJob import lmi_is_job_terminated
from LMIJob import lmi_is_job_killed
from LMIJob import lmi_is_job_exception
from LMIJob import JOB_NOT_FINISHED
from LMIJob import JOB_FINISH_DELAYED
from LMIJob import JOB_FINISH_EARLY
from LMIUtil import LMIPassByRef
from LMIUtil import lmi_raise_or_dump_exception
from LMIUtil import lmi_transform_to_cim_param
from LMIUtil import lmi_transform_to_lmi
from LMIExceptions import LMIIndicationListenerError
from LMIExceptions import LMIMethodCallError
from LMIExceptions import LMISynchroMethodCallError
from LMIExceptions import LMISynchroMethodCallFilterError
from LMIExceptions import LMIUnknownParameterError
from LMIExceptions import LMIHandlerNamePatternError
class LMISignalHelper(object):
    """
    Helper class, which takes care of signal (de)registration and handling.

    The class is a singleton: every call of ``LMISignalHelper()`` returns the
    same object, so the "signal caught" flag and the registered callbacks are
    shared by all users within the process.
    """

    # The one and only instance; created lazily in __new__().
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super(LMISignalHelper, cls).__new__(cls)
            LMISignalHelper.reset(cls._instance)
        return cls._instance

    def reset(self):
        """
        Resets the single instance into default state.

        Stored previous signal handlers, the "signal caught" flag and all the
        registered callbacks are dropped. The handlers currently installed in
        the interpreter are not touched.
        """
        self._handler_sigint = None
        self._handler_sigterm = None
        self._signal_handled = False
        # True between signal_attach() and signal_detach()
        self._attached = False
        self._callbacks = collections.OrderedDict()

    def signal_attach(self):
        """
        Registers *SIGINT* and *SIGTERM* signals to local handler in which, the flags for
        each signal are modified, if such signal is caught.

        The "signal caught" flag is cleared on every call. The previously
        installed handlers are remembered only by the first call; a repeated
        call without :py:meth:`signal_detach` in between would otherwise
        overwrite them with our own handler.
        """
        self._signal_handled = False
        if self._attached:
            return
        self._handler_sigint = signal.signal(
            signal.SIGINT, LMISignalHelper.__signal_handler)
        self._handler_sigterm = signal.signal(
            signal.SIGTERM, LMISignalHelper.__signal_handler)
        self._attached = True

    def signal_detach(self):
        """
        Unregisters *SIGINT* and *SIGTERM* handler and restores the handlers,
        which were installed before :py:meth:`signal_attach` was called.

        Registered callbacks are kept; use :py:meth:`callback_detach` to remove
        them. Calling this method without a preceding
        :py:meth:`signal_attach` is a no-op.
        """
        if not self._attached:
            return
        signal.signal(
            signal.SIGINT, LMISignalHelper._restorable(self._handler_sigint))
        signal.signal(
            signal.SIGTERM, LMISignalHelper._restorable(self._handler_sigterm))
        self._attached = False

    def signal_handled(self):
        """
        :returns: True, if any of *SIGINT* or *SIGTERM* has been caught; False otherwise
        """
        return self._signal_handled

    def callback_attach(self, cb_name, cb):
        """
        Registers a callback, which will be called when a *SIGINT* or *SIGTERM* is caught.

        :param string cb_name: callback name
        :param cb: callable object, which takes zero arguments
        """
        self._callbacks[cb_name] = cb

    def callback_detach(self, cb_name):
        """
        Removes a callback from the callback dictionary.

        :param string cb_name: callback name
        :raises: :py:exc:`KeyError`, if no such callback is registered
        """
        self._callbacks.pop(cb_name)

    @staticmethod
    def _restorable(handler):
        """
        :param handler: value previously returned by :py:func:`signal.signal`
        :returns: handler, which can be passed back to :py:func:`signal.signal`

        :py:func:`signal.signal` returns None, when the previous handler was
        not installed from Python. None can not be installed again, so the
        default action is used instead.
        """
        return signal.SIG_DFL if handler is None else handler

    @staticmethod
    def __signal_handler(signo, frame):
        """
        Signal handler, which is called, when *SIGINT* and *SIGTERM* are sent to
        the LMIShell.

        :param int signo: signal number
        :param frame: -- stack frame

        **NOTE:** see help(signal)
        """
        if signo in (signal.SIGINT, signal.SIGTERM):
            helper = LMISignalHelper._instance
            helper._signal_handled = True
            # Iterate over a snapshot; a callback may detach itself.
            for cb in list(helper._callbacks.values()):
                cb()
class LMIMethod(LMIWrapperBaseObject):
    """
    LMI wrapper class representing :py:class:`CIMMethod`.

    :param LMIConnection conn: connection object
    :param LMIInstance lmi_instance: :py:class:`LMIInstance` object, on which the method
        call will be issued
    :param string method: method name
    :param bool sync_method: flag indicating, if we are trying to perform a
        synchronous method call
    """

    # 15 seconds sleep timeout for main waiting thread
    _COND_WAIT_TIME = 15

    # Wake count of main thread, when the GetInstance is performed to check,
    # if the job object is present. Prevents infinite waiting for indication
    # delivery. Maximum waiting time, before the GetInstance for job object
    # will be called is: _COND_WAIT_TIME * _COND_WAIT_WAKE_CNT
    _COND_WAIT_WAKE_CNT = 8

    # Default tcp port, where the indications will be delivered.
    # TODO: create a configuration option for the port
    _INDICATION_DESTINATION_PORT = 10240

    # Job classes, which can be used for synchro method calls
    # TODO: create a configuration option for the static filters' classnames
    _INDICATION_JOB_CLASSNAMES = (
        "LMI_StorageJob",
        "LMI_SoftwareInstallationJob",
        "LMI_SoftwareVerificationJob",
        "LMI_NetworkJob"
    )

    # Default namespace where the indication subscriptions, used for synchronous
    # method calls, will be registered.
    _INDICATION_NAMESPACE = "root/interop"

    # When performing a synchronous method call and using the polling method to
    # get a job object status, the sleep time between 2 polls doubles if it is
    # less than _POLLING_ADAPT_MAX_WAITING_TIME.
    _POLLING_ADAPT_MAX_WAITING_TIME = 128

    def __init__(self, conn, lmi_instance, method, sync_method):
        super(LMIMethod, self).__init__(conn)
        self._lmi_instance = lmi_instance
        self._method = lmi_instance._lmi_class._cim_class.methods[method]
        self._sync_method = sync_method

        # Store the constant values as a list. This can consume some time, if computed on demand.
        self._valuemap_parameters_list = [
            k for (k, v) in self._method.parameters.iteritems()
            if "ValueMap" in v.qualifiers]

        # For simplicity, we add return value constants to the same list
        if "ValueMap" in self._method.qualifiers:
            self._valuemap_parameters_list.append(self._method.name)

    def __return_synchro_method_call(self, job_inst, job_refresh=True):
        """
        Returns a :py:class:`LMIReturnValue` object with Job output parameters set.

        :param LMIInstance job_inst: job returned from a synchronous method call
        :param bool job_refresh: flag, which indicates, if the ``job_inst``
            needs to be refreshed
        :returns: :py:class:`LMIReturnValue` object with ``rval`` set to the job's
            ``__ReturnValue`` output parameter (None, if not present), ``rparams`` set
            to the remaining output parameters and ``errorstr`` set to the job error
            message, if the job ended in exception state
        :raises: :py:exc:`.LMISynchroMethodCallError`
        """
        # Adjust return value from the job object
        if job_refresh:
            job_inst.refresh()
        rval = None
        rparams = pywbem.NocaseDict()
        if job_inst.JobOutParameters is not None:
            rparams = pywbem.NocaseDict({
                k: x.value
                for k, x in job_inst.JobOutParameters.properties.iteritems()})
            # The return value travels among the output parameters; move it
            # out of them, if the provider supplied it.
            if "__ReturnValue" in rparams:
                rval = rparams["__ReturnValue"]
                del rparams["__ReturnValue"]  # NocaseDict has no pop()

        errorstr = ""
        # Is job in exception state? If so, adjust corresponding error string
        # from job.GetError() instance
        if lmi_is_job_exception(job_inst):
            (refreshed, _, errorstr) = job_inst.refresh()
            if not refreshed:
                raise LMISynchroMethodCallError(errorstr)
            (exc_rval, exc_rparams, exc_errorstr) = job_inst.GetError()
            error_inst = exc_rparams.get("error", None)
            if not error_inst:
                raise LMISynchroMethodCallError("Could not get Job error message")
            errorstr = error_inst.Message
        return LMIReturnValue(rval=rval, rparams=rparams, errorstr=errorstr)

    def __handle_synchro_method_call_indication(self, job_inst):
        """
        Handles a synchronous call for asynchronous methods returning a job object. This
        method uses static filters installed by each OpenLMI provider, which is capable of
        using jobs.

        An indication listener is started, a handler and a subscription object are
        created on the CIMOM side and the calling thread waits, until the job
        finishes or *SIGINT*/*SIGTERM* is caught. The listener and both CIM objects
        are always cleaned up, even if the waiting fails.

        :param LMIInstance job_inst: job object returned from a synchronous
            method call
        :returns: :py:class:`LMIReturnValue` object built from the job's output
            parameters; ``rval`` is None, if the job was cancelled by a signal
        :raises: :py:exc:`.LMIIndicationListenerError`,
            :py:exc:`.LMISynchroMethodCallError`,
            :py:exc:`.LMISynchroMethodCallFilterError`

        **NOTE:** Static filters' names need to be in format
        "``LMI:<job_class_name>:Changed``"
        """
        def handle_job(ind, cond, job_finished, job_exception):
            """
            :py:class:`LMIListener` handler for synchronous method call, which uses
            indication means of waiting for the job. This function is called, when a job
            changes its state.

            :param ind: delivered indication; its first exported object has to
                contain ``SourceInstance``
            :param threading.Condition cond: condition object used for thread synchronization
            :param LMIPassByRef job_finished: used for synchronization, whether the
                job has finished
            :param LMIPassByRef job_exception: contains an exception object, if any
                exception was raised
            """
            cond.acquire()
            try:
                exp_obj = ind.exported_objects()[0]
                src_inst = exp_obj["SourceInstance"]
                if lmi_is_job_finished(src_inst):
                    # Job has just finished
                    job_finished.value = JOB_FINISH_DELAYED
            except Exception as e:
                # Notify main thread, we are not able to work with such objects
                job_finished.value = JOB_FINISH_DELAYED
                job_exception.value = e
            finally:
                # XXX: Let's be defensive, always notify+release main thread
                cond.notify()
                cond.release()

        # Start indication listener
        cond = threading.Condition()
        job_finished = LMIPassByRef(JOB_NOT_FINISHED)
        job_exception = LMIPassByRef(None)
        # There needs to be a pattern of at least 8 "X" in a row at the end of the indication_name
        indication_name = "synchro-method-call-XXXXXXXX"
        listener = LMIIndicationListener("0.0.0.0", LMIMethod._INDICATION_DESTINATION_PORT)
        indication_name = listener.add_handler(
            indication_name, handle_job, cond, job_finished, job_exception)
        if not listener.start():
            raise LMIIndicationListenerError("Can not start indication listener")

        client = self._conn._client
        helper = LMISignalHelper()
        cim_handler = None
        cim_subscription = None
        try:
            # Search for necessary static filter
            filter_name = "LMI:%s:Changed" % job_inst.classname
            (cim_filters, _, err) = client._get_instances(
                "CIM_IndicationFilter",
                LMIMethod._INDICATION_NAMESPACE,
                {"Name": filter_name})
            if not cim_filters:
                errorstr = "Can not find proper CIM_IndicationFilter for this method call"
                raise LMISynchroMethodCallFilterError(errorstr)
            cim_filter = cim_filters[0]

            # Create handler object. hostname strips the port and also any
            # user info, which may be present in the uri.
            hostname = urlparse.urlparse(self._conn.uri).hostname
            if not hostname:
                errorstr = "Can not determine netloc from client's uri"
                raise LMISynchroMethodCallError(errorstr)

            # NOTE: This will work only on a local area network. Complicated networks may require
            # additional configuration to make this work. See LMIMethod() and PreferPolling.
            # Connecting a datagram socket sends nothing; it only makes the
            # system pick the local address used to reach the CIMOM.
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            try:
                s.connect((hostname, LMIMethod._INDICATION_DESTINATION_PORT))
                destination = s.getsockname()[0]
            except socket.gaierror:
                errorstr = "Can not determine IP address of this machine"
                raise LMISynchroMethodCallError(errorstr)
            finally:
                s.close()

            # NOTE: For now, we are using insecure HTTP as a transport protocol.
            # TODO: Switch to HTTPS
            cim_handler_props = {
                "Name": indication_name,
                "Destination": "http://%s:%d/%s" % (
                    destination,
                    LMIMethod._INDICATION_DESTINATION_PORT,
                    indication_name),
            }
            (cim_handler, _, err) = client._create_instance(
                "CIM_IndicationHandlerCIMXML",
                LMIMethod._INDICATION_NAMESPACE,
                cim_handler_props)
            if not cim_handler:
                errorstr = "Can not create CIM_IndicationHandlerCIMXML object"
                raise LMISynchroMethodCallError(errorstr)

            # Create indication subscription object
            cim_subscription_props = {
                "Filter": cim_filter.path,
                "Handler": cim_handler.path
            }
            (cim_subscription, _, err) = client._create_instance(
                "CIM_IndicationSubscription",
                LMIMethod._INDICATION_NAMESPACE,
                cim_subscription_props)
            if not cim_subscription:
                errorstr = "Can not create CIM_IndicationSubscription object"
                raise LMISynchroMethodCallError(errorstr)

            # Check, if the job is not already in finished state,
            # while we were subscribing for the indications
            job_inst.refresh()
            if lmi_is_job_finished(job_inst):
                job_finished.value = JOB_FINISH_EARLY

            # Register signal callback for SIGINT, SIGTERM with callback,
            # which awakes waiting thread for immediate return.
            helper.callback_attach("indication", lambda: LMIMethod.__wake(cond))
            helper.signal_attach()

            # Wait for the job to finish
            cond.acquire()
            try:
                wake_cnt = 0
                while not helper.signal_handled() and \
                        not job_finished.value and \
                        not lmi_is_job_finished(job_inst):
                    cond.wait(LMIMethod._COND_WAIT_TIME)
                    wake_cnt += 1
                    # XXX: threading.Condition.wait() does not inform about timeout or being
                    # awaken by notify call. We count _COND_WAIT_WAKE_CNT sleep cycles before
                    # we actually check for job status manually. This number can be
                    # increased, so we rely more on indications, rather than on manual polling.
                    if wake_cnt >= LMIMethod._COND_WAIT_WAKE_CNT and \
                            not job_finished.value:
                        wake_cnt = 0
                        try:
                            (refreshed, _, errorstr) = job_inst.refresh()
                        except pywbem.cim_operations.CIMError as e:
                            job_exception.value = e
                            break
                        if not refreshed:
                            job_exception.value = LMISynchroMethodCallError(errorstr)
                            break
            finally:
                # Unregister signal handler, no matter how the waiting ended
                helper.signal_detach()
                helper.callback_detach("indication")
                cond.release()
        finally:
            # Cleanup; executed for failed setup as well as for finished waiting
            listener.stop()
            if cim_subscription:
                client._delete_instance(cim_subscription.path)
            if cim_handler:
                client._delete_instance(cim_handler.path)

        if job_exception.value:
            raise job_exception.value

        if helper.signal_handled() and not job_finished.value:
            # We got SIGINT or SIGTERM, when waiting for the job, cancelling the job
            sys.stderr.write("Cancelling a job '%s'\n" % job_inst.Name)
            job_inst.RequestStateChange(
                RequestedState=job_inst.RequestStateChange.RequestedStateValues.Terminate
            )
            return LMIReturnValue(rval=None)

        # Return the job return values, refresh the job_inst object, if we got notified
        # about the job finish state by indication.
        return self.__return_synchro_method_call(
            job_inst, job_finished.value == JOB_FINISH_DELAYED)

    def __handle_synchro_method_call_polling(self, job_inst):
        """
        Handles a synchronous call for asynchronous methods returning a job object.
        This call uses polling method to wait for the job to finish.

        :param LMIInstance job_inst: job object
        :returns: :py:class:`LMIReturnValue` object built from the job's output
            parameters; ``rval`` is None, if the job was cancelled by a signal
        :raises: :py:exc:`.LMISynchroMethodCallError`
        """
        # The condition has to exist before the signal callback is registered;
        # the callback refers to it.
        cond = threading.Condition()
        helper = LMISignalHelper()

        # Register signal callback for SIGINT, SIGTERM with callback,
        # which awakes waiting thread for immediate return.
        helper.callback_attach("polling", lambda: LMIMethod.__wake(cond))
        helper.signal_attach()

        cond.acquire()
        job_exception = None
        try:
            sleep_time = 1
            while not helper.signal_handled() and \
                    not lmi_is_job_finished(job_inst):
                # Sleep, a bit longer in every iteration
                cond.wait(sleep_time)
                if sleep_time < LMIMethod._POLLING_ADAPT_MAX_WAITING_TIME:
                    sleep_time *= 2
                (refreshed, _, errorstr) = job_inst.refresh()
                if not refreshed:
                    job_exception = LMISynchroMethodCallError(errorstr)
                    break
        except pywbem.cim_operations.CIMError as e:
            # Same error description as __call__() reports for CIMError
            description = e.args[1] if len(e.args) > 1 else str(e)
            job_exception = LMISynchroMethodCallError(description)
        finally:
            cond.release()
            # Unregister signal handler and callback
            helper.signal_detach()
            helper.callback_detach("polling")

        if helper.signal_handled() and not lmi_is_job_finished(job_inst):
            # We got SIGINT or SIGTERM, when waiting for the job, cancelling the job
            sys.stderr.write("Cancelling a job '%s'\n" % job_inst.Name)
            job_inst.RequestStateChange(
                RequestedState=job_inst.RequestStateChange.RequestedStateValues.Terminate
            )
            return LMIReturnValue(rval=None)

        if job_exception is not None:
            raise job_exception

        # Return the job return values. No need to refresh the job instance, we already
        # have a "fresh" one.
        return self.__return_synchro_method_call(job_inst, False)

    def __call__(self, method_args=None, **kwargs):
        """
        Perform a method call.

        Method arguments are preferably passed by dictionary (parameter : value). Using
        former means of passing arguments, by keyword arguments, to a method call works
        too. The passed dictionary is not modified.

        If performing a synchronous method call, passing PreferPolling can be used
        to select which method should be used -- either subscribing to an indication
        or polling method.

        :param dictionary method_args: method arguments
        :param dictionary kwargs: keyword method arguments (``method_args`` preferred)

            * **RefreshInstance** (*bool*) flag, which tells the LMIShell, whether
              the instance should be refreshed after a method call. Default
              value is False.
            * **PreferPolling** (*bool*) flag, which makes a synchronous method
              call wait for the job by polling, without subscribing to an
              indication first. Default value is False.
        :returns: :py:class:`LMIReturnValue` object with ``rval`` set to method's return
            value; ``rparams`` set to method's return parameters; ``errorstr`` set
            method's error string in case of failure
        :raises: :py:exc:`.LMIUnknownParameterError`,
            :py:exc:`.LMIMethodCallError`
            :py:exc:`.LMISynchroMethodCallError`

        **Usage:** :ref:`instances_methods`.
        """
        synchro_method_polling = kwargs.pop("PreferPolling", False)
        refresh_requested = kwargs.pop("RefreshInstance", False)

        # Work with a private copy, the caller's dictionary stays untouched
        method_args = dict(method_args) if method_args is not None else {}
        method_args.update(kwargs)
        for (param, value) in list(method_args.items()):
            if param not in self._method.parameters:
                # NOTE: maybe we could check for pywbem type and not to exit prematurely
                errorstr = "Unknown parameter '%s' supplied for method '%s'" % (
                    param, self._method.name)
                lmi_raise_or_dump_exception(LMIUnknownParameterError(errorstr))
                return LMIReturnValue(rval=-1, errorstr=errorstr)
            # Cast input parameters into acceptable CIM types
            t = self._method.parameters[param].type
            method_args[param] = lmi_transform_to_cim_param(t, value)

        (rval, call_rparams, call_errorstr) = self._conn._client._call_method_raw(
            self._lmi_instance, self._method.name, **method_args)
        rval = lmi_transform_to_lmi(self._conn, rval)
        if call_rparams:
            call_rparams = lmi_transform_to_lmi(self._conn, call_rparams)
            if not call_rparams:
                # NOTE: this is wrong! What should we do?
                errorstr = "Could not perform CIM -> LMI object transformation"
                lmi_raise_or_dump_exception(LMIMethodCallError(errorstr))
                return LMIReturnValue(rval=rval, errorstr=errorstr)

        # Check if we can perform synchronous method call; there is no job
        # to wait for, when the call returned no output parameters.
        job = call_rparams.get("job", None) if call_rparams else None
        can_perform_sync_call = \
            job.classname in LMIMethod._INDICATION_JOB_CLASSNAMES if job else False
        if self._sync_method and job and can_perform_sync_call:
            # Synchronous method calls
            job_inst = call_rparams['job'].to_instance()

            # At first, try to wait for the call to finish by subscribing to an indication
            handled_by_indication = False
            if not synchro_method_polling:
                try:
                    (rval, call_rparams, call_errorstr) = \
                        self.__handle_synchro_method_call_indication(job_inst)
                    handled_by_indication = True
                except (LMIHandlerNamePatternError,
                        LMISynchroMethodCallFilterError,
                        LMIIndicationListenerError):
                    # Fall through, try to handle the synchro call by polling.
                    # These handlers precede the LMISynchroMethodCallError one,
                    # so they stay reachable, should any of the exceptions
                    # derive from it.
                    handled_by_indication = False
                except pywbem.CIMError as e:
                    lmi_raise_or_dump_exception(e)
                    return LMIReturnValue(rval=-1, errorstr=e.args[1])
                except LMISynchroMethodCallError as e:
                    lmi_raise_or_dump_exception(e)
                    return LMIReturnValue(rval=-1, errorstr=e.message)

            if not handled_by_indication:
                # Executed, when LMIListener can not be started
                try:
                    (rval, call_rparams, call_errorstr) = \
                        self.__handle_synchro_method_call_polling(job_inst)
                except LMISynchroMethodCallError as e:
                    lmi_raise_or_dump_exception(e)
                    return LMIReturnValue(rval=-1, errorstr=e.message)
            call_rparams = lmi_transform_to_lmi(self._conn, call_rparams)

        if refresh_requested:
            # refresh() result is unpacked as (refreshed, _, errorstr) elsewhere
            # in this class; such a tuple is always true, so its first
            # member has to be checked.
            refresh_result = self._lmi_instance.refresh()
            if isinstance(refresh_result, tuple):
                refreshed = refresh_result[0]
            else:
                refreshed = refresh_result
            if not refreshed:
                # NOTE: this is wrong! What should we do?
                errorstr = "Could not update an LMI object after a method call"
                lmi_raise_or_dump_exception(LMIMethodCallError(errorstr))
                return LMIReturnValue(rval=rval, errorstr=errorstr)
        return LMIReturnValue(rval=rval, rparams=call_rparams, errorstr=call_errorstr)

    def __getattr__(self, name):
        """
        Returns either a class member, or a constant value.

        A name in the form ``<Parameter>Values`` returns the constant values of the
        method's parameter; ``<MethodName>Values`` returns the constant values of the
        method's return type.

        :param string name: class member, or the constant value name
        :raises: :py:exc:`AttributeError`, if the name can not be resolved
        """
        if name in self.__dict__:
            return self.__dict__[name]
        if name.endswith("Values"):
            parameter_name = name[:-6]
            if parameter_name in self._method.parameters:
                return LMIConstantValuesParamProp(self._method.parameters[parameter_name])
            elif parameter_name == self._method.name:
                return LMIConstantValuesMethodReturnType(self._method)
        raise AttributeError(name)

    @staticmethod
    def __wake(cond):
        """
        Helper function used for manual :py:attr:`threading.Condition` wakeup.

        :param threading.Condition cond: condition object
        """
        with cond:
            cond.notify()

    @property
    def return_type(self):
        """
        :returns: string of the method call's return type
        """
        return self._method.return_type

    def tomof(self):
        """
        Prints out a message with MOF representation of :py:class:`CIMMethod`. If the
        LMIShell is run in a interactive mode, the output will be redirected to a pager
        set by environment variable :envvar:`PAGER`. If there is not :envvar:`PAGER` set,
        less or more will be used as a fall-back.
        """
        LMIMofFormatter(self._method.tomof()).fancy_format(self._conn._client.interactive)

    def valuemap_parameters(self):
        """
        :returns: list of strings of the constant names
        """
        return self._valuemap_parameters_list

    def print_valuemap_parameters(self):
        """
        Prints out the list of strings of constant names.
        """
        for i in self._valuemap_parameters_list:
            sys.stdout.write("%s\n" % i)

    def parameters(self):
        """
        :returns: :py:class:`CIMMethod`'s parameters; a mapping of parameter
            names to parameter objects
        """
        return self._method.parameters

    def print_parameters(self):
        """
        Prints out :py:class:`CIMMethod`'s parameters, one per line, in form
        ``<type> <name>``; array parameters are followed by ``[]``.
        """
        for (param, value) in self._method.parameters.iteritems():
            sys.stdout.write(
                "%s %s%s\n" % (value.type, param, "[]" if value.is_array else ""))

    @property
    def wrapped_object(self):
        """
        :returns: wrapped :py:class:`CIMMethod` object
        """
        return self._method