001 /* 002 * JBoss, Home of Professional Open Source. 003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors 004 * as indicated by the @author tags. See the copyright.txt file in the 005 * distribution for a full listing of individual contributors. 006 * 007 * This is free software; you can redistribute it and/or modify it 008 * under the terms of the GNU Lesser General Public License as 009 * published by the Free Software Foundation; either version 2.1 of 010 * the License, or (at your option) any later version. 011 * 012 * This software is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * You should have received a copy of the GNU Lesser General Public 018 * License along with this software; if not, write to the Free 019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 021 */ 022 package org.jboss.dna.repository.observation; 023 024 import java.util.ArrayList; 025 import java.util.Collections; 026 import java.util.HashSet; 027 import java.util.Iterator; 028 import java.util.List; 029 import java.util.Set; 030 import java.util.concurrent.CopyOnWriteArrayList; 031 import java.util.concurrent.TimeUnit; 032 import java.util.concurrent.atomic.AtomicLong; 033 import java.util.concurrent.locks.ReadWriteLock; 034 import java.util.concurrent.locks.ReentrantReadWriteLock; 035 import javax.jcr.RepositoryException; 036 import javax.jcr.Session; 037 import javax.jcr.UnsupportedRepositoryOperationException; 038 import javax.jcr.observation.Event; 039 import javax.jcr.observation.EventIterator; 040 import javax.jcr.observation.EventListener; 041 import javax.jcr.observation.ObservationManager; 042 import net.jcip.annotations.GuardedBy; 043 import net.jcip.annotations.ThreadSafe; 044 import org.jboss.dna.common.util.CheckArg; 045 import org.jboss.dna.common.util.Logger; 046 import org.jboss.dna.repository.RepositoryI18n; 047 import org.jboss.dna.repository.services.AbstractServiceAdministrator; 048 import org.jboss.dna.repository.services.AdministeredService; 049 import org.jboss.dna.repository.services.ServiceAdministrator; 050 import org.jboss.dna.repository.util.SessionFactory; 051 052 /** 053 * @author Randall Hauch 054 */ 055 public class ObservationService implements AdministeredService { 056 057 /** 058 * Interface to which problems with particular events are logged. 059 * 060 * @author Randall Hauch 061 */ 062 public static interface ProblemLog { 063 064 void error( String repositoryWorkspaceName, 065 Throwable t ); 066 } 067 068 /** 069 * Problem log implementation that records problems in the log. 070 * 071 * @author Randall Hauch 072 */ 073 public class DefaultProblemLog implements ProblemLog { 074 075 /** 076 * {@inheritDoc} 077 */ 078 public void error( String repositoryWorkspaceName, 079 Throwable t ) { 080 getLogger().error(t, RepositoryI18n.errorProcessingEvents, repositoryWorkspaceName); 081 } 082 } 083 084 protected static class NoOpProblemLog implements ProblemLog { 085 086 /** 087 * {@inheritDoc} 088 */ 089 public void error( String repositoryWorkspaceName, 090 Throwable t ) { 091 } 092 } 093 094 public static final ProblemLog NO_OP_PROBLEM_LOG = new NoOpProblemLog(); 095 096 /** 097 * The administrative component for this service. 098 * 099 * @author Randall Hauch 100 */ 101 protected class Administrator extends AbstractServiceAdministrator { 102 103 protected Administrator() { 104 super(RepositoryI18n.observationServiceName, State.STARTED); 105 } 106 107 /** 108 * {@inheritDoc} 109 */ 110 @Override 111 protected void doShutdown( State fromState ) { 112 super.doShutdown(fromState); 113 shutdownService(); 114 } 115 116 /** 117 * {@inheritDoc} 118 */ 119 public boolean awaitTermination( long timeout, 120 TimeUnit unit ) { 121 return true; 122 } 123 124 /** 125 * {@inheritDoc} 126 */ 127 @Override 128 protected boolean doCheckIsTerminated() { 129 return true; 130 } 131 132 } 133 134 private Logger logger = Logger.getLogger(this.getClass()); 135 private ProblemLog problemLog = new DefaultProblemLog(); 136 private final Statistics statistics = new Statistics(); 137 private final SessionFactory sessionFactory; 138 private final CopyOnWriteArrayList<WorkspaceListener> workspaceListeners = new CopyOnWriteArrayList<WorkspaceListener>(); 139 private final CopyOnWriteArrayList<EventListener> eventListeners = new CopyOnWriteArrayList<EventListener>(); 140 private final CopyOnWriteArrayList<NodeChangeListener> nodeChangeListeners = new CopyOnWriteArrayList<NodeChangeListener>(); 141 private final Administrator administrator = new Administrator(); 142 143 public ObservationService( SessionFactory sessionFactory ) { 144 CheckArg.isNotNull(sessionFactory, "session factory"); 145 this.sessionFactory = sessionFactory; 146 } 147 148 /** 149 * {@inheritDoc} 150 */ 151 public ServiceAdministrator getAdministrator() { 152 return this.administrator; 153 } 154 155 /** 156 * @return sessionFactory 157 */ 158 public SessionFactory getSessionFactory() { 159 return this.sessionFactory; 160 } 161 162 /** 163 * Get the statistics for this system. 164 * 165 * @return the statistics, which are updated as the system is used 166 */ 167 public Statistics getStatistics() { 168 return this.statistics; 169 } 170 171 /** 172 * Get the logger for this system 173 * 174 * @return the logger 175 */ 176 public Logger getLogger() { 177 return this.logger; 178 } 179 180 /** 181 * Set the logger for this system. 182 * 183 * @param logger the logger, or null if the standard logging should be used 184 */ 185 public void setLogger( Logger logger ) { 186 this.logger = logger != null ? logger : Logger.getLogger(this.getClass()); 187 } 188 189 /** 190 * @return problemLog 191 */ 192 public ProblemLog getProblemLog() { 193 return this.problemLog; 194 } 195 196 /** 197 * Set the problem log that will be notified of problems handling events. By default, such problems are sent to the log. 198 * 199 * @param problemLog the new problem log implementation; if null, then the default problem log is used 200 */ 201 public void setProblemLog( ProblemLog problemLog ) { 202 this.problemLog = problemLog != null ? problemLog : new DefaultProblemLog(); 203 } 204 205 public boolean addListener( EventListener listener ) { 206 if (listener == null) return false; 207 return this.eventListeners.addIfAbsent(listener); 208 } 209 210 public boolean removeListener( EventListener listener ) { 211 if (listener == null) return false; 212 return this.eventListeners.remove(listener); 213 } 214 215 public boolean addListener( NodeChangeListener listener ) { 216 return this.nodeChangeListeners.addIfAbsent(listener); 217 } 218 219 public boolean removeListener( NodeChangeListener listener ) { 220 if (listener == null) return false; 221 return this.nodeChangeListeners.remove(listener); 222 } 223 224 protected void shutdownService() { 225 // Unregister all listeners ... 226 for (WorkspaceListener listener : this.workspaceListeners) { 227 try { 228 listener.unregister(); 229 } catch (RepositoryException e) { 230 this.logger.error(e, RepositoryI18n.errorUnregisteringWorkspaceListenerWhileShuttingDownObservationService); 231 } 232 } 233 } 234 235 /** 236 * Monitor the supplied workspace for events of the given type on any node at or under the supplied path. 237 * <p> 238 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the 239 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given 240 * repository and workspace name. 241 * </p> 242 * <p> 243 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer 244 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the 245 * workspace and garbage collected. If this service is {@link ServiceAdministrator#shutdown() shutdown} while there are still 246 * active listeners, those listeners will disconnect themselves from this service and the workspace with which they're 247 * registered when they attempt to forward the next events. 248 * </p> 249 * <p> 250 * The set of events that are monitored can be filtered by specifying restrictions based on characteristics of the node 251 * associated with the event. In the case of event types {@link Event#NODE_ADDED NODE_ADDED} and 252 * {@link Event#NODE_REMOVED NODE_REMOVED}, the node associated with an event is the node at (or formerly at) the path 253 * returned by {@link Event#getPath() Event.getPath()}. In the case of event types 254 * {@link Event#PROPERTY_ADDED PROPERTY_ADDED}, {@link Event#PROPERTY_REMOVED PROPERTY_REMOVED} and 255 * {@link Event#PROPERTY_CHANGED PROPERTY_CHANGED}, the node associated with an event is the parent node of the property at 256 * (or formerly at) the path returned by <code>Event.getPath</code>: 257 * <ul> 258 * <li> <code>absolutePath</code>, <code>isDeep</code>: Only events whose associated node is at 259 * <code>absolutePath</code> (or within its subtree, if <code>isDeep</code> is <code>true</code>) will be received. It 260 * is permissible to register a listener for a path where no node currently exists. </li> 261 * <li> <code>uuids</code>: Only events whose associated node has one of the UUIDs in this list will be received. If his 262 * parameter is <code>null</code> then no UUID-related restriction is placed on events received. </li> 263 * <li> <code>nodeTypeNames</code>: Only events whose associated node has one of the node types (or a subtype of one of the 264 * node types) in this list will be received. If this parameter is <code>null</code> then no node type-related restriction 265 * is placed on events received. </li> 266 * </ul> 267 * The restrictions are "ANDed" together. In other words, for a particular node to be "listened to" it must meet all the 268 * restrictions. 269 * </p> 270 * <p> 271 * Additionally, if <code>noLocal</code> is <code>true</code>, then events generated by the session through which the 272 * listener was registered are ignored. Otherwise, they are not ignored. 273 * </p> 274 * <p> 275 * The filters of an already-registered {@link WorkspaceListener} can be changed at runtime by changing the attributes and 276 * {@link WorkspaceListener#reregister() registering}. 277 * </p> 278 * 279 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and 280 * workspace that is to be monitored 281 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes 282 * in the workspace are to be monitored 283 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored 284 * @param isDeep true if events below the node given by the <code>absolutePath</code> or by the <code>uuids</code> are to 285 * be processed, or false if only the events at the node 286 * @param uuids array of UUIDs of nodes that are to be monitored; may be null or empty if the UUIDs are not known 287 * @param nodeTypeNames array of node type names that are to be monitored; may be null or empty if the monitoring has no node 288 * type restrictions 289 * @param noLocal true if the events originating in the supplied workspace are to be ignored, or false if they are also to be 290 * processed. 291 * @return the listener that was created and registered to perform the monitoring 292 * @throws RepositoryException if there is a problem registering the listener 293 */ 294 public WorkspaceListener monitor( String repositoryWorkspaceName, 295 String absolutePath, 296 int eventTypes, 297 boolean isDeep, 298 String[] uuids, 299 String[] nodeTypeNames, 300 boolean noLocal ) throws RepositoryException { 301 WorkspaceListener listener = new WorkspaceListener(repositoryWorkspaceName, eventTypes, absolutePath, isDeep, uuids, 302 nodeTypeNames, noLocal); 303 listener.register(); 304 this.workspaceListeners.add(listener); 305 return listener; 306 } 307 308 /** 309 * Monitor the supplied workspace for {@link WorkspaceListener#DEFAULT_EVENT_TYPES default event types} on any node at or 310 * under the supplied path. 311 * <p> 312 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the 313 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given 314 * repository and workspace name. 315 * </p> 316 * <p> 317 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer 318 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the 319 * workspace and garbage collected. 320 * </p> 321 * 322 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and 323 * workspace that is to be monitored 324 * @param absolutePath the absolute path of the node at or below which changes are to be monitored; may be null if all nodes 325 * in the workspace are to be monitored 326 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no 327 * node type restrictions 328 * @return the listener that was created and registered to perform the monitoring 329 * @throws RepositoryException if there is a problem registering the listener 330 */ 331 public WorkspaceListener monitor( String repositoryWorkspaceName, 332 String absolutePath, 333 String... nodeTypeNames ) throws RepositoryException { 334 return monitor(repositoryWorkspaceName, 335 absolutePath, 336 WorkspaceListener.DEFAULT_EVENT_TYPES, 337 WorkspaceListener.DEFAULT_IS_DEEP, 338 null, 339 nodeTypeNames, 340 WorkspaceListener.DEFAULT_NO_LOCAL); 341 } 342 343 /** 344 * Monitor the supplied workspace for the supplied event types on any node in the workspace. 345 * <p> 346 * Monitoring is accomplished by registering a listener on the workspace, so this monitoring only has access to the 347 * information that visible to the session created by the {@link #getSessionFactory() session factory} for the given 348 * repository and workspace name. 349 * </p> 350 * <p> 351 * The listener returned from this method is not managed by this SequencingService instance. If the listener is no longer 352 * needed, it simply must be {@link ObservationManager#removeEventListener(EventListener) removed} as a listener of the 353 * workspace and garbage collected. 354 * </p> 355 * 356 * @param repositoryWorkspaceName the name to be used with the session factory to obtain a session to the repository and 357 * workspace that is to be monitored 358 * @param eventTypes the bitmask of the {@link Event} types that are to be monitored 359 * @param nodeTypeNames the names of the node types that are to be monitored; may be null or empty if the monitoring has no 360 * node type restrictions 361 * @return the listener that was created and registered to perform the monitoring 362 * @throws RepositoryException if there is a problem registering the listener 363 */ 364 public WorkspaceListener monitor( String repositoryWorkspaceName, 365 int eventTypes, 366 String... nodeTypeNames ) throws RepositoryException { 367 return monitor(repositoryWorkspaceName, 368 WorkspaceListener.DEFAULT_ABSOLUTE_PATH, 369 eventTypes, 370 WorkspaceListener.DEFAULT_IS_DEEP, 371 null, 372 nodeTypeNames, 373 WorkspaceListener.DEFAULT_NO_LOCAL); 374 } 375 376 protected void unregisterListener( WorkspaceListener listener ) { 377 if (listener != null) this.workspaceListeners.remove(listener); 378 } 379 380 /** 381 * From section 2.8.8 of the JSR-170 specification: 382 * <p> 383 * On each persistent change, those listeners that are entitled to receive one or more events will have their onEvent method 384 * called and be passed an EventIterator. The EventIterator will contain the event bundle reflecting the persistent changes 385 * made but excluding those to which that particular listener is not entitled, according to the listeners access permissions 386 * and filters. 387 * </p> 388 * 389 * @param eventIterator 390 * @param listener 391 */ 392 protected void processEvents( EventIterator eventIterator, 393 WorkspaceListener listener ) { 394 if (eventIterator == null) return; 395 List<Event> events = new ArrayList<Event>(); 396 // Copy the events ... 397 while (eventIterator.hasNext()) { 398 events.add((Event)eventIterator.next()); 399 } 400 if (!getAdministrator().isStarted()) { 401 this.statistics.recordIgnoredEventSet(events.size()); 402 return; 403 } 404 405 // Notify the event listeners ... 406 boolean notifiedSomebody = false; 407 List<EventListener> eventListeners = this.eventListeners; // use one consistent snapshot 408 if (!eventListeners.isEmpty()) { 409 DelegatingEventIterator eventIter = new DelegatingEventIterator(events.iterator(), events.size()); 410 for (EventListener eventListener : eventListeners) { 411 eventListener.onEvent(eventIter); 412 } 413 notifiedSomebody = true; 414 } 415 416 // Now create the node change events ... 417 List<NodeChangeListener> nodeChangeListeners = this.nodeChangeListeners; // use one consistent snapshot 418 if (!nodeChangeListeners.isEmpty()) { 419 final String repositoryWorkspaceName = listener.getRepositoryWorkspaceName(); 420 try { 421 NodeChanges nodeChanges = NodeChanges.create(repositoryWorkspaceName, events); 422 423 // And notify the node change listeners ... 424 int nodeChangeCount = nodeChanges.size(); 425 this.statistics.recordNodesChanged(nodeChangeCount); 426 for (NodeChangeListener nodeChangeListener : nodeChangeListeners) { 427 nodeChangeListener.onNodeChanges(nodeChanges); 428 } 429 } catch (Throwable t) { 430 getProblemLog().error(repositoryWorkspaceName, t); 431 } 432 notifiedSomebody = true; 433 } 434 435 if (notifiedSomebody) { 436 this.statistics.recordEventSet(events.size()); 437 } else { 438 this.statistics.recordIgnoredEventSet(events.size()); 439 } 440 } 441 442 protected class DelegatingEventIterator implements EventIterator { 443 444 private final Iterator<Event> events; 445 private final int size; 446 private int position = 0; 447 448 protected DelegatingEventIterator( Iterator<Event> events, 449 int size ) { 450 this.events = events; 451 this.size = size; 452 } 453 454 /** 455 * {@inheritDoc} 456 */ 457 public Event nextEvent() { 458 ++position; 459 return events.next(); 460 } 461 462 /** 463 * {@inheritDoc} 464 */ 465 public long getPosition() { 466 return position; 467 } 468 469 /** 470 * {@inheritDoc} 471 */ 472 public long getSize() { 473 return size; 474 } 475 476 /** 477 * {@inheritDoc} 478 */ 479 public void skip( long skipNum ) { 480 for (int i = 0; i != skipNum; ++i) { 481 next(); 482 } 483 } 484 485 /** 486 * {@inheritDoc} 487 */ 488 public boolean hasNext() { 489 return events.hasNext(); 490 } 491 492 /** 493 * {@inheritDoc} 494 */ 495 public Object next() { 496 return events.next(); 497 } 498 499 /** 500 * {@inheritDoc} 501 */ 502 public void remove() { 503 // does nothing 504 } 505 506 } 507 508 /** 509 * Implementation of the {@link EventListener JCR EventListener} interface, returned by the sequencing system. 510 * 511 * @author Randall Hauch 512 */ 513 @ThreadSafe 514 public class WorkspaceListener implements EventListener { 515 516 public static final boolean DEFAULT_IS_DEEP = true; 517 public static final boolean DEFAULT_NO_LOCAL = false; 518 public static final int DEFAULT_EVENT_TYPES = Event.NODE_ADDED | /* Event.NODE_REMOVED| */Event.PROPERTY_ADDED 519 | Event.PROPERTY_CHANGED /* |Event.PROPERTY_REMOVED */; 520 public static final String DEFAULT_ABSOLUTE_PATH = "/"; 521 522 private final String repositoryWorkspaceName; 523 private final Set<String> uuids; 524 private final Set<String> nodeTypeNames; 525 private final int eventTypes; 526 private final String absolutePath; 527 private final boolean deep; 528 private final boolean noLocal; 529 @GuardedBy( "this" ) 530 private transient Session session; 531 532 protected WorkspaceListener( String repositoryWorkspaceName, 533 int eventTypes, 534 String absPath, 535 boolean isDeep, 536 String[] uuids, 537 String[] nodeTypeNames, 538 boolean noLocal ) { 539 this.repositoryWorkspaceName = repositoryWorkspaceName; 540 this.eventTypes = eventTypes; 541 this.deep = isDeep; 542 this.noLocal = noLocal; 543 this.absolutePath = absPath != null && absPath.trim().length() != 0 ? absPath.trim() : null; 544 // Set the UUIDs ... 545 Set<String> newUuids = new HashSet<String>(); 546 if (uuids != null) { 547 for (String uuid : uuids) { 548 if (uuid != null && uuid.trim().length() != 0) newUuids.add(uuid.trim()); 549 } 550 } 551 this.uuids = Collections.unmodifiableSet(newUuids); 552 // Set the node type names 553 Set<String> newNodeTypeNames = new HashSet<String>(); 554 if (nodeTypeNames != null) { 555 for (String nodeTypeName : nodeTypeNames) { 556 if (nodeTypeName != null && nodeTypeName.trim().length() != 0) newNodeTypeNames.add(nodeTypeName.trim()); 557 } 558 } 559 this.nodeTypeNames = Collections.unmodifiableSet(newNodeTypeNames); 560 } 561 562 /** 563 * @return repositoryWorkspaceName 564 */ 565 public String getRepositoryWorkspaceName() { 566 return this.repositoryWorkspaceName; 567 } 568 569 /** 570 * @return eventTypes 571 */ 572 public int getEventTypes() { 573 return this.eventTypes; 574 } 575 576 /** 577 * @return absolutePath 578 */ 579 public String getAbsolutePath() { 580 return this.absolutePath; 581 } 582 583 /** 584 * @return deep 585 */ 586 public boolean isDeep() { 587 return this.deep; 588 } 589 590 /** 591 * @return noLocal 592 */ 593 public boolean isNoLocal() { 594 return this.noLocal; 595 } 596 597 /** 598 * @return uuids 599 */ 600 public Set<String> getUuids() { 601 return this.uuids; 602 } 603 604 /** 605 * @return nodeTypeNames 606 */ 607 public Set<String> getNodeTypeNames() { 608 return this.nodeTypeNames; 609 } 610 611 public synchronized boolean isRegistered() { 612 if (this.session != null && getAdministrator().isShutdown()) { 613 // This sequencing system has been shutdown, so unregister this listener 614 try { 615 unregister(); 616 } catch (RepositoryException re) { 617 String msg = "Error unregistering workspace listener after sequencing system has been shutdow."; 618 Logger.getLogger(this.getClass()).debug(re, msg); 619 } 620 } 621 return this.session != null; 622 } 623 624 public synchronized WorkspaceListener register() throws UnsupportedRepositoryOperationException, RepositoryException { 625 if (this.session != null) return this; 626 this.session = ObservationService.this.getSessionFactory().createSession(this.repositoryWorkspaceName); 627 String[] uuids = this.uuids.isEmpty() ? null : this.uuids.toArray(new String[this.uuids.size()]); 628 String[] nodeTypeNames = this.nodeTypeNames.isEmpty() ? null : this.nodeTypeNames.toArray(new String[this.nodeTypeNames.size()]); 629 this.session.getWorkspace().getObservationManager().addEventListener(this, 630 eventTypes, 631 absolutePath, 632 deep, 633 uuids, 634 nodeTypeNames, 635 noLocal); 636 return this; 637 } 638 639 public synchronized WorkspaceListener unregister() throws UnsupportedRepositoryOperationException, RepositoryException { 640 if (this.session == null) return this; 641 try { 642 if (this.session.isLive()) { 643 this.session.getWorkspace().getObservationManager().removeEventListener(this); 644 this.session.logout(); 645 } 646 } finally { 647 this.session = null; 648 unregisterListener(this); 649 } 650 return this; 651 } 652 653 public synchronized WorkspaceListener reregister() throws UnsupportedRepositoryOperationException, RepositoryException { 654 unregister(); 655 register(); 656 return this; 657 } 658 659 /** 660 * {@inheritDoc} 661 */ 662 public void onEvent( EventIterator events ) { 663 if (events != null) { 664 if (getAdministrator().isShutdown()) { 665 // This sequencing system has been shutdown, so unregister this listener 666 try { 667 unregister(); 668 } catch (RepositoryException re) { 669 String msg = "Error unregistering workspace listener after sequencing system has been shutdow."; 670 Logger.getLogger(this.getClass()).debug(re, msg); 671 } 672 } else { 673 ObservationService.this.processEvents(events, this); 674 } 675 } 676 } 677 } 678 679 /** 680 * The statistics for the system. Each sequencing system has an instance of this class that is updated. 681 * 682 * @author Randall Hauch 683 */ 684 @ThreadSafe 685 public class Statistics { 686 687 @GuardedBy( "lock" ) 688 private long numberOfEventsIgnored; 689 @GuardedBy( "lock" ) 690 private long numberOfEventsEnqueued; 691 @GuardedBy( "lock" ) 692 private long numberOfEventSetsIgnored; 693 @GuardedBy( "lock" ) 694 private long numberOfEventSetsEnqueued; 695 private final AtomicLong numberOfNodeChangesEnqueued = new AtomicLong(0); 696 private final ReadWriteLock lock = new ReentrantReadWriteLock(); 697 private final AtomicLong startTime; 698 699 protected Statistics() { 700 startTime = new AtomicLong(System.currentTimeMillis()); 701 } 702 703 public Statistics reset() { 704 try { 705 lock.writeLock().lock(); 706 this.startTime.set(System.currentTimeMillis()); 707 this.numberOfEventsIgnored = 0; 708 this.numberOfEventsEnqueued = 0; 709 this.numberOfEventSetsIgnored = 0; 710 this.numberOfEventSetsEnqueued = 0; 711 this.numberOfNodeChangesEnqueued.set(0); 712 } finally { 713 lock.writeLock().unlock(); 714 } 715 return this; 716 } 717 718 /** 719 * @return the system time when the statistics were started 720 */ 721 public long getStartTime() { 722 return this.startTime.get(); 723 } 724 725 /** 726 * @return the number of node changes that were processed 727 */ 728 public long getNumberOfNodeChangesEnqueued() { 729 return this.numberOfNodeChangesEnqueued.get(); 730 } 731 732 /** 733 * @return the number of events that were ignored because the system was not running 734 */ 735 public long getNumberOfEventsIgnored() { 736 try { 737 lock.readLock().lock(); 738 return this.numberOfEventsIgnored; 739 } finally { 740 lock.readLock().unlock(); 741 } 742 } 743 744 /** 745 * @return the number of events that were enqueued for processing 746 */ 747 public long getNumberOfEventsEnqueued() { 748 try { 749 lock.readLock().lock(); 750 return this.numberOfEventsEnqueued; 751 } finally { 752 lock.readLock().unlock(); 753 } 754 } 755 756 /** 757 * @return the number of event sets (transactions) that were enqueued for processing 758 */ 759 public long getNumberOfEventSetsEnqueued() { 760 try { 761 lock.readLock().lock(); 762 return this.numberOfEventSetsEnqueued; 763 } finally { 764 lock.readLock().unlock(); 765 } 766 } 767 768 /** 769 * @return the number of event sets (transactions) that were ignored because the system was not running 770 */ 771 public long getNumberOfEventSetsIgnored() { 772 try { 773 lock.readLock().lock(); 774 return this.numberOfEventSetsIgnored; 775 } finally { 776 lock.readLock().unlock(); 777 } 778 } 779 780 protected void recordNodesChanged( long changeCount ) { 781 this.numberOfNodeChangesEnqueued.addAndGet(changeCount); 782 } 783 784 protected void recordEventSet( long eventsInSet ) { 785 try { 786 lock.writeLock().lock(); 787 this.numberOfEventsEnqueued += eventsInSet; 788 ++this.numberOfEventSetsEnqueued; 789 } finally { 790 lock.writeLock().unlock(); 791 } 792 } 793 794 protected void recordIgnoredEventSet( long eventsInSet ) { 795 try { 796 lock.writeLock().lock(); 797 this.numberOfEventsIgnored += eventsInSet; 798 this.numberOfEventSetsIgnored += 1; 799 ++this.numberOfEventSetsEnqueued; 800 } finally { 801 lock.writeLock().unlock(); 802 } 803 } 804 } 805 }