001    /*
002     * JBoss, Home of Professional Open Source.
003     * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004     * as indicated by the @author tags. See the copyright.txt file in the
005     * distribution for a full listing of individual contributors. 
006     *
007     * This is free software; you can redistribute it and/or modify it
008     * under the terms of the GNU Lesser General Public License as
009     * published by the Free Software Foundation; either version 2.1 of
010     * the License, or (at your option) any later version.
011     *
012     * This software is distributed in the hope that it will be useful,
013     * but WITHOUT ANY WARRANTY; without even the implied warranty of
014     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015     * Lesser General Public License for more details.
016     *
017     * You should have received a copy of the GNU Lesser General Public
018     * License along with this software; if not, write to the Free
019     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021     */
022    package org.jboss.dna.repository.observation;
023    
024    import java.util.ArrayList;
025    import java.util.Collections;
026    import java.util.HashSet;
027    import java.util.Iterator;
028    import java.util.List;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.TimeUnit;
032    import java.util.concurrent.atomic.AtomicLong;
033    import java.util.concurrent.locks.ReadWriteLock;
034    import java.util.concurrent.locks.ReentrantReadWriteLock;
035    import javax.jcr.RepositoryException;
036    import javax.jcr.Session;
037    import javax.jcr.UnsupportedRepositoryOperationException;
038    import javax.jcr.observation.Event;
039    import javax.jcr.observation.EventIterator;
040    import javax.jcr.observation.EventListener;
041    import javax.jcr.observation.ObservationManager;
042    import net.jcip.annotations.GuardedBy;
043    import net.jcip.annotations.ThreadSafe;
044    import org.jboss.dna.common.util.CheckArg;
045    import org.jboss.dna.common.util.Logger;
046    import org.jboss.dna.repository.RepositoryI18n;
047    import org.jboss.dna.repository.services.AbstractServiceAdministrator;
048    import org.jboss.dna.repository.services.AdministeredService;
049    import org.jboss.dna.repository.services.ServiceAdministrator;
050    import org.jboss.dna.repository.util.SessionFactory;
051    
052    /**
053     * @author Randall Hauch
054     */
055    public class ObservationService implements AdministeredService {
056    
057        /**
058         * Interface to which problems with particular events are logged.
059         * 
060         * @author Randall Hauch
061         */
062        public static interface ProblemLog {
063    
064            void error( String repositoryWorkspaceName,
065                        Throwable t );
066        }
067    
068        /**
069         * Problem log implementation that records problems in the log.
070         * 
071         * @author Randall Hauch
072         */
073        public class DefaultProblemLog implements ProblemLog {
074    
075            /**
076             * {@inheritDoc}
077             */
078            public void error( String repositoryWorkspaceName,
079                               Throwable t ) {
080                getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName);
081            }
082        }
083    
084        protected static class NoOpProblemLog implements ProblemLog {
085    
086            /**
087             * {@inheritDoc}
088             */
089            public void error( String repositoryWorkspaceName,
090                               Throwable t ) {
091            }
092        }
093    
094        public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog();
095    
096        /**
097         * The administrative component for this service.
098         * 
099         * @author Randall Hauch
100         */
101        protected class Administrator extends AbstractServiceAdministrator {
102    
103            protected Administrator() {
104                super(RepositoryI18n.observationServiceName, State.STARTED);
105            }
106    
107            /**
108             * {@inheritDoc}
109             */
110            @Override
111            protected void doShutdown( State fromState ) {
112                super.doShutdown(fromState);
113                shutdownService();
114            }
115    
116            /**
117             * {@inheritDoc}
118             */
119            public boolean awaitTermination( long timeout,
120                                             TimeUnit unit ) {
121                return true;
122            }
123    
124            /**
125             * {@inheritDoc}
126             */
127            @Override
128            protected boolean doCheckIsTerminated() {
129                return true;
130            }
131    
132        }
133    
134        private Logger logger = Logger.getLogger(this.getClass());
135        private ProblemLog problemLog = new DefaultProblemLog();
136        private final Statistics statistics = new Statistics();
137        private final SessionFactory sessionFactory;
138        private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>();
139        private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>();
140        private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>();
141        private final Administrator administrator = new Administrator();
142    
143        public ObservationService( SessionFactory sessionFactory ) {
144            CheckArg.isNotNull(sessionFactory, "session factory");
145            this.sessionFactory = sessionFactory;
146        }
147    
148        /**
149         * {@inheritDoc}
150         */
151        public ServiceAdministrator getAdministrator() {
152            return this.administrator;
153        }
154    
155        /**
156         * @return sessionFactory
157         */
158        public SessionFactory getSessionFactory() {
159            return this.sessionFactory;
160        }
161    
162        /**
163         * Get the statistics for this system.
164         * 
165         * @return the statistics, which are updated as the system is used
166         */
167        public Statistics getStatistics() {
168            return this.statistics;
169        }
170    
171        /**
172         * Get the logger for this system
173         * 
174         * @return the logger
175         */
176        public Logger getLogger() {
177            return this.logger;
178        }
179    
180        /**
181         * Set the logger for this system.
182         * 
183         * @param logger the logger, or null if the standard logging should be used
184         */
185        public void setLogger( Logger logger ) {
186            this.logger = logger != null ? logger : Logger.getLogger(this.getClass());
187        }
188    
189        /**
190         * @return problemLog
191         */
192        public ProblemLog getProblemLog() {
193            return this.problemLog;
194        }
195    
196        /**
197         * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log.
198         * 
199         * @param problemLog the new problem log implementation; if null, then the default problem log is used
200         */
201        public void setProblemLog( ProblemLog problemLog ) {
202            this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog();
203        }
204    
205        public boolean addListener( EventListener listener ) {
206            if (listener == null) return false;
207            return this.eventListeners.addIfAbsent(listener);
208        }
209    
210        public boolean removeListener( EventListener listener ) {
211            if (listener == null) return false;
212            return this.eventListeners.remove(listener);
213        }
214    
215        public boolean addListener( NodeChangeListener listener ) {
216            return this.nodeChangeListeners.addIfAbsent(listener);
217        }
218    
219        public boolean removeListener( NodeChangeListener listener ) {
220            if (listener == null) return false;
221            return this.nodeChangeListeners.remove(listener);
222        }
223    
224        protected void shutdownService() {
225            // Unregister all listeners ...
226            for (WorkspaceListener listener : this.workspaceListeners) {
227                try {
228                    listener.unregister();
229                } catch (RepositoryException e) {
230                    this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService);
231                }
232            }
233        }
234    
235        /**
236         * Monitor the supplied workspace for events of the given type on any node at or under the supplied path.
237         * <p>
238         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
239         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
240         * repository and workspace name.
241         * </p>
242         * <p>
243         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
244         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
245         * workspace and garbage collected. If this service is {@link ServiceAdministrator#shutdown() shutdown} while there are still
246         * active listeners, those listeners will disconnect themselves from this service and the workspace with which they're
247         * registered when they attempt to forward the next events.
248         * </p>
249         * <p>
250         * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node
251         * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and
252         * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path
253         * returned by {@link Event#getPath() Event.getPath()}. In the case of event types
254         * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and
255         * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at
256         * (or formerly at) the path returned by <code>Event.getPath</code>:
257         * <ul>
258         * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at
259         * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It
260         * is permissible to register a listener for a path where no node currently exists. </li>
261         * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If his
262         * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li>
263         * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the
264         * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction
265         * is placed on events received. </li>
266         * </ul>
267         * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the
268         * restrictions.
269         * </p>
270         * <p>
271         * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the
272         * listener was registered are ignored. Otherwise, they are not ignored.
273         * </p>
274         * <p>
275         * The filters of an already-registered {@link WorkspaceListener} can be changed at runtime by changing the attributes and
276         * {@link WorkspaceListener#reregister() registering}.
277         * </p>
278         * 
279         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
280         *        workspace that is to be monitored
281         * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
282         *        in the workspace are to be monitored
283         * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
284         * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to
285         *        be processed, or false if only the events at the node
286         * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known
287         * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node
288         *        type restrictions
289         * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be
290         *        processed.
291         * @return the listener that was created and registered to perform the monitoring
292         * @throws RepositoryException if there is a problem registering the listener
293         */
294        public WorkspaceListener monitor( String repositoryWorkspaceName,
295                                          String absolutePath,
296                                          int eventTypes,
297                                          boolean isDeep,
298                                          String[] uuids,
299                                          String[] nodeTypeNames,
300                                          boolean noLocal ) throws RepositoryException {
301            WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids,
302                                                               nodeTypeNames, noLocal);
303            listener.register();
304            this.workspaceListeners.add(listener);
305            return listener;
306        }
307    
308        /**
309         * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or
310         * under the supplied path.
311         * <p>
312         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
313         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
314         * repository and workspace name.
315         * </p>
316         * <p>
317         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
318         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
319         * workspace and garbage collected.
320         * </p>
321         * 
322         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
323         *        workspace that is to be monitored
324         * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes
325         *        in the workspace are to be monitored
326         * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
327         *        node type restrictions
328         * @return the listener that was created and registered to perform the monitoring
329         * @throws RepositoryException if there is a problem registering the listener
330         */
331        public WorkspaceListener monitor( String repositoryWorkspaceName,
332                                          String absolutePath,
333                                          String... nodeTypeNames ) throws RepositoryException {
334            return monitor(repositoryWorkspaceName,
335                           absolutePath,
336                           WorkspaceListener.DEFAULT_EVENT_TYPES,
337                           WorkspaceListener.DEFAULT_IS_DEEP,
338                           null,
339                           nodeTypeNames,
340                           WorkspaceListener.DEFAULT_NO_LOCAL);
341        }
342    
343        /**
344         * Monitor the supplied workspace for the supplied event types on any node in the workspace.
345         * <p>
346         * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the
347         * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given
348         * repository and workspace name.
349         * </p>
350         * <p>
351         * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer
352         * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the
353         * workspace and garbage collected.
354         * </p>
355         * 
356         * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and
357         *        workspace that is to be monitored
358         * @param eventTypes the bitmask of the {@link Event} types that are to be monitored
359         * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no
360         *        node type restrictions
361         * @return the listener that was created and registered to perform the monitoring
362         * @throws RepositoryException if there is a problem registering the listener
363         */
364        public WorkspaceListener monitor( String repositoryWorkspaceName,
365                                          int eventTypes,
366                                          String... nodeTypeNames ) throws RepositoryException {
367            return monitor(repositoryWorkspaceName,
368                           WorkspaceListener.DEFAULT_ABSOLUTE_PATH,
369                           eventTypes,
370                           WorkspaceListener.DEFAULT_IS_DEEP,
371                           null,
372                           nodeTypeNames,
373                           WorkspaceListener.DEFAULT_NO_LOCAL);
374        }
375    
376        protected void unregisterListener( WorkspaceListener listener ) {
377            if (listener != null) this.workspaceListeners.remove(listener);
378        }
379    
380        /**
381         * From section 2.8.8 of the JSR-170 specification:
382         * <p>
383         * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method
384         * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes
385         * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions
386         * and filters.
387         * </p>
388         * 
389         * @param eventIterator
390         * @param listener
391         */
392        protected void processEvents( EventIterator eventIterator,
393                                      WorkspaceListener listener ) {
394            if (eventIterator == null) return;
395            List<Event> events = new ArrayList<Event>();
396            // Copy the events ...
397            while (eventIterator.hasNext()) {
398                events.add((Event)eventIterator.next());
399            }
400            if (!getAdministrator().isStarted()) {
401                this.statistics.recordIgnoredEventSet(events.size());
402                return;
403            }
404    
405            // Notify the event listeners ...
406            boolean notifiedSomebody = false;
407            List<EventListener> eventListeners = this.eventListeners; // use one consistent snapshot
408            if (!eventListeners.isEmpty()) {
409                DelegatingEventIterator eventIter = new DelegatingEventIterator(events.iterator(), events.size());
410                for (EventListener eventListener : eventListeners) {
411                    eventListener.onEvent(eventIter);
412                }
413                notifiedSomebody = true;
414            }
415    
416            // Now create the node change events ...
417            List<NodeChangeListener> nodeChangeListeners = this.nodeChangeListeners; // use one consistent snapshot
418            if (!nodeChangeListeners.isEmpty()) {
419                final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName();
420                try {
421                    NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events);
422    
423                    // And notify the node change listeners ...
424                    int nodeChangeCount = nodeChanges.size();
425                    this.statistics.recordNodesChanged(nodeChangeCount);
426                    for (NodeChangeListener nodeChangeListener : nodeChangeListeners) {
427                        nodeChangeListener.onNodeChanges(nodeChanges);
428                    }
429                } catch (Throwable t) {
430                    getProblemLog().error(repositoryWorkspaceName, t);
431                }
432                notifiedSomebody = true;
433            }
434    
435            if (notifiedSomebody) {
436                this.statistics.recordEventSet(events.size());
437            } else {
438                this.statistics.recordIgnoredEventSet(events.size());
439            }
440        }
441    
442        protected class DelegatingEventIterator implements EventIterator {
443    
444            private final Iterator<Event> events;
445            private final int size;
446            private int position = 0;
447    
448            protected DelegatingEventIterator( Iterator<Event> events,
449                                               int size ) {
450                this.events = events;
451                this.size = size;
452            }
453    
454            /**
455             * {@inheritDoc}
456             */
457            public Event nextEvent() {
458                ++position;
459                return events.next();
460            }
461    
462            /**
463             * {@inheritDoc}
464             */
465            public long getPosition() {
466                return position;
467            }
468    
469            /**
470             * {@inheritDoc}
471             */
472            public long getSize() {
473                return size;
474            }
475    
476            /**
477             * {@inheritDoc}
478             */
479            public void skip( long skipNum ) {
480                for (int i = 0; i != skipNum; ++i) {
481                    next();
482                }
483            }
484    
485            /**
486             * {@inheritDoc}
487             */
488            public boolean hasNext() {
489                return events.hasNext();
490            }
491    
492            /**
493             * {@inheritDoc}
494             */
495            public Object next() {
496                return events.next();
497            }
498    
499            /**
500             * {@inheritDoc}
501             */
502            public void remove() {
503                // does nothing
504            }
505    
506        }
507    
508        /**
509         * Implementation of the {@link EventListener JCR EventListener} interface, returned by the sequencing system.
510         * 
511         * @author Randall Hauch
512         */
513        @ThreadSafe
514        public class WorkspaceListener implements EventListener {
515    
516            public static final boolean DEFAULT_IS_DEEP = true;
517            public static final boolean DEFAULT_NO_LOCAL = false;
518            public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | /* Event.NODE_REMOVED| */Event.PROPERTY_ADDED
519                                                          | Event.PROPERTY_CHANGED /* |Event.PROPERTY_REMOVED */;
520            public static final String DEFAULT_ABSOLUTE_PATH = "/";
521    
522            private final String repositoryWorkspaceName;
523            private final Set<String> uuids;
524            private final Set<String> nodeTypeNames;
525            private final int eventTypes;
526            private final String absolutePath;
527            private final boolean deep;
528            private final boolean noLocal;
529            @GuardedBy( "this" )
530            private transient Session session;
531    
532            protected WorkspaceListener( String repositoryWorkspaceName,
533                                         int eventTypes,
534                                         String absPath,
535                                         boolean isDeep,
536                                         String[] uuids,
537                                         String[] nodeTypeNames,
538                                         boolean noLocal ) {
539                this.repositoryWorkspaceName = repositoryWorkspaceName;
540                this.eventTypes = eventTypes;
541                this.deep = isDeep;
542                this.noLocal = noLocal;
543                this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null;
544                // Set the UUIDs ...
545                Set<String> newUuids = new HashSet<String>();
546                if (uuids != null) {
547                    for (String uuid : uuids) {
548                        if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim());
549                    }
550                }
551                this.uuids = Collections.unmodifiableSet(newUuids);
552                // Set the node type names
553                Set<String> newNodeTypeNames = new HashSet<String>();
554                if (nodeTypeNames != null) {
555                    for (String nodeTypeName : nodeTypeNames) {
556                        if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim());
557                    }
558                }
559                this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames);
560            }
561    
562            /**
563             * @return repositoryWorkspaceName
564             */
565            public String getRepositoryWorkspaceName() {
566                return this.repositoryWorkspaceName;
567            }
568    
569            /**
570             * @return eventTypes
571             */
572            public int getEventTypes() {
573                return this.eventTypes;
574            }
575    
576            /**
577             * @return absolutePath
578             */
579            public String getAbsolutePath() {
580                return this.absolutePath;
581            }
582    
583            /**
584             * @return deep
585             */
586            public boolean isDeep() {
587                return this.deep;
588            }
589    
590            /**
591             * @return noLocal
592             */
593            public boolean isNoLocal() {
594                return this.noLocal;
595            }
596    
597            /**
598             * @return uuids
599             */
600            public Set<String> getUuids() {
601                return this.uuids;
602            }
603    
604            /**
605             * @return nodeTypeNames
606             */
607            public Set<String> getNodeTypeNames() {
608                return this.nodeTypeNames;
609            }
610    
611            public synchronized boolean isRegistered() {
612                if (this.session != null && getAdministrator().isShutdown()) {
613                    // This sequencing system has been shutdown, so unregister this listener
614                    try {
615                        unregister();
616                    } catch (RepositoryException re) {
617                        String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
618                        Logger.getLogger(this.getClass()).debug(re, msg);
619                    }
620                }
621                return this.session != null;
622            }
623    
624            public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException {
625                if (this.session != null) return this;
626                this.session = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName);
627                String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]);
628                String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]);
629                this.session.getWorkspace().getObservationManager().addEventListener(this,
630                                                                                     eventTypes,
631                                                                                     absolutePath,
632                                                                                     deep,
633                                                                                     uuids,
634                                                                                     nodeTypeNames,
635                                                                                     noLocal);
636                return this;
637            }
638    
639            public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException {
640                if (this.session == null) return this;
641                try {
642                    if (this.session.isLive()) {
643                        this.session.getWorkspace().getObservationManager().removeEventListener(this);
644                        this.session.logout();
645                    }
646                } finally {
647                    this.session = null;
648                    unregisterListener(this);
649                }
650                return this;
651            }
652    
653            public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException {
654                unregister();
655                register();
656                return this;
657            }
658    
659            /**
660             * {@inheritDoc}
661             */
662            public void onEvent( EventIterator events ) {
663                if (events != null) {
664                    if (getAdministrator().isShutdown()) {
665                        // This sequencing system has been shutdown, so unregister this listener
666                        try {
667                            unregister();
668                        } catch (RepositoryException re) {
669                            String msg = "Error unregistering workspace listener after sequencing system has been shutdow.";
670                            Logger.getLogger(this.getClass()).debug(re, msg);
671                        }
672                    } else {
673                        ObservationService.this.processEvents(events, this);
674                    }
675                }
676            }
677        }
678    
679        /**
680         * The statistics for the system. Each sequencing system has an instance of this class that is updated.
681         * 
682         * @author Randall Hauch
683         */
684        @ThreadSafe
685        public class Statistics {
686    
687            @GuardedBy( "lock" )
688            private long numberOfEventsIgnored;
689            @GuardedBy( "lock" )
690            private long numberOfEventsEnqueued;
691            @GuardedBy( "lock" )
692            private long numberOfEventSetsIgnored;
693            @GuardedBy( "lock" )
694            private long numberOfEventSetsEnqueued;
695            private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0);
696            private final ReadWriteLock lock = new ReentrantReadWriteLock();
697            private final AtomicLong startTime;
698    
699            protected Statistics() {
700                startTime = new AtomicLong(System.currentTimeMillis());
701            }
702    
703            public Statistics reset() {
704                try {
705                    lock.writeLock().lock();
706                    this.startTime.set(System.currentTimeMillis());
707                    this.numberOfEventsIgnored = 0;
708                    this.numberOfEventsEnqueued = 0;
709                    this.numberOfEventSetsIgnored = 0;
710                    this.numberOfEventSetsEnqueued = 0;
711                    this.numberOfNodeChangesEnqueued.set(0);
712                } finally {
713                    lock.writeLock().unlock();
714                }
715                return this;
716            }
717    
718            /**
719             * @return the system time when the statistics were started
720             */
721            public long getStartTime() {
722                return this.startTime.get();
723            }
724    
725            /**
726             * @return the number of node changes that were processed
727             */
728            public long getNumberOfNodeChangesEnqueued() {
729                return this.numberOfNodeChangesEnqueued.get();
730            }
731    
732            /**
733             * @return the number of events that were ignored because the system was not running
734             */
735            public long getNumberOfEventsIgnored() {
736                try {
737                    lock.readLock().lock();
738                    return this.numberOfEventsIgnored;
739                } finally {
740                    lock.readLock().unlock();
741                }
742            }
743    
744            /**
745             * @return the number of events that were enqueued for processing
746             */
747            public long getNumberOfEventsEnqueued() {
748                try {
749                    lock.readLock().lock();
750                    return this.numberOfEventsEnqueued;
751                } finally {
752                    lock.readLock().unlock();
753                }
754            }
755    
756            /**
757             * @return the number of event sets (transactions) that were enqueued for processing
758             */
759            public long getNumberOfEventSetsEnqueued() {
760                try {
761                    lock.readLock().lock();
762                    return this.numberOfEventSetsEnqueued;
763                } finally {
764                    lock.readLock().unlock();
765                }
766            }
767    
768            /**
769             * @return the number of event sets (transactions) that were ignored because the system was not running
770             */
771            public long getNumberOfEventSetsIgnored() {
772                try {
773                    lock.readLock().lock();
774                    return this.numberOfEventSetsIgnored;
775                } finally {
776                    lock.readLock().unlock();
777                }
778            }
779    
780            protected void recordNodesChanged( long changeCount ) {
781                this.numberOfNodeChangesEnqueued.addAndGet(changeCount);
782            }
783    
784            protected void recordEventSet( long eventsInSet ) {
785                try {
786                    lock.writeLock().lock();
787                    this.numberOfEventsEnqueued += eventsInSet;
788                    ++this.numberOfEventSetsEnqueued;
789                } finally {
790                    lock.writeLock().unlock();
791                }
792            }
793    
794            protected void recordIgnoredEventSet( long eventsInSet ) {
795                try {
796                    lock.writeLock().lock();
797                    this.numberOfEventsIgnored += eventsInSet;
798                    this.numberOfEventSetsIgnored += 1;
799                    ++this.numberOfEventSetsEnqueued;
800                } finally {
801                    lock.writeLock().unlock();
802                }
803            }
804        }
805    }