# Source code for kazoo.handlers.utils

"""Kazoo handler helpers"""

# Feature flag recording whether the optional ``fcntl`` module could be
# imported (it is not available on every platform, e.g. Windows).  The
# helpers below consult it before touching file-descriptor flags.
# NOTE: the name is spelled "FNCTL" (sic); it is kept as-is because other
# code in this file refers to it by that name.
HAS_FNCTL = True
try:
    import fcntl
except ImportError:  # pragma: nocover
    HAS_FNCTL = False
import functools
import os


def _set_default_tcpsock_options(module, sock):
    """Apply the default option set to a TCP socket and return it.

    ``TCP_NODELAY`` is always enabled.  When ``fcntl`` is available the
    socket's descriptor is additionally flagged close-on-exec.

    :param module: Socket module supplying ``IPPROTO_TCP`` and
                   ``TCP_NODELAY``
    :param sock: The socket to configure
    :returns: The same ``sock`` object that was passed in
    """
    sock.setsockopt(module.IPPROTO_TCP, module.TCP_NODELAY, 1)
    if not HAS_FNCTL:
        return sock
    # Keep whatever descriptor flags are already set and add FD_CLOEXEC.
    fd_flags = fcntl.fcntl(sock, fcntl.F_GETFD)
    fcntl.fcntl(sock, fcntl.F_SETFD, fd_flags | fcntl.FD_CLOEXEC)
    return sock


def create_pipe():
    """Create a non-blocking read/write pipe.

    When ``fcntl`` is unavailable the pipe is returned exactly as
    ``os.pipe()`` created it, i.e. without ``O_NONBLOCK`` applied.

    :returns: A ``(read_fd, write_fd)`` tuple of file descriptors
    """
    r, w = os.pipe()
    if HAS_FNCTL:
        for fd in (r, w):
            # Read-modify-write: OR O_NONBLOCK into the current status
            # flags instead of overwriting them wholesale.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    return r, w


def create_tcp_socket(module):
    """Create a TCP socket with the CLOEXEC flag set.

    :param module: Socket module to build the socket with; it must provide
                   ``socket``, ``AF_INET`` and ``SOCK_STREAM``
    :returns: A new ``AF_INET`` stream socket with the default TCP options
              applied
    """
    type_ = module.SOCK_STREAM
    if hasattr(module, 'SOCK_CLOEXEC'):  # pragma: nocover
        # if available, set cloexec flag during socket creation
        type_ |= module.SOCK_CLOEXEC
    sock = module.socket(module.AF_INET, type_)
    try:
        _set_default_tcpsock_options(module, sock)
    except BaseException:
        # Do not leak the freshly opened descriptor if option setup
        # fails; the original error is re-raised unchanged.
        sock.close()
        raise
    return sock


def create_tcp_connection(module, address, timeout=None):
    """Open a TCP connection and apply the default socket options.

    :param module: Socket module providing ``create_connection`` and
                   ``_GLOBAL_DEFAULT_TIMEOUT``
    :param address: Address passed straight through to
                    ``module.create_connection``
    :param timeout: Connection timeout; ``None`` selects the module's
                    global default timeout
    :returns: The connected socket
    """
    if timeout is None:
        # thanks to create_connection() developers for
        # this ugliness...
        timeout = module._GLOBAL_DEFAULT_TIMEOUT
    sock = module.create_connection(address, timeout)
    try:
        _set_default_tcpsock_options(module, sock)
    except BaseException:
        # Do not leak the connected socket if option setup fails; the
        # original error is re-raised unchanged.
        sock.close()
        raise
    return sock


def capture_exceptions(async_result):
    """Return a new decorated function that propagates the exceptions of the
    wrapped function to an async_result.

    The decorated function returns the wrapped function's return value on
    success.  If the wrapped function raises an :class:`Exception`, the
    exception is handed to ``async_result.set_exception`` and the decorated
    function returns ``None`` instead of re-raising.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def capture(function):
        @functools.wraps(function)
        def captured_function(*args, **kwargs):
            try:
                return function(*args, **kwargs)
            except Exception as exc:
                # Route the failure to the async result; nothing is
                # re-raised to the caller.
                async_result.set_exception(exc)
        return captured_function
    return capture


def wrap(async_result):
    """Return a new decorated function that propagates the return value or
    exception of the wrapped function to an async_result.

    NOTE: Only propagates a non-None return value.

    :param async_result: An async result implementing :class:`IAsyncResult`

    """
    def capture(function):
        @capture_exceptions(async_result)
        # Copy the wrapped function's metadata here so the outer
        # decorator propagates ``function``'s name and docstring rather
        # than those of this inner helper.
        @functools.wraps(function)
        def captured_function(*args, **kwargs):
            value = function(*args, **kwargs)
            if value is not None:
                async_result.set(value)
            return value
        return captured_function
    return capture