diff --git a/cassandra/cluster.py b/cassandra/cluster.py index ebce728e..77a4f932 100644 --- a/cassandra/cluster.py +++ b/cassandra/cluster.py @@ -81,6 +81,8 @@ class _Scheduler(object): t.daemon = True t.start() + # TODO add a shutdown method to stop processing the queue? + def schedule(self, delay, fn, *args, **kwargs): run_at = time.time() + delay self._scheduled.put_nowait((run_at, (fn, args, kwargs))) @@ -203,6 +205,8 @@ class Cluster(object): for session in self.sessions: session.shutdown() + self.executor.shutdown() + def _new_session(self): session = Session(self, self.metadata.all_hosts()) self.sessions.add(session) @@ -393,19 +397,41 @@ class ControlConnection(object): addr, port = event["address"] if change_type == "NEW_NODE": # TODO check Host constructor - self.add_host(addr) + self._cluster.scheduler.schedule(1, self.add_host, addr, signal=True) elif change_type == "REMOVED_NODE": - self.remove_host(self._cluster.metadata.get_host(addr)) + host = self._cluster.metadata.get_host(addr) + self._cluster.scheduler.schedule(1, self.remove_host, host) elif change_type == "MOVED_NODE": - self.refresh_node_list_and_token_map() + self._cluster.scheduler.schedule(1, self.refresh_node_list_and_token_map) def _handle_status_change(self, event): - # TODO handled async in Cluster.java - pass + change_type = event["change_type"] + addr, port = event["address"] + if change_type == "UP": + host = self._cluster.metadata.get_host(addr) + if not host: + self._cluster.scheduler.schedule(1, self.add_host, addr, signal=True) + else: + self._cluster.scheduler.schedule(1, self.on_up, host) + elif change_type == "DOWN": + # Ignore down event. Connection will realize a node is dead quicly + # enough when it writes to it, and there is no point in taking the + # risk of marking the node down mistakenly because we didn't + # receive the event in a timely fashion + pass def _handle_schema_change(self, event): - # TODO handled async in Cluster.java - pass + change_type, ks, cf = event + if change_type in ("CREATED", "DROPPED"): + if not cf: + self._cluster.executor.submit(self.refresh_schema) + else: + self._cluster.executor.submit(self.refresh_schema, ks) + elif change_type == "UPDATED": + if not cf: + self._cluster.executor.submit(self.refresh_schema, ks) + else: + self._cluster.executor.submit(self.refresh_schema, ks, cf) def wait_for_schema_agreement(self): # TODO is returning True/False the best option for this? Potentially raise Exception?