# Copyright (C) 2012-2013 Peter Hatina <phatina@redhat.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, see <http://www.gnu.org/licenses/>.
import sys
import ssl
import time
import pywbem
import random
import socket
import string
import threading
from SocketServer import BaseRequestHandler
from SocketServer import ThreadingMixIn
from BaseHTTPServer import HTTPServer
from BaseHTTPServer import BaseHTTPRequestHandler
from LMIIndication import LMIIndication
from LMIExceptions import LMIHandlerNamePatternError
class LMIIndicationHandlerCallback(object):
    """
    Helper class, which stores an indication handler callback together with its
    positional and keyword arguments.

    :param callback: callable, which is called, when an indication arrives
    :param tuple args: positional arguments for callback
    :param kwargs: keyword arguments for callback
    """

    def __init__(self, callback, *args, **kwargs):
        self.callback = callback
        self.args = args
        self.kwargs = kwargs
class LMIIndicationHandler(ThreadingMixIn, BaseHTTPRequestHandler):
    """
    Class representing indication handler. The class is derived from
    :py:class:`BaseHTTPRequestHandler`, because the indications are transported by http
    protocol, and from :py:class:`ThreadingMixIn`; the indication listener is designed as
    concurrent server to properly handle each incoming indication.
    """
    # NOTE(review): ThreadingMixIn customizes *server* request dispatching; mixing
    # it into a request handler presumably has no effect. The base class is kept
    # so that the class hierarchy seen by callers does not change.

    def _read_body(self):
        """
        Reads the body of the current request.

        When the request carries a ``Content-Length`` header, exactly that many bytes
        are read. An unbounded ``read()`` would block until the sender closes the
        connection, so it is used only as a fallback when no length was announced.

        :returns: raw request body
        :raises ValueError: if the ``Content-Length`` header is not an integer
        """
        content_length = self.headers.get("Content-Length")
        if content_length is None:
            return self.rfile.read()
        return self.rfile.read(int(content_length))

    def do_POST(self):
        """
        Overridden method, which is called, when an indication is received. It parses the
        indication message and searches for a matching handler for the indication name.
        Each subscribed indication should have its Destination set to something similar:

            ``<schema>://<destination>/<indication_name>``

        where the ``indication_name`` is a string, which properly defines the indication.

        Messages, which are not a ``MESSAGE`` element or which carry no parameters, are
        silently ignored, as are indications with no registered handler.
        """
        # NOTE(review): no HTTP response is written back to the sender here;
        # confirm whether the indication sender expects one.
        msg = self._read_body()
        tt = pywbem.parse_cim(pywbem.xml_to_tupletree(msg))
        message = tt[2]
        if message[0].upper() != "MESSAGE":
            return
        message_params = message[2]
        if not message_params:
            return

        # Collect {export method name: {parameter name: parameter value}}.
        export_methods = {}
        for param in message_params:
            if param[0].upper() != "SIMPLEEXPREQ":
                continue
            export_method_call = param[2]
            export_method_name = export_method_call[1]["NAME"]
            export_method_params = {}
            for method_param in export_method_call[2]:
                export_method_params[method_param[0]] = method_param[1]
            export_methods[export_method_name] = export_method_params
        ind = LMIIndication(export_methods)

        # The handler name is the request path without its leading slash. A single
        # lookup avoids a KeyError, if the handler is removed concurrently.
        cb = self.server._handlers.get(self.path[1:])
        if cb is not None:
            cb.callback(ind, *cb.args, **cb.kwargs)
class LMIIndicationServer(ThreadingMixIn, HTTPServer):
    """
    Class representing indication server, derived from HTTPServer and designed as
    concurrent server; :py:class:`ThreadingMixIn` makes the server handle each
    request in its own thread.
    """

    # Request threads are daemonic, so a pending request does not keep the
    # interpreter alive on exit.
    daemon_threads = True
