"""Kazoo testing harnesses"""
import logging
import os
import uuid
import unittest
from kazoo import python2atexit as atexit
from kazoo.client import KazooClient
from kazoo.exceptions import KazooException
from kazoo.protocol.connection import _CONNECTION_DROP, _SESSION_EXPIRED
from kazoo.protocol.states import (
KazooState
)
from kazoo.testing.common import ZookeeperCluster
log = logging.getLogger(__name__)
CLUSTER = None
CLUSTER_CONF = None
CLUSTER_DEFAULTS = {
"ZOOKEEPER_PORT_OFFSET": 20000,
"ZOOKEEPER_CLUSTER_SIZE": 3,
"ZOOKEEPER_OBSERVER_START_ID": -1,
}
def get_global_cluster():
global CLUSTER, CLUSTER_CONF
cluster_conf = {
k: os.environ.get(k, CLUSTER_DEFAULTS.get(k))
for k in ["ZOOKEEPER_PATH",
"ZOOKEEPER_CLASSPATH",
"ZOOKEEPER_PORT_OFFSET",
"ZOOKEEPER_CLUSTER_SIZE",
"ZOOKEEPER_VERSION",
"ZOOKEEPER_OBSERVER_START_ID",
"ZOOKEEPER_JAAS_AUTH"]
}
if CLUSTER is not None:
if CLUSTER_CONF == cluster_conf:
return CLUSTER
else:
log.info('Config change detected. Reconfiguring cluster...')
CLUSTER.terminate()
CLUSTER = None
# Create a new cluster
ZK_HOME = cluster_conf.get("ZOOKEEPER_PATH")
ZK_CLASSPATH = cluster_conf.get("ZOOKEEPER_CLASSPATH")
ZK_PORT_OFFSET = int(cluster_conf.get("ZOOKEEPER_PORT_OFFSET"))
ZK_CLUSTER_SIZE = int(cluster_conf.get("ZOOKEEPER_CLUSTER_SIZE"))
ZK_VERSION = cluster_conf.get("ZOOKEEPER_VERSION")
if '-' in ZK_VERSION:
# Ignore pre-release markers like -alpha
ZK_VERSION = ZK_VERSION.split('-')[0]
ZK_VERSION = tuple([int(n) for n in ZK_VERSION.split('.')])
ZK_OBSERVER_START_ID = int(cluster_conf.get("ZOOKEEPER_OBSERVER_START_ID"))
assert ZK_HOME or ZK_CLASSPATH or ZK_VERSION, (
"Either ZOOKEEPER_PATH or ZOOKEEPER_CLASSPATH or "
"ZOOKEEPER_VERSION environment variable must be defined.\n"
"For deb package installations this is /usr/share/java")
if ZK_VERSION >= (3, 5):
additional_configuration_entries = [
"4lw.commands.whitelist=*",
"reconfigEnabled=true"
]
# If defined, this sets the superuser password to "test"
additional_java_system_properties = [
"-Dzookeeper.DigestAuthenticationProvider.superDigest="
"super:D/InIHSb7yEEbrWz8b9l71RjZJU="
]
else:
additional_configuration_entries = []
additional_java_system_properties = []
ZOOKEEPER_JAAS_AUTH = cluster_conf.get("ZOOKEEPER_JAAS_AUTH")
if ZOOKEEPER_JAAS_AUTH == "digest":
jaas_config = """
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_super="super_secret"
user_jaasuser="jaas_password";
};"""
elif ZOOKEEPER_JAAS_AUTH == "gssapi":
# Configure Zookeeper to use our test KDC.
additional_java_system_properties += [
"-Djava.security.krb5.conf=%s" % os.path.expandvars(
"${KRB5_CONFIG}"
),
"-Dsun.security.krb5.debug=true",
]
jaas_config = """
Server {
com.sun.security.auth.module.Krb5LoginModule required
debug=true
isInitiator=false
useKeyTab=true
keyTab="%s"
storeKey=true
useTicketCache=false
principal="zookeeper/127.0.0.1@KAZOOTEST.ORG";
};""" % os.path.expandvars("${KRB5_TEST_ENV}/server.keytab")
else:
jaas_config = None
CLUSTER = ZookeeperCluster(
install_path=ZK_HOME,
classpath=ZK_CLASSPATH,
port_offset=ZK_PORT_OFFSET,
size=ZK_CLUSTER_SIZE,
observer_start_id=ZK_OBSERVER_START_ID,
configuration_entries=additional_configuration_entries,
java_system_properties=additional_java_system_properties,
jaas_config=jaas_config
)
CLUSTER_CONF = cluster_conf
atexit.register(lambda cluster: cluster.terminate(), CLUSTER)
return CLUSTER
[docs]class KazooTestHarness(unittest.TestCase):
"""Harness for testing code that uses Kazoo
This object can be used directly or as a mixin. It supports starting
and stopping a complete ZooKeeper cluster locally and provides an
API for simulating errors and expiring sessions.
Example::
class MyTestCase(KazooTestHarness):
def setUp(self):
self.setup_zookeeper()
# additional test setup
def tearDown(self):
self.teardown_zookeeper()
def test_something(self):
something_that_needs_a_kazoo_client(self.client)
def test_something_else(self):
something_that_needs_zk_servers(self.servers)
"""
DEFAULT_CLIENT_TIMEOUT = 15
def __init__(self, *args, **kw):
super(KazooTestHarness, self).__init__(*args, **kw)
self.client = None
self._clients = []
@property
def cluster(self):
return get_global_cluster()
@property
def servers(self):
return ",".join([s.address for s in self.cluster])
def _get_nonchroot_client(self):
c = KazooClient(self.servers)
self._clients.append(c)
return c
def _get_client(self, **client_options):
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
c = KazooClient(self.hosts, **client_options)
self._clients.append(c)
return c
def lose_connection(self, event_factory):
"""Force client to lose connection with server"""
self.__break_connection(_CONNECTION_DROP, KazooState.SUSPENDED,
event_factory)
def expire_session(self, event_factory):
"""Force ZK to expire a client session"""
self.__break_connection(_SESSION_EXPIRED, KazooState.LOST,
event_factory)
def setup_zookeeper(self, **client_options):
"""Create a ZK cluster and chrooted :class:`KazooClient`
The cluster will only be created on the first invocation and won't be
fully torn down until exit.
"""
do_start = False
for s in self.cluster:
if not s.running:
do_start = True
if do_start:
self.cluster.start()
namespace = "/kazootests" + uuid.uuid4().hex
self.hosts = self.servers + namespace
if 'timeout' not in client_options:
client_options['timeout'] = self.DEFAULT_CLIENT_TIMEOUT
self.client = self._get_client(**client_options)
self.client.start()
self.client.ensure_path("/")
def teardown_zookeeper(self):
"""Reset and cleanup the zookeeper cluster that was started."""
while self._clients:
c = self._clients.pop()
try:
c.stop()
except KazooException:
log.exception("Failed stopping client %s", c)
finally:
c.close()
self.client = None
def __break_connection(self, break_event, expected_state, event_factory):
"""Break ZooKeeper connection using the specified event."""
lost = event_factory()
safe = event_factory()
def watch_loss(state):
if state == expected_state:
lost.set()
elif lost.is_set() and state == KazooState.CONNECTED:
safe.set()
return True
self.client.add_listener(watch_loss)
self.client._call(break_event, None)
lost.wait(5)
if not lost.isSet():
raise Exception("Failed to get notified of broken connection.")
safe.wait(15)
if not safe.isSet():
raise Exception("Failed to see client reconnect.")
self.client.retry(self.client.get_async, '/')
[docs]class KazooTestCase(KazooTestHarness):
def setUp(self):
self.setup_zookeeper()
def tearDown(self):
self.teardown_zookeeper()