Merge "Define an extension for user scoped event listeners"
This commit is contained in:
		| @@ -381,10 +381,16 @@ listeners. | |||||||
|  |  | ||||||
| * `com.google.gerrit.common.EventListener`: | * `com.google.gerrit.common.EventListener`: | ||||||
| + | + | ||||||
| Allows to listen to events. These are the same | Allows to listen to events without user visibility restrictions. These | ||||||
| link:cmd-stream-events.html#events[events] that are also streamed by | are the same link:cmd-stream-events.html#events[events] that are also streamed by | ||||||
| the link:cmd-stream-events.html[gerrit stream-events] command. | the link:cmd-stream-events.html[gerrit stream-events] command. | ||||||
|  |  | ||||||
|  | * `com.google.gerrit.common.UserScopedEventListener`: | ||||||
|  | + | ||||||
|  | Allows to listen to events visible to the specified user. These are the | ||||||
|  | same link:cmd-stream-events.html#events[events] that are also streamed | ||||||
|  | by the link:cmd-stream-events.html[gerrit stream-events] command. | ||||||
|  |  | ||||||
| * `com.google.gerrit.extensions.events.LifecycleListener`: | * `com.google.gerrit.extensions.events.LifecycleListener`: | ||||||
| + | + | ||||||
| Plugin start and stop | Plugin start and stop | ||||||
|   | |||||||
| @@ -31,8 +31,7 @@ import com.google.gerrit.acceptance.AbstractDaemonTest; | |||||||
| import com.google.gerrit.acceptance.NoHttpd; | import com.google.gerrit.acceptance.NoHttpd; | ||||||
| import com.google.gerrit.acceptance.PushOneCommit; | import com.google.gerrit.acceptance.PushOneCommit; | ||||||
| import com.google.gerrit.acceptance.TestProjectInput; | import com.google.gerrit.acceptance.TestProjectInput; | ||||||
| import com.google.gerrit.common.EventListener; | import com.google.gerrit.common.UserScopedEventListener; | ||||||
| import com.google.gerrit.common.EventSource; |  | ||||||
| import com.google.gerrit.extensions.api.changes.SubmitInput; | import com.google.gerrit.extensions.api.changes.SubmitInput; | ||||||
| import com.google.gerrit.extensions.api.projects.BranchInfo; | import com.google.gerrit.extensions.api.projects.BranchInfo; | ||||||
| import com.google.gerrit.extensions.api.projects.ProjectInput; | import com.google.gerrit.extensions.api.projects.ProjectInput; | ||||||
| @@ -43,6 +42,8 @@ import com.google.gerrit.extensions.client.SubmitType; | |||||||
| import com.google.gerrit.extensions.common.ChangeInfo; | import com.google.gerrit.extensions.common.ChangeInfo; | ||||||
| import com.google.gerrit.extensions.common.ChangeMessageInfo; | import com.google.gerrit.extensions.common.ChangeMessageInfo; | ||||||
| import com.google.gerrit.extensions.common.LabelInfo; | import com.google.gerrit.extensions.common.LabelInfo; | ||||||
|  | import com.google.gerrit.extensions.registration.DynamicSet; | ||||||
|  | import com.google.gerrit.extensions.registration.RegistrationHandle; | ||||||
| import com.google.gerrit.extensions.restapi.ResourceConflictException; | import com.google.gerrit.extensions.restapi.ResourceConflictException; | ||||||
| import com.google.gerrit.extensions.restapi.RestApiException; | import com.google.gerrit.extensions.restapi.RestApiException; | ||||||
| import com.google.gerrit.extensions.webui.UiAction; | import com.google.gerrit.extensions.webui.UiAction; | ||||||
| @@ -106,9 +107,9 @@ public abstract class AbstractSubmit extends AbstractDaemonTest { | |||||||
|   private Submit submitHandler; |   private Submit submitHandler; | ||||||
|  |  | ||||||
|   @Inject |   @Inject | ||||||
|   EventSource source; |   DynamicSet<UserScopedEventListener> eventListeners; | ||||||
|  |  | ||||||
|   private EventListener eventListener; |   private RegistrationHandle eventListenerRegistration; | ||||||
|  |  | ||||||
|   private String systemTimeZone; |   private String systemTimeZone; | ||||||
|  |  | ||||||
| @@ -127,26 +128,30 @@ public abstract class AbstractSubmit extends AbstractDaemonTest { | |||||||
|   @Before |   @Before | ||||||
|   public void setUp() throws Exception { |   public void setUp() throws Exception { | ||||||
|     mergeResults = Maps.newHashMap(); |     mergeResults = Maps.newHashMap(); | ||||||
|     CurrentUser listenerUser = factory.create(user.id); |     eventListenerRegistration = | ||||||
|     eventListener = new EventListener() { |         eventListeners.add(new UserScopedEventListener() { | ||||||
|       @Override |           @Override | ||||||
|       public void onEvent(Event event) { |           public void onEvent(Event event) { | ||||||
|         if (!(event instanceof ChangeMergedEvent)) { |             if (!(event instanceof ChangeMergedEvent)) { | ||||||
|           return; |               return; | ||||||
|         } |             } | ||||||
|         ChangeMergedEvent e = (ChangeMergedEvent) event; |             ChangeMergedEvent e = (ChangeMergedEvent) event; | ||||||
|         ChangeAttribute c = e.change.get(); |             ChangeAttribute c = e.change.get(); | ||||||
|         PatchSetAttribute ps = e.patchSet.get(); |             PatchSetAttribute ps = e.patchSet.get(); | ||||||
|         log.debug("Merged {},{} as {}", ps.number, c.number, e.newRev); |             log.debug("Merged {},{} as {}", ps.number, c.number, e.newRev); | ||||||
|         mergeResults.put(e.change.get().number, e.newRev); |             mergeResults.put(e.change.get().number, e.newRev); | ||||||
|       } |           } | ||||||
|     }; |  | ||||||
|     source.addEventListener(eventListener, listenerUser); |           @Override | ||||||
|  |           public CurrentUser getUser() { | ||||||
|  |             return factory.create(user.id); | ||||||
|  |           } | ||||||
|  |         }); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   @After |   @After | ||||||
|   public void cleanup() { |   public void cleanup() { | ||||||
|     source.removeEventListener(eventListener); |     eventListenerRegistration.remove(); | ||||||
|     db.close(); |     db.close(); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -92,7 +92,6 @@ import java.util.Map; | |||||||
| import java.util.Map.Entry; | import java.util.Map.Entry; | ||||||
| import java.util.Set; | import java.util.Set; | ||||||
| import java.util.concurrent.Callable; | import java.util.concurrent.Callable; | ||||||
| import java.util.concurrent.ConcurrentHashMap; |  | ||||||
| import java.util.concurrent.ExecutorService; | import java.util.concurrent.ExecutorService; | ||||||
| import java.util.concurrent.Executors; | import java.util.concurrent.Executors; | ||||||
| import java.util.concurrent.FutureTask; | import java.util.concurrent.FutureTask; | ||||||
| @@ -102,7 +101,7 @@ import java.util.concurrent.TimeoutException; | |||||||
| /** Spawns local executables when a hook action occurs. */ | /** Spawns local executables when a hook action occurs. */ | ||||||
| @Singleton | @Singleton | ||||||
| public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | ||||||
|   EventSource, LifecycleListener, NewProjectCreatedListener { |     LifecycleListener, NewProjectCreatedListener { | ||||||
|     /** A logger for this class. */ |     /** A logger for this class. */ | ||||||
|     private static final Logger log = LoggerFactory.getLogger(ChangeHookRunner.class); |     private static final Logger log = LoggerFactory.getLogger(ChangeHookRunner.class); | ||||||
|  |  | ||||||
| @@ -112,22 +111,11 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|         bind(ChangeHookRunner.class); |         bind(ChangeHookRunner.class); | ||||||
|         bind(ChangeHooks.class).to(ChangeHookRunner.class); |         bind(ChangeHooks.class).to(ChangeHookRunner.class); | ||||||
|         bind(EventDispatcher.class).to(ChangeHookRunner.class); |         bind(EventDispatcher.class).to(ChangeHookRunner.class); | ||||||
|         bind(EventSource.class).to(ChangeHookRunner.class); |  | ||||||
|         DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ChangeHookRunner.class); |         DynamicSet.bind(binder(), NewProjectCreatedListener.class).to(ChangeHookRunner.class); | ||||||
|         listener().to(ChangeHookRunner.class); |         listener().to(ChangeHookRunner.class); | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     private static class EventListenerHolder { |  | ||||||
|       final EventListener listener; |  | ||||||
|       final CurrentUser user; |  | ||||||
|  |  | ||||||
|       EventListenerHolder(EventListener l, CurrentUser u) { |  | ||||||
|         listener = l; |  | ||||||
|         user = u; |  | ||||||
|       } |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /** Container class used to hold the return code and output of script hook execution */ |     /** Container class used to hold the return code and output of script hook execution */ | ||||||
|     public static class HookResult { |     public static class HookResult { | ||||||
|       private int exitValue = -1; |       private int exitValue = -1; | ||||||
| @@ -177,9 +165,8 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     /** Listeners to receive changes as they happen (limited by visibility |     /** Listeners to receive changes as they happen (limited by visibility | ||||||
|      *  of holder's user). */ |      *  of user). */ | ||||||
|     private final Map<EventListener, EventListenerHolder> listeners = |     private final DynamicSet<UserScopedEventListener> listeners; | ||||||
|         new ConcurrentHashMap<>(); |  | ||||||
|  |  | ||||||
|     /** Listeners to receive all changes as they happen. */ |     /** Listeners to receive all changes as they happen. */ | ||||||
|     private final DynamicSet<EventListener> unrestrictedListeners; |     private final DynamicSet<EventListener> unrestrictedListeners; | ||||||
| @@ -268,6 +255,7 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|       ProjectCache projectCache, |       ProjectCache projectCache, | ||||||
|       AccountCache accountCache, |       AccountCache accountCache, | ||||||
|       EventFactory eventFactory, |       EventFactory eventFactory, | ||||||
|  |       DynamicSet<UserScopedEventListener> listeners, | ||||||
|       DynamicSet<EventListener> unrestrictedListeners, |       DynamicSet<EventListener> unrestrictedListeners, | ||||||
|       ChangeNotes.Factory notesFactory) { |       ChangeNotes.Factory notesFactory) { | ||||||
|         this.anonymousCowardName = anonymousCowardName; |         this.anonymousCowardName = anonymousCowardName; | ||||||
| @@ -277,6 +265,7 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|         this.accountCache = accountCache; |         this.accountCache = accountCache; | ||||||
|         this.eventFactory = eventFactory; |         this.eventFactory = eventFactory; | ||||||
|         this.sitePaths = sitePath; |         this.sitePaths = sitePath; | ||||||
|  |         this.listeners = listeners; | ||||||
|         this.unrestrictedListeners = unrestrictedListeners; |         this.unrestrictedListeners = unrestrictedListeners; | ||||||
|         this.notesFactory = notesFactory; |         this.notesFactory = notesFactory; | ||||||
|  |  | ||||||
| @@ -319,16 +308,6 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|       return Files.exists(p) ? Optional.of(p) : Optional.<Path>absent(); |       return Files.exists(p) ? Optional.of(p) : Optional.<Path>absent(); | ||||||
|     } |     } | ||||||
|  |  | ||||||
|     @Override |  | ||||||
|     public void addEventListener(EventListener listener, CurrentUser user) { |  | ||||||
|       listeners.put(listener, new EventListenerHolder(listener, user)); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     @Override |  | ||||||
|     public void removeEventListener(EventListener listener) { |  | ||||||
|       listeners.remove(listener); |  | ||||||
|     } |  | ||||||
|  |  | ||||||
|     /** |     /** | ||||||
|      * Get the Repository for the given project name, or null on error. |      * Get the Repository for the given project name, or null on error. | ||||||
|      * |      * | ||||||
| @@ -923,9 +902,9 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|  |  | ||||||
|     private void fireEvent(Change change, ChangeEvent event, ReviewDb db) |     private void fireEvent(Change change, ChangeEvent event, ReviewDb db) | ||||||
|         throws OrmException { |         throws OrmException { | ||||||
|       for (EventListenerHolder holder : listeners.values()) { |       for (UserScopedEventListener listener : listeners) { | ||||||
|         if (isVisibleTo(change, holder.user, db)) { |         if (isVisibleTo(change, listener.getUser(), db)) { | ||||||
|           holder.listener.onEvent(event); |           listener.onEvent(event); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|  |  | ||||||
| @@ -933,9 +912,9 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     private void fireEvent(Project.NameKey project, ProjectEvent event) { |     private void fireEvent(Project.NameKey project, ProjectEvent event) { | ||||||
|       for (EventListenerHolder holder : listeners.values()) { |       for (UserScopedEventListener listener : listeners) { | ||||||
|         if (isVisibleTo(project, holder.user)) { |         if (isVisibleTo(project, listener.getUser())) { | ||||||
|           holder.listener.onEvent(event); |           listener.onEvent(event); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|  |  | ||||||
| @@ -943,9 +922,9 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     private void fireEvent(Branch.NameKey branchName, RefEvent event) { |     private void fireEvent(Branch.NameKey branchName, RefEvent event) { | ||||||
|       for (EventListenerHolder holder : listeners.values()) { |       for (UserScopedEventListener listener : listeners) { | ||||||
|         if (isVisibleTo(branchName, holder.user)) { |         if (isVisibleTo(branchName, listener.getUser())) { | ||||||
|           holder.listener.onEvent(event); |           listener.onEvent(event); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|  |  | ||||||
| @@ -954,9 +933,9 @@ public class ChangeHookRunner implements ChangeHooks, EventDispatcher, | |||||||
|  |  | ||||||
|     private void fireEvent(com.google.gerrit.server.events.Event event, |     private void fireEvent(com.google.gerrit.server.events.Event event, | ||||||
|         ReviewDb db) throws OrmException { |         ReviewDb db) throws OrmException { | ||||||
|       for (EventListenerHolder holder : listeners.values()) { |       for (UserScopedEventListener listener : listeners) { | ||||||
|         if (isVisibleTo(event, holder.user, db)) { |         if (isVisibleTo(event, listener.getUser(), db)) { | ||||||
|           holder.listener.onEvent(event); |           listener.onEvent(event); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|  |  | ||||||
|   | |||||||
| @@ -21,7 +21,6 @@ import com.google.gerrit.reviewdb.client.Change; | |||||||
| import com.google.gerrit.reviewdb.client.PatchSet; | import com.google.gerrit.reviewdb.client.PatchSet; | ||||||
| import com.google.gerrit.reviewdb.client.Project; | import com.google.gerrit.reviewdb.client.Project; | ||||||
| import com.google.gerrit.reviewdb.server.ReviewDb; | import com.google.gerrit.reviewdb.server.ReviewDb; | ||||||
| import com.google.gerrit.server.CurrentUser; |  | ||||||
| import com.google.gerrit.server.events.ChangeEvent; | import com.google.gerrit.server.events.ChangeEvent; | ||||||
| import com.google.gerrit.server.events.Event; | import com.google.gerrit.server.events.Event; | ||||||
| import com.google.gerrit.server.events.ProjectEvent; | import com.google.gerrit.server.events.ProjectEvent; | ||||||
| @@ -34,12 +33,7 @@ import java.util.Map; | |||||||
| import java.util.Set; | import java.util.Set; | ||||||
|  |  | ||||||
| /** Does not invoke hooks. */ | /** Does not invoke hooks. */ | ||||||
| public final class DisabledChangeHooks implements ChangeHooks, EventDispatcher, | public final class DisabledChangeHooks implements ChangeHooks, EventDispatcher { | ||||||
|     EventSource { |  | ||||||
|   @Override |  | ||||||
|   public void addEventListener(EventListener listener, CurrentUser user) { |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   @Override |   @Override | ||||||
|   public void doChangeAbandonedHook(Change change, Account account, |   public void doChangeAbandonedHook(Change change, Account account, | ||||||
|       PatchSet patchSet, String reason, ReviewDb db) { |       PatchSet patchSet, String reason, ReviewDb db) { | ||||||
| @@ -105,10 +99,6 @@ public final class DisabledChangeHooks implements ChangeHooks, EventDispatcher, | |||||||
|       Set<String> removed, Set<String> hashtags, ReviewDb db) { |       Set<String> removed, Set<String> hashtags, ReviewDb db) { | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   @Override |  | ||||||
|   public void removeEventListener(EventListener listener) { |  | ||||||
|   } |  | ||||||
|  |  | ||||||
|   @Override |   @Override | ||||||
|   public HookResult doRefUpdateHook(Project project, String refName, |   public HookResult doRefUpdateHook(Project project, String refName, | ||||||
|       Account uploader, ObjectId oldId, ObjectId newId) { |       Account uploader, ObjectId oldId, ObjectId newId) { | ||||||
|   | |||||||
| @@ -17,6 +17,10 @@ package com.google.gerrit.common; | |||||||
| import com.google.gerrit.extensions.annotations.ExtensionPoint; | import com.google.gerrit.extensions.annotations.ExtensionPoint; | ||||||
| import com.google.gerrit.server.events.Event; | import com.google.gerrit.server.events.Event; | ||||||
|  |  | ||||||
|  | /** | ||||||
|  |  * Allows to listen to events without user visibility restrictions. To listen to | ||||||
|  |  * events visible to a specific user, use {@link UserScopedEventListener}. | ||||||
|  |  */ | ||||||
| @ExtensionPoint | @ExtensionPoint | ||||||
| public interface EventListener { | public interface EventListener { | ||||||
|   void onEvent(Event event); |   void onEvent(Event event); | ||||||
|   | |||||||
| @@ -1,4 +1,4 @@ | |||||||
| // Copyright (C) 2014 The Android Open Source Project | // Copyright (C) 2016 The Android Open Source Project | ||||||
| // | // | ||||||
| // Licensed under the Apache License, Version 2.0 (the "License"); | // Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
| // you may not use this file except in compliance with the License. | // you may not use this file except in compliance with the License. | ||||||
| @@ -11,14 +11,16 @@ | |||||||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
| // See the License for the specific language governing permissions and | // See the License for the specific language governing permissions and | ||||||
| // limitations under the License. | // limitations under the License. | ||||||
| 
 |  | ||||||
| package com.google.gerrit.common; | package com.google.gerrit.common; | ||||||
| 
 | 
 | ||||||
|  | import com.google.gerrit.extensions.annotations.ExtensionPoint; | ||||||
| import com.google.gerrit.server.CurrentUser; | import com.google.gerrit.server.CurrentUser; | ||||||
| 
 | 
 | ||||||
| /** Distributes Events to ChangeListeners.  Register listeners here. */ | /** | ||||||
| public interface EventSource { |  * Allows to listen to events visible to the specified user. To listen to events | ||||||
|   void addEventListener(EventListener listener, CurrentUser user); |  * without user visibility restrictions, use {@link EventListener}. | ||||||
| 
 |  */ | ||||||
|   void removeEventListener(EventListener listener); | @ExtensionPoint | ||||||
|  | public interface UserScopedEventListener extends EventListener { | ||||||
|  |   CurrentUser getUser(); | ||||||
| } | } | ||||||
| @@ -19,6 +19,7 @@ import static com.google.inject.Scopes.SINGLETON; | |||||||
| import com.google.common.cache.Cache; | import com.google.common.cache.Cache; | ||||||
| import com.google.gerrit.audit.AuditModule; | import com.google.gerrit.audit.AuditModule; | ||||||
| import com.google.gerrit.common.EventListener; | import com.google.gerrit.common.EventListener; | ||||||
|  | import com.google.gerrit.common.UserScopedEventListener; | ||||||
| import com.google.gerrit.extensions.auth.oauth.OAuthLoginProvider; | import com.google.gerrit.extensions.auth.oauth.OAuthLoginProvider; | ||||||
| import com.google.gerrit.extensions.config.CapabilityDefinition; | import com.google.gerrit.extensions.config.CapabilityDefinition; | ||||||
| import com.google.gerrit.extensions.config.CloneCommand; | import com.google.gerrit.extensions.config.CloneCommand; | ||||||
| @@ -281,6 +282,7 @@ public class GerritGlobalModule extends FactoryModule { | |||||||
|         .to(ProjectConfigEntry.UpdateChecker.class); |         .to(ProjectConfigEntry.UpdateChecker.class); | ||||||
|     DynamicSet.setOf(binder(), EventListener.class); |     DynamicSet.setOf(binder(), EventListener.class); | ||||||
|     DynamicSet.bind(binder(), EventListener.class).to(EventsMetrics.class); |     DynamicSet.bind(binder(), EventListener.class).to(EventsMetrics.class); | ||||||
|  |     DynamicSet.setOf(binder(), UserScopedEventListener.class); | ||||||
|     DynamicSet.setOf(binder(), CommitValidationListener.class); |     DynamicSet.setOf(binder(), CommitValidationListener.class); | ||||||
|     DynamicSet.setOf(binder(), RefOperationValidationListener.class); |     DynamicSet.setOf(binder(), RefOperationValidationListener.class); | ||||||
|     DynamicSet.setOf(binder(), MergeValidationListener.class); |     DynamicSet.setOf(binder(), MergeValidationListener.class); | ||||||
|   | |||||||
| @@ -18,10 +18,12 @@ import static com.google.gerrit.sshd.CommandMetaData.Mode.MASTER; | |||||||
| import static java.nio.charset.StandardCharsets.UTF_8; | import static java.nio.charset.StandardCharsets.UTF_8; | ||||||
|  |  | ||||||
| import com.google.common.base.Supplier; | import com.google.common.base.Supplier; | ||||||
| import com.google.gerrit.common.EventListener; | import com.google.gerrit.common.UserScopedEventListener; | ||||||
| import com.google.gerrit.common.EventSource; |  | ||||||
| import com.google.gerrit.common.data.GlobalCapability; | import com.google.gerrit.common.data.GlobalCapability; | ||||||
| import com.google.gerrit.extensions.annotations.RequiresCapability; | import com.google.gerrit.extensions.annotations.RequiresCapability; | ||||||
|  | import com.google.gerrit.extensions.registration.DynamicSet; | ||||||
|  | import com.google.gerrit.extensions.registration.RegistrationHandle; | ||||||
|  | import com.google.gerrit.server.CurrentUser; | ||||||
| import com.google.gerrit.server.IdentifiedUser; | import com.google.gerrit.server.IdentifiedUser; | ||||||
| import com.google.gerrit.server.events.Event; | import com.google.gerrit.server.events.Event; | ||||||
| import com.google.gerrit.server.events.EventTypes; | import com.google.gerrit.server.events.EventTypes; | ||||||
| @@ -63,7 +65,7 @@ final class StreamEvents extends BaseCommand { | |||||||
|   private IdentifiedUser currentUser; |   private IdentifiedUser currentUser; | ||||||
|  |  | ||||||
|   @Inject |   @Inject | ||||||
|   private EventSource source; |   private DynamicSet<UserScopedEventListener> eventListeners; | ||||||
|  |  | ||||||
|   @Inject |   @Inject | ||||||
|   @StreamCommandExecutor |   @StreamCommandExecutor | ||||||
| @@ -75,6 +77,8 @@ final class StreamEvents extends BaseCommand { | |||||||
|  |  | ||||||
|   private Gson gson; |   private Gson gson; | ||||||
|  |  | ||||||
|  |   private RegistrationHandle eventListenerRegistration; | ||||||
|  |  | ||||||
|   /** Special event to notify clients they missed other events. */ |   /** Special event to notify clients they missed other events. */ | ||||||
|   private static final class DroppedOutputEvent extends Event { |   private static final class DroppedOutputEvent extends Event { | ||||||
|     private final static String TYPE = "dropped-output"; |     private final static String TYPE = "dropped-output"; | ||||||
| @@ -87,16 +91,6 @@ final class StreamEvents extends BaseCommand { | |||||||
|     EventTypes.register(DroppedOutputEvent.TYPE, DroppedOutputEvent.class); |     EventTypes.register(DroppedOutputEvent.TYPE, DroppedOutputEvent.class); | ||||||
|   } |   } | ||||||
|  |  | ||||||
|   private final EventListener listener = new EventListener() { |  | ||||||
|     @Override |  | ||||||
|     public void onEvent(final Event event) { |  | ||||||
|       if (subscribedToEvents.isEmpty() |  | ||||||
|           || subscribedToEvents.contains(event.getType())) { |  | ||||||
|         offer(event); |  | ||||||
|       } |  | ||||||
|     } |  | ||||||
|   }; |  | ||||||
|  |  | ||||||
|   private final CancelableRunnable writer = new CancelableRunnable() { |   private final CancelableRunnable writer = new CancelableRunnable() { | ||||||
|     @Override |     @Override | ||||||
|     public void run() { |     public void run() { | ||||||
| @@ -150,7 +144,21 @@ final class StreamEvents extends BaseCommand { | |||||||
|     } |     } | ||||||
|  |  | ||||||
|     stdout = toPrintWriter(out); |     stdout = toPrintWriter(out); | ||||||
|     source.addEventListener(listener, currentUser); |     eventListenerRegistration = | ||||||
|  |         eventListeners.add(new UserScopedEventListener() { | ||||||
|  |           @Override | ||||||
|  |           public void onEvent(final Event event) { | ||||||
|  |             if (subscribedToEvents.isEmpty() | ||||||
|  |                 || subscribedToEvents.contains(event.getType())) { | ||||||
|  |               offer(event); | ||||||
|  |             } | ||||||
|  |           } | ||||||
|  |  | ||||||
|  |           @Override | ||||||
|  |           public CurrentUser getUser() { | ||||||
|  |             return currentUser; | ||||||
|  |           } | ||||||
|  |         }); | ||||||
|  |  | ||||||
|     gson = new GsonBuilder() |     gson = new GsonBuilder() | ||||||
|         .registerTypeAdapter(Supplier.class, new SupplierSerializer()) |         .registerTypeAdapter(Supplier.class, new SupplierSerializer()) | ||||||
| @@ -159,7 +167,7 @@ final class StreamEvents extends BaseCommand { | |||||||
|  |  | ||||||
|   @Override |   @Override | ||||||
|   protected void onExit(final int rc) { |   protected void onExit(final int rc) { | ||||||
|     source.removeEventListener(listener); |     eventListenerRegistration.remove(); | ||||||
|  |  | ||||||
|     synchronized (taskLock) { |     synchronized (taskLock) { | ||||||
|       done = true; |       done = true; | ||||||
| @@ -170,7 +178,7 @@ final class StreamEvents extends BaseCommand { | |||||||
|  |  | ||||||
|   @Override |   @Override | ||||||
|   public void destroy() { |   public void destroy() { | ||||||
|     source.removeEventListener(listener); |     eventListenerRegistration.remove(); | ||||||
|  |  | ||||||
|     final boolean exit; |     final boolean exit; | ||||||
|     synchronized (taskLock) { |     synchronized (taskLock) { | ||||||
| @@ -218,7 +226,7 @@ final class StreamEvents extends BaseCommand { | |||||||
|         // destroy() above, or it closed the stream and is no longer |         // destroy() above, or it closed the stream and is no longer | ||||||
|         // accepting output. Either way terminate this instance. |         // accepting output. Either way terminate this instance. | ||||||
|         // |         // | ||||||
|         source.removeEventListener(listener); |         eventListenerRegistration.remove(); | ||||||
|         flush(); |         flush(); | ||||||
|         onExit(0); |         onExit(0); | ||||||
|         return; |         return; | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user
	 Saša Živkov
					Saša Živkov