class LMIIndicationListener(object):
    """
    Class representing indication listener, which provides a unified API for the server
    startup and shutdown and for registering an indication handler.

    :param string hostname: hostname or address of the machine, where the
        indications will be delivered
    :param int port: TCP port, where the server should listen for incoming messages
    :param string certfile: path to certificate file, if SSL used
    :param string keyfile: path to key file, if SSL used
    """

    # Minimum number of replaceable "X" characters in a handler pattern string.
    HANDLER_MINIMUM_REPL_CHARS_COUNT = 8

    def __init__(self, hostname, port, certfile=None, keyfile=None):
        self._handlers = {}
        self._hostname = hostname
        self._port = port
        self._certfile = certfile
        self._keyfile = keyfile
        # Populated by start(). Kept as None until then, so that stop() and
        # is_alive are safe to use on a listener, which is not running.
        self._server = None
        self._server_thread = None

    def __create_handler_name(self, handler_name_pattern):
        """
        Returns unique handler name by replacing "**X**" characters for random characters
        at the end of the handler_name_pattern.

        :param string handler_name_pattern: string containing replaceable characters at
            the end
        :returns: unique handler name
        :rtype: string
        :raises LMIHandlerNamePatternError: if the pattern ends with fewer than
            :py:attr:`HANDLER_MINIMUM_REPL_CHARS_COUNT` "**X**" characters
        """
        # Everything in front of the trailing run of "X" is kept verbatim. rstrip()
        # also copes with an empty or an all-"X" pattern, where indexing backwards
        # from the end would run off the start of the string.
        prefix = handler_name_pattern.rstrip("X")
        x_cnt = len(handler_name_pattern) - len(prefix)
        if x_cnt < LMIIndicationListener.HANDLER_MINIMUM_REPL_CHARS_COUNT:
            raise LMIHandlerNamePatternError("Not enough replaceable characters provided")

        alphabet = string.ascii_uppercase + string.digits
        while True:
            unique_str = prefix + "".join(random.choice(alphabet) for _ in range(x_cnt))
            # Retry on the unlikely collision with an already registered handler.
            if unique_str not in self._handlers:
                return unique_str

    def start(self):
        """
        Starts an indication listener.

        The indication listener runs in a newly-created daemon thread. If a certificate
        file was provided, the listening socket is wrapped by SSL before the thread is
        started. If the SSL setup fails, the listening socket is closed and the error
        is re-raised.

        :returns: True, if the indication listener started with no errors; False, if
            the server socket could not be created
        """
        try:
            server = LMIIndicationServer((self._hostname, self._port), LMIIndicationHandler)
        except socket.error:
            return False

        # The server shares the handler registry, so handlers added or removed
        # later are visible to the request handlers.
        server._handlers = self._handlers
        if self._certfile:
            try:
                server.socket = ssl.wrap_socket(
                    server.socket,
                    certfile=self._certfile,
                    keyfile=self._keyfile,
                    server_side=True)
            except Exception:
                # Do not leak the already bound listening socket.
                server.server_close()
                raise

        server_thread = threading.Thread(target=server.serve_forever)
        server_thread.daemon = True
        self._server = server
        self._server_thread = server_thread
        server_thread.start()
        return True

    def stop(self):
        """
        Stops the indication listener.

        This method will also terminate the listener thread and close the listening
        socket. Calling it on a listener, which is not running, does nothing.
        """
        if self._server is None:
            return
        self._server.shutdown()
        self._server_thread.join()
        # shutdown() only ends the serve_forever() loop; the listening socket has
        # to be closed explicitly.
        self._server.server_close()
        self._server = None
        self._server_thread = None

    @property
    def is_alive(self):
        """
        :returns: flag indicating, if the indication listener is running
        :rtype: bool
        """
        return self._server_thread is not None and self._server_thread.is_alive()

    @property
    def hostname(self):
        """
        :returns: hostname or address, where the indication listener is waiting for
            messages
        :rtype: string
        """
        return self._hostname

    @property
    def port(self):
        """
        :returns: port, where the indication listener is waiting for messages
        :rtype: int
        """
        return self._port

    def add_handler(self, handler_name_pattern, handler, *args, **kwargs):
        """
        Registers a handler into the indication listener. Returns a string, which is used
        for the indication recognition, when a message arrives.

        :param string handler_name_pattern: string, which may contain set of "**X**"
            characters at the end of the string. The "**X**" characters will be replaced by
            random characters and the whole string will form a unique string.
        :param handler: callable, which will be executed, when an indication is received
        :param tuple args: positional arguments for the handler
        :param dictionary kwargs: keyword arguments for the handler
        :returns: handler unique name
        :rtype: string
        """
        # 16 replaceable characters are always appended, so every pattern carries
        # enough of them to produce a unique name.
        handler_name = self.__create_handler_name(handler_name_pattern + "X" * 16)
        self._handlers[handler_name] = LMIIndicationHandlerCallback(handler, *args, **kwargs)
        return handler_name

    def remove_handler(self, name):
        """
        Removes a specified handler from the indication listener database. Unknown
        names are ignored.

        :param string name: indication name; returned by
            :py:meth:`LMIIndicationListener.add_handler`
        """
        self._handlers.pop(name, None)