001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.repository.sequencer; 025 026 import java.util.ArrayList; 027 import java.util.HashMap; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 import java.util.concurrent.ExecutorService; 033 import java.util.concurrent.Executors; 034 import java.util.concurrent.RejectedExecutionException; 035 import java.util.concurrent.TimeUnit; 036 import java.util.concurrent.atomic.AtomicLong; 037 import javax.jcr.Node; 038 import javax.jcr.Repository; 039 import javax.jcr.RepositoryException; 040 import javax.jcr.Session; 041 import javax.jcr.observation.Event; 042 import net.jcip.annotations.Immutable; 043 import net.jcip.annotations.ThreadSafe; 044 import org.jboss.dna.common.collection.SimpleProblems; 045 import org.jboss.dna.common.component.ClassLoaderFactory; 046 import org.jboss.dna.common.component.ComponentLibrary; 047 import org.jboss.dna.common.component.StandardClassLoaderFactory; 048 import org.jboss.dna.common.util.CheckArg; 049 import org.jboss.dna.common.util.HashCode; 050 import org.jboss.dna.common.util.Logger; 051 import org.jboss.dna.repository.RepositoryI18n; 052 import org.jboss.dna.repository.observation.NodeChange; 053 import org.jboss.dna.repository.observation.NodeChangeListener; 054 import org.jboss.dna.repository.observation.NodeChanges; 055 import org.jboss.dna.repository.service.AbstractServiceAdministrator; 056 import org.jboss.dna.repository.service.AdministeredService; 057 import org.jboss.dna.repository.service.ServiceAdministrator; 058 import org.jboss.dna.repository.util.JcrExecutionContext; 059 import org.jboss.dna.repository.util.RepositoryNodePath; 060 061 /** 062 * A sequencing system is used to monitor changes in the content of {@link Repository JCR repositories} and to sequence the 063 * content to extract or to generate structured information. 064 * 065 * @author Randall Hauch 066 * @author John Verhaeg 067 */ 068 public class SequencingService implements AdministeredService, NodeChangeListener { 069 070 /** 071 * Interface used to select the set of {@link Sequencer} instances that should be run. 072 * 073 * @author Randall Hauch 074 */ 075 public static interface Selector { 076 077 /** 078 * Select the sequencers that should be used to sequence the supplied node. 079 * 080 * @param sequencers the list of all sequencers available at the moment; never null 081 * @param node the node to be sequenced; never null 082 * @param nodeChange the set of node changes; never null 083 * @return the list of sequencers that should be used; may not be null 084 */ 085 List<Sequencer> selectSequencers( List<Sequencer> sequencers, 086 Node node, 087 NodeChange nodeChange ); 088 } 089 090 /** 091 * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or 092 * logger) supplied. 093 * 094 * @author Randall Hauch 095 */ 096 protected static class DefaultSelector implements Selector { 097 098 public List<Sequencer> selectSequencers( List<Sequencer> sequencers, 099 Node node, 100 NodeChange nodeChange ) { 101 return sequencers; 102 } 103 } 104 105 /** 106 * Interface used to determine whether a {@link NodeChange} should be processed. 107 * 108 * @author Randall Hauch 109 */ 110 public static interface NodeFilter { 111 112 /** 113 * Determine whether the node represented by the supplied change should be submitted for sequencing. 114 * 115 * @param nodeChange the node change event 116 * @return true if the node should be submitted for sequencing, or false if the change should be ignored 117 */ 118 boolean accept( NodeChange nodeChange ); 119 } 120 121 /** 122 * The default filter implementation, which accepts only new nodes or nodes that have new or changed properties. 123 * 124 * @author Randall Hauch 125 */ 126 protected static class DefaultNodeFilter implements NodeFilter { 127 128 public boolean accept( NodeChange nodeChange ) { 129 // Only care about new nodes or nodes that have new/changed properies ... 130 return nodeChange.includesEventTypes(Event.NODE_ADDED, Event.PROPERTY_ADDED, Event.PROPERTY_CHANGED); 131 } 132 } 133 134 /** 135 * The default {@link Selector} that considers every {@link Sequencer} to be used for every node. 136 * 137 * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector) 138 */ 139 public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector(); 140 /** 141 * The default {@link NodeFilter} that accepts new nodes or nodes that have new/changed properties. 142 * 143 * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector) 144 */ 145 public static final NodeFilter DEFAULT_NODE_FILTER = new DefaultNodeFilter(); 146 147 /** 148 * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class 149 * loader} (if not null) or component library's class loader. 150 */ 151 protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory( 152 SequencingService.class.getClassLoader()); 153 154 /** 155 * The administrative component for this service. 156 * 157 * @author Randall Hauch 158 */ 159 protected class Administrator extends AbstractServiceAdministrator { 160 161 protected Administrator() { 162 super(RepositoryI18n.sequencingServiceName, State.PAUSED); 163 } 164 165 /** 166 * {@inheritDoc} 167 */ 168 @Override 169 protected void doStart( State fromState ) { 170 super.doStart(fromState); 171 startService(); 172 } 173 174 /** 175 * {@inheritDoc} 176 */ 177 @Override 178 protected void doShutdown( State fromState ) { 179 super.doShutdown(fromState); 180 shutdownService(); 181 } 182 183 /** 184 * {@inheritDoc} 185 */ 186 @Override 187 protected boolean doCheckIsTerminated() { 188 return isServiceTerminated(); 189 } 190 191 /** 192 * {@inheritDoc} 193 */ 194 public boolean awaitTermination( long timeout, 195 TimeUnit unit ) throws InterruptedException { 196 return doAwaitTermination(timeout, unit); 197 } 198 199 } 200 201 private JcrExecutionContext executionContext; 202 private SequencerLibrary sequencerLibrary = new SequencerLibrary(); 203 private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR; 204 private NodeFilter nodeFilter = DEFAULT_NODE_FILTER; 205 private ExecutorService executorService; 206 private final Statistics statistics = new Statistics(); 207 private final Administrator administrator = new Administrator(); 208 209 /** 210 * Create a new sequencing system, configured with no sequencers and not monitoring any workspaces. Upon construction, the 211 * system is {@link ServiceAdministrator#isPaused() paused} and must be configured and then 212 * {@link ServiceAdministrator#start() started}. 213 */ 214 public SequencingService() { 215 this.sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY); 216 } 217 218 /** 219 * Return the administrative component for this service. 220 * 221 * @return the administrative component; never null 222 */ 223 public ServiceAdministrator getAdministrator() { 224 return this.administrator; 225 } 226 227 /** 228 * Get the statistics for this system. 229 * 230 * @return statistics 231 */ 232 public Statistics getStatistics() { 233 return this.statistics; 234 } 235 236 /** 237 * @return sequencerLibrary 238 */ 239 protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() { 240 return this.sequencerLibrary; 241 } 242 243 /** 244 * Add the configuration for a sequencer, or update any existing one that represents the 245 * {@link SequencerConfig#equals(Object) same configuration} 246 * 247 * @param config the new configuration 248 * @return true if the sequencer was added, or false if there already was an existing and 249 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration 250 * @throws IllegalArgumentException if <code>config</code> is null 251 * @see #updateSequencer(SequencerConfig) 252 * @see #removeSequencer(SequencerConfig) 253 */ 254 public boolean addSequencer( SequencerConfig config ) { 255 return this.sequencerLibrary.add(config); 256 } 257 258 /** 259 * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching 260 * configuration}. 261 * 262 * @param config the updated (or new) configuration 263 * @return true if the sequencer was updated, or false if there already was an existing and 264 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration 265 * @throws IllegalArgumentException if <code>config</code> is null 266 * @see #addSequencer(SequencerConfig) 267 * @see #removeSequencer(SequencerConfig) 268 */ 269 public boolean updateSequencer( SequencerConfig config ) { 270 return this.sequencerLibrary.update(config); 271 } 272 273 /** 274 * Remove the configuration for a sequencer. 275 * 276 * @param config the configuration to be removed 277 * @return true if the sequencer was removed, or false if there was no existing sequencer 278 * @throws IllegalArgumentException if <code>config</code> is null 279 * @see #addSequencer(SequencerConfig) 280 * @see #updateSequencer(SequencerConfig) 281 */ 282 public boolean removeSequencer( SequencerConfig config ) { 283 return this.sequencerLibrary.remove(config); 284 } 285 286 /** 287 * @return executionContext 288 */ 289 public JcrExecutionContext getExecutionContext() { 290 return this.executionContext; 291 } 292 293 /** 294 * @param executionContext Sets executionContext to the specified value. 295 */ 296 public void setExecutionContext( JcrExecutionContext executionContext ) { 297 CheckArg.isNotNull(executionContext, "execution context"); 298 if (this.getAdministrator().isStarted()) { 299 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); 300 } 301 this.executionContext = executionContext; 302 this.sequencerLibrary.setClassLoaderFactory(executionContext); 303 } 304 305 /** 306 * Get the executor service used to run the sequencers. 307 * 308 * @return the executor service 309 * @see #setExecutorService(ExecutorService) 310 */ 311 public ExecutorService getExecutorService() { 312 return this.executorService; 313 } 314 315 /** 316 * Set the executor service that should be used by this system. By default, the system is set up with a 317 * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}. 318 * 319 * @param executorService the executor service 320 * @see #getExecutorService() 321 * @see Executors#newCachedThreadPool() 322 * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory) 323 * @see Executors#newFixedThreadPool(int) 324 * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory) 325 * @see Executors#newScheduledThreadPool(int) 326 * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory) 327 * @see Executors#newSingleThreadExecutor() 328 * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory) 329 * @see Executors#newSingleThreadScheduledExecutor() 330 * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory) 331 */ 332 public void setExecutorService( ExecutorService executorService ) { 333 CheckArg.isNotNull(executorService, "executor service"); 334 if (this.getAdministrator().isStarted()) { 335 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); 336 } 337 this.executorService = executorService; 338 } 339 340 /** 341 * Override this method to creates a different kind of default executor service. This method is called when the system is 342 * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}. 343 * <p> 344 * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}. 345 * </p> 346 * 347 * @return the executor service 348 */ 349 protected ExecutorService createDefaultExecutorService() { 350 return Executors.newSingleThreadExecutor(); 351 } 352 353 protected void startService() { 354 if (this.getExecutionContext() == null) { 355 throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text()); 356 } 357 if (this.executorService == null) { 358 this.executorService = createDefaultExecutorService(); 359 } 360 assert this.executorService != null; 361 assert this.sequencerSelector != null; 362 assert this.nodeFilter != null; 363 assert this.sequencerLibrary != null; 364 } 365 366 protected void shutdownService() { 367 if (this.executorService != null) { 368 this.executorService.shutdown(); 369 } 370 } 371 372 protected boolean isServiceTerminated() { 373 if (this.executorService != null) { 374 return this.executorService.isTerminated(); 375 } 376 return true; 377 } 378 379 protected boolean doAwaitTermination( long timeout, 380 TimeUnit unit ) throws InterruptedException { 381 if (this.executorService == null || this.executorService.isTerminated()) return true; 382 return this.executorService.awaitTermination(timeout, unit); 383 } 384 385 /** 386 * Get the sequencing selector used by this system. 387 * 388 * @return the sequencing selector 389 */ 390 public Selector getSequencerSelector() { 391 return this.sequencerSelector; 392 } 393 394 /** 395 * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used. 396 * 397 * @param sequencerSelector the selector 398 */ 399 public void setSequencerSelector( Selector sequencerSelector ) { 400 this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR; 401 } 402 403 /** 404 * Get the node filter used by this system. 405 * 406 * @return the node filter 407 */ 408 public NodeFilter getNodeFilter() { 409 return this.nodeFilter; 410 } 411 412 /** 413 * Set the filter that checks which nodes are to be sequenced, or null if the {@link #DEFAULT_NODE_FILTER default node filter} 414 * should be used. 415 * 416 * @param nodeFilter the new node filter 417 */ 418 public void setNodeFilter( NodeFilter nodeFilter ) { 419 this.nodeFilter = nodeFilter != null ? nodeFilter : DEFAULT_NODE_FILTER; 420 } 421 422 /** 423 * {@inheritDoc} 424 */ 425 public void onNodeChanges( NodeChanges changes ) { 426 NodeFilter filter = this.getNodeFilter(); 427 for (final NodeChange changedNode : changes) { 428 // Only care about new nodes or nodes that have new/changed properies ... 429 if (filter.accept(changedNode)) { 430 try { 431 this.executorService.execute(new Runnable() { 432 433 public void run() { 434 processChangedNode(changedNode); 435 } 436 }); 437 } catch (RejectedExecutionException e) { 438 // The executor service has been shut down, so do nothing with this set of changes 439 } 440 } 441 } 442 } 443 444 /** 445 * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service} 446 * when it performs it's work on the enqueued {@link NodeChange NodeChange runnable objects}. 447 * 448 * @param changedNode the node to be processed. 449 */ 450 protected void processChangedNode( NodeChange changedNode ) { 451 final JcrExecutionContext context = this.getExecutionContext(); 452 final Logger logger = context.getLogger(getClass()); 453 assert logger != null; 454 try { 455 final String repositoryWorkspaceName = changedNode.getRepositoryWorkspaceName(); 456 Session session = null; 457 try { 458 // Figure out which sequencers accept this path, 459 // and track which output nodes should be passed to each sequencer... 460 final String nodePath = changedNode.getAbsolutePath(); 461 Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>(); 462 List<Sequencer> allSequencers = this.sequencerLibrary.getInstances(); 463 List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size()); 464 for (Sequencer sequencer : allSequencers) { 465 final SequencerConfig config = sequencer.getConfiguration(); 466 for (SequencerPathExpression pathExpression : config.getPathExpressions()) { 467 for (String propertyName : changedNode.getModifiedProperties()) { 468 String path = nodePath + "/@" + propertyName; 469 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path); 470 if (matcher.matches()) { 471 // String selectedPath = matcher.getSelectedPath(); 472 RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(), 473 repositoryWorkspaceName); 474 SequencerCall call = new SequencerCall(sequencer, propertyName); 475 // Record the output path ... 476 Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call); 477 if (outputPaths == null) { 478 outputPaths = new HashSet<RepositoryNodePath>(); 479 sequencerCalls.put(call, outputPaths); 480 } 481 outputPaths.add(outputPath); 482 sequencers.add(sequencer); 483 break; 484 } 485 } 486 } 487 } 488 489 Node node = null; 490 if (!sequencers.isEmpty()) { 491 // Create a session that we'll use for all sequencing ... 492 session = context.getSessionFactory().createSession(repositoryWorkspaceName); 493 494 // Find the changed node ... 495 String relPath = changedNode.getAbsolutePath().replaceAll("^/+", ""); 496 node = session.getRootNode().getNode(relPath); 497 498 // Figure out which sequencers should run ... 499 sequencers = this.sequencerSelector.selectSequencers(sequencers, node, changedNode); 500 } 501 if (sequencers.isEmpty()) { 502 this.statistics.recordNodeSkipped(); 503 if (logger.isDebugEnabled()) { 504 logger.trace("Skipping '{0}': no sequencers matched this condition", changedNode); 505 } 506 } else { 507 // Run each of those sequencers ... 508 for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) { 509 final SequencerCall sequencerCall = entry.getKey(); 510 final Set<RepositoryNodePath> outputPaths = entry.getValue(); 511 final Sequencer sequencer = sequencerCall.getSequencer(); 512 final String sequencerName = sequencer.getConfiguration().getName(); 513 final String propertyName = sequencerCall.getSequencedPropertyName(); 514 515 // Get the paths to the nodes where the sequencer should write it's output ... 516 assert outputPaths != null && outputPaths.size() != 0; 517 518 // Create a new execution context for each sequencer 519 final SimpleProblems problems = new SimpleProblems(); 520 JcrExecutionContext sequencerContext = context.clone(); 521 try { 522 sequencer.execute(node, propertyName, changedNode, outputPaths, sequencerContext, problems); 523 } catch (RepositoryException e) { 524 logger.error(e, RepositoryI18n.errorInRepositoryWhileSequencingNode, sequencerName, changedNode); 525 } catch (SequencerException e) { 526 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, changedNode); 527 } finally { 528 try { 529 // Save the changes made by each sequencer ... 530 if (session != null) session.save(); 531 } finally { 532 // And always close the context. 533 // This closes all sessions that may have been created by the sequencer. 534 sequencerContext.close(); 535 } 536 } 537 } 538 this.statistics.recordNodeSequenced(); 539 } 540 } finally { 541 if (session != null) session.logout(); 542 } 543 } catch (RepositoryException e) { 544 logger.error(e, RepositoryI18n.errorInRepositoryWhileFindingSequencersToRunAgainstNode, changedNode); 545 } catch (Throwable e) { 546 logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, changedNode); 547 } 548 } 549 550 /** 551 * The statistics for the system. Each sequencing system has an instance of this class that is updated. 552 * 553 * @author Randall Hauch 554 */ 555 @ThreadSafe 556 public class Statistics { 557 558 private final AtomicLong numberOfNodesSequenced = new AtomicLong(0); 559 private final AtomicLong numberOfNodesSkipped = new AtomicLong(0); 560 private final AtomicLong startTime; 561 562 protected Statistics() { 563 startTime = new AtomicLong(System.currentTimeMillis()); 564 } 565 566 public Statistics reset() { 567 this.startTime.set(System.currentTimeMillis()); 568 this.numberOfNodesSequenced.set(0); 569 this.numberOfNodesSkipped.set(0); 570 return this; 571 } 572 573 /** 574 * @return the system time when the statistics were started 575 */ 576 public long getStartTime() { 577 return this.startTime.get(); 578 } 579 580 /** 581 * @return the number of nodes that were sequenced 582 */ 583 public long getNumberOfNodesSequenced() { 584 return this.numberOfNodesSequenced.get(); 585 } 586 587 /** 588 * @return the number of nodes that were skipped because no sequencers applied 589 */ 590 public long getNumberOfNodesSkipped() { 591 return this.numberOfNodesSkipped.get(); 592 } 593 594 protected void recordNodeSequenced() { 595 this.numberOfNodesSequenced.incrementAndGet(); 596 } 597 598 protected void recordNodeSkipped() { 599 this.numberOfNodesSkipped.incrementAndGet(); 600 } 601 } 602 603 @Immutable 604 protected class SequencerCall { 605 606 private final Sequencer sequencer; 607 private final String sequencerName; 608 private final String sequencedPropertyName; 609 private final int hc; 610 611 protected SequencerCall( Sequencer sequencer, 612 String sequencedPropertyName ) { 613 this.sequencer = sequencer; 614 this.sequencerName = sequencer.getConfiguration().getName(); 615 this.sequencedPropertyName = sequencedPropertyName; 616 this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName); 617 } 618 619 /** 620 * @return sequencer 621 */ 622 public Sequencer getSequencer() { 623 return this.sequencer; 624 } 625 626 /** 627 * @return sequencedPropertyName 628 */ 629 public String getSequencedPropertyName() { 630 return this.sequencedPropertyName; 631 } 632 633 /** 634 * {@inheritDoc} 635 */ 636 @Override 637 public int hashCode() { 638 return this.hc; 639 } 640 641 /** 642 * {@inheritDoc} 643 */ 644 @Override 645 public boolean equals( Object obj ) { 646 if (obj == this) return true; 647 if (obj instanceof SequencerCall) { 648 SequencerCall that = (SequencerCall)obj; 649 if (!this.sequencerName.equals(that.sequencerName)) return false; 650 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false; 651 return true; 652 } 653 return false; 654 } 655 } 656 }