Add handlers for event messages

This commit is contained in:
Tyler Hobbs
2013-03-27 11:11:46 -05:00
parent 5c9d8ef311
commit 84de3e6a76

View File

@@ -81,6 +81,8 @@ class _Scheduler(object):
t.daemon = True
t.start()
# TODO add a shutdown method to stop processing the queue?
def schedule(self, delay, fn, *args, **kwargs):
run_at = time.time() + delay
self._scheduled.put_nowait((run_at, (fn, args, kwargs)))
@@ -203,6 +205,8 @@ class Cluster(object):
for session in self.sessions:
session.shutdown()
self.executor.shutdown()
def _new_session(self):
session = Session(self, self.metadata.all_hosts())
self.sessions.add(session)
@@ -393,19 +397,41 @@ class ControlConnection(object):
addr, port = event["address"]
if change_type == "NEW_NODE":
# TODO check Host constructor
self.add_host(addr)
self._cluster.scheduler.schedule(1, self.add_host, addr, signal=True)
elif change_type == "REMOVED_NODE":
self.remove_host(self._cluster.metadata.get_host(addr))
host = self._cluster.metadata.get_host(addr)
self._cluster.scheduler.schedule(1, self.remove_host, host)
elif change_type == "MOVED_NODE":
self.refresh_node_list_and_token_map()
self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map)
def _handle_status_change(self, event):
# TODO handled async in Cluster.java
change_type = event["change_type"]
addr, port = event["address"]
if change_type == "UP":
host = self._cluster.metadata.get_host(addr)
if not host:
self._cluster.scheduler.schedule(1, self.add_host, addr, signal=True)
else:
self._cluster.scheduler.schedule(1, self.on_up, host)
elif change_type == "DOWN":
# Ignore down event. Connection will realize a node is dead quicly
# enough when it writes to it, and there is no point in taking the
# risk of marking the node down mistakenly because we didn't
# receive the event in a timely fashion
pass
def _handle_schema_change(self, event):
# TODO handled async in Cluster.java
pass
change_type, ks, cf = event
if change_type in ("CREATED", "DROPPED"):
if not cf:
self._cluster.executor.submit(self.refresh_schema)
else:
self._cluster.executor.submit(self.refresh_schema, ks)
elif change_type == "UPDATED":
if not cf:
self._cluster.executor.submit(self.refresh_schema, ks)
else:
self._cluster.executor.submit(self.refresh_schema, ks, cf)
def wait_for_schema_agreement(self):
# TODO is returning True/False the best option for this? Potentially raise Exception?