allow multiple subscriptions to topic

This commit is contained in:
Tobias Oberstein
2011-12-18 13:12:57 +01:00
parent 8e69f4e919
commit efb1ba3924

View File

@@ -284,7 +284,13 @@ ab.Session = function (wsuri, onopen, onclose) {
console.groupEnd();
}
self._subscriptions[subid](uri2, val);
self._subscriptions[subid].forEach(function (callback) {
callback(uri2, val);
});
}
else {
// ignore unsolicited event!
}
}
else if (o[0] === ab._MESSAGE_TYPEID_WELCOME)
@@ -454,50 +460,79 @@ ab.Session.prototype.subscribe = function (topicuri, callback) {
var self = this;
// subscribe by sending WAMP message when topic not already subscribed
//
var rtopicuri = self._prefixes.resolveOrPass(topicuri);
if (rtopicuri in self._subscriptions) {
throw "already subscribed";
if (!(rtopicuri in self._subscriptions)) {
if (ab._debugpubsub) {
console.group("WAMP Subscribe");
console.info(self._wsuri + " [" + self._session_id + "]");
console.log(topicuri);
console.log(callback);
console.groupEnd();
}
var msg = [ab._MESSAGE_TYPEID_SUBSCRIBE, topicuri];
self._send(msg);
self._subscriptions[rtopicuri] = [];
}
self._subscriptions[rtopicuri] = callback;
if (ab._debugpubsub) {
console.group("WAMP Subscribe");
console.info(self._wsuri + " [" + self._session_id + "]");
console.log(topicuri);
console.log(callback);
console.groupEnd();
// add callback to event listeners list if not already in list
//
var i = self._subscriptions[rtopicuri].indexOf(callback);
if (i === -1) {
self._subscriptions[rtopicuri].push(callback);
}
else {
throw "callback " + callback + " already subscribed for topic " + rtopicuri;
}
var msg = [ab._MESSAGE_TYPEID_SUBSCRIBE, topicuri];
self._send(msg);
};
ab.Session.prototype.unsubscribe = function (topicuri) {
ab.Session.prototype.unsubscribe = function (topicuri, callback) {
var self = this;
var rtopicuri = self._prefixes.resolveOrPass(topicuri);
var callback;
if (!(rtopicuri in self._subscriptions)) {
throw "not subscribed";
} else {
callback = self._subscriptions[rtopicuri];
throw "not subscribed to topic " + rtopicuri;
}
else {
var removed;
if (callback !== undefined) {
var idx = self._subscriptions[rtopicuri].indexOf(callback);
if (idx !== -1) {
removed = callback;
self._subscriptions[rtopicuri].splice(idx, 1);
}
else {
throw "no callback " + callback + " subscribed on topic " + rtopicuri;
}
}
else {
removed = self._subscriptions[rtopicuri].slice();
self._subscriptions[rtopicuri] = [];
}
delete self._subscriptions[rtopicuri];
if (self._subscriptions[rtopicuri].length === 0) {
if (ab._debugpubsub) {
console.group("WAMP Unsubscribe");
console.info(self._wsuri + " [" + self._session_id + "]");
console.log(topicuri);
console.log(callback);
console.groupEnd();
delete self._subscriptions[rtopicuri];
if (ab._debugpubsub) {
console.group("WAMP Unsubscribe");
console.info(self._wsuri + " [" + self._session_id + "]");
console.log(topicuri);
console.log(removed);
console.groupEnd();
}
var msg = [ab._MESSAGE_TYPEID_UNSUBSCRIBE, topicuri];
self._send(msg);
}
}
var msg = [ab._MESSAGE_TYPEID_UNSUBSCRIBE, topicuri];
self._send(msg);
};