001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.repository.sequencer; 025 026 import java.util.ArrayList; 027 import java.util.HashMap; 028 import java.util.HashSet; 029 import java.util.List; 030 import java.util.Map; 031 import java.util.Set; 032 import java.util.concurrent.ExecutorService; 033 import java.util.concurrent.Executors; 034 import java.util.concurrent.RejectedExecutionException; 035 import java.util.concurrent.TimeUnit; 036 import java.util.concurrent.atomic.AtomicLong; 037 import net.jcip.annotations.Immutable; 038 import net.jcip.annotations.ThreadSafe; 039 import org.jboss.dna.common.collection.SimpleProblems; 040 import org.jboss.dna.common.component.ClassLoaderFactory; 041 import org.jboss.dna.common.component.ComponentLibrary; 042 import org.jboss.dna.common.component.StandardClassLoaderFactory; 043 import org.jboss.dna.common.util.CheckArg; 044 import org.jboss.dna.common.util.HashCode; 045 import org.jboss.dna.common.util.Logger; 046 import org.jboss.dna.graph.ExecutionContext; 047 import org.jboss.dna.graph.Graph; 048 import org.jboss.dna.graph.Node; 049 import org.jboss.dna.graph.connector.RepositorySource; 050 import org.jboss.dna.graph.observe.ChangeObserver; 051 import org.jboss.dna.graph.observe.NetChangeObserver; 052 import org.jboss.dna.graph.observe.NetChangeObserver.NetChange; 053 import org.jboss.dna.graph.property.Name; 054 import org.jboss.dna.graph.property.Path; 055 import org.jboss.dna.graph.property.Property; 056 import org.jboss.dna.repository.RepositoryI18n; 057 import org.jboss.dna.repository.RepositoryLibrary; 058 import org.jboss.dna.repository.service.AbstractServiceAdministrator; 059 import org.jboss.dna.repository.service.AdministeredService; 060 import org.jboss.dna.repository.service.ServiceAdministrator; 061 import org.jboss.dna.repository.util.RepositoryNodePath; 062 063 /** 064 * A sequencing system is used to monitor changes in the content of DNA repositories and to sequence the content to extract or to 065 * generate structured information. 066 * 067 * @author Randall Hauch 068 * @author John Verhaeg 069 */ 070 public class SequencingService implements AdministeredService { 071 072 /** 073 * Interface used to select the set of {@link Sequencer} instances that should be run. 074 * 075 * @author Randall Hauch 076 */ 077 public static interface Selector { 078 079 /** 080 * Select the sequencers that should be used to sequence the supplied node. 081 * 082 * @param sequencers the list of all sequencers available at the moment; never null 083 * @param node the node to be sequenced; never null 084 * @param nodeChange the set of node changes; never null 085 * @return the list of sequencers that should be used; may not be null 086 */ 087 List<Sequencer> selectSequencers( List<Sequencer> sequencers, 088 Node node, 089 NetChange nodeChange ); 090 } 091 092 /** 093 * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or 094 * logger) supplied. 095 * 096 * @author Randall Hauch 097 */ 098 protected static class DefaultSelector implements Selector { 099 100 public List<Sequencer> selectSequencers( List<Sequencer> sequencers, 101 Node node, 102 NetChange nodeChange ) { 103 return sequencers; 104 } 105 } 106 107 /** 108 * The default {@link Selector} that considers every {@link Sequencer} to be used for every node. 109 * 110 * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector) 111 */ 112 public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector(); 113 114 /** 115 * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class 116 * loader} (if not null) or component library's class loader. 117 */ 118 protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory( 119 SequencingService.class.getClassLoader()); 120 121 /** 122 * The administrative component for this service. 123 * 124 * @author Randall Hauch 125 */ 126 protected class Administrator extends AbstractServiceAdministrator { 127 128 protected Administrator() { 129 super(RepositoryI18n.sequencingServiceName, State.PAUSED); 130 } 131 132 /** 133 * {@inheritDoc} 134 */ 135 @Override 136 protected void doStart( State fromState ) { 137 super.doStart(fromState); 138 startService(); 139 } 140 141 /** 142 * {@inheritDoc} 143 */ 144 @Override 145 protected void doShutdown( State fromState ) { 146 super.doShutdown(fromState); 147 shutdownService(); 148 } 149 150 /** 151 * {@inheritDoc} 152 */ 153 @Override 154 protected boolean doCheckIsTerminated() { 155 return isServiceTerminated(); 156 } 157 158 /** 159 * {@inheritDoc} 160 */ 161 public boolean awaitTermination( long timeout, 162 TimeUnit unit ) throws InterruptedException { 163 return doAwaitTermination(timeout, unit); 164 } 165 166 } 167 168 private ExecutionContext executionContext; 169 private SequencerLibrary sequencerLibrary = new SequencerLibrary(); 170 private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR; 171 private ExecutorService executorService; 172 private RepositoryLibrary repositoryLibrary; 173 private ChangeObserver repositoryObserver; 174 private final Statistics statistics = new Statistics(); 175 private final Administrator administrator = new Administrator(); 176 177 /** 178 * Create a new sequencing system, configured with no sequencers and not monitoring any workspaces. Upon construction, the 179 * system is {@link ServiceAdministrator#isPaused() paused} and must be configured and then 180 * {@link ServiceAdministrator#start() started}. 181 */ 182 public SequencingService() { 183 this.sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY); 184 } 185 186 /** 187 * Return the administrative component for this service. 188 * 189 * @return the administrative component; never null 190 */ 191 public ServiceAdministrator getAdministrator() { 192 return this.administrator; 193 } 194 195 /** 196 * Get the statistics for this system. 197 * 198 * @return statistics 199 */ 200 public Statistics getStatistics() { 201 return this.statistics; 202 } 203 204 /** 205 * @return sequencerLibrary 206 */ 207 protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() { 208 return this.sequencerLibrary; 209 } 210 211 /** 212 * Add the configuration for a sequencer, or update any existing one that represents the 213 * {@link SequencerConfig#equals(Object) same configuration} 214 * 215 * @param config the new configuration 216 * @return true if the sequencer was added, or false if there already was an existing and 217 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration 218 * @throws IllegalArgumentException if <code>config</code> is null 219 * @see #updateSequencer(SequencerConfig) 220 * @see #removeSequencer(SequencerConfig) 221 */ 222 public boolean addSequencer( SequencerConfig config ) { 223 return this.sequencerLibrary.add(config); 224 } 225 226 /** 227 * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching 228 * configuration}. 229 * 230 * @param config the updated (or new) configuration 231 * @return true if the sequencer was updated, or false if there already was an existing and 232 * {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration 233 * @throws IllegalArgumentException if <code>config</code> is null 234 * @see #addSequencer(SequencerConfig) 235 * @see #removeSequencer(SequencerConfig) 236 */ 237 public boolean updateSequencer( SequencerConfig config ) { 238 return this.sequencerLibrary.update(config); 239 } 240 241 /** 242 * Remove the configuration for a sequencer. 243 * 244 * @param config the configuration to be removed 245 * @return true if the sequencer was removed, or false if there was no existing sequencer 246 * @throws IllegalArgumentException if <code>config</code> is null 247 * @see #addSequencer(SequencerConfig) 248 * @see #updateSequencer(SequencerConfig) 249 */ 250 public boolean removeSequencer( SequencerConfig config ) { 251 return this.sequencerLibrary.remove(config); 252 } 253 254 /** 255 * @return executionContext 256 */ 257 public ExecutionContext getExecutionContext() { 258 return this.executionContext; 259 } 260 261 /** 262 * @param executionContext Sets executionContext to the specified value. 263 */ 264 public void setExecutionContext( ExecutionContext executionContext ) { 265 CheckArg.isNotNull(executionContext, "execution context"); 266 if (this.getAdministrator().isStarted()) { 267 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); 268 } 269 this.executionContext = executionContext; 270 this.sequencerLibrary.setClassLoaderFactory(executionContext); 271 } 272 273 /** 274 * Get the repository library to be used for repository lookup 275 * 276 * @return the repository library 277 */ 278 public RepositoryLibrary getRepositoryLibrary() { 279 return this.repositoryLibrary; 280 } 281 282 public void setRepositoryLibrary( RepositoryLibrary repositoryLibrary ) { 283 this.repositoryLibrary = repositoryLibrary; 284 } 285 286 /** 287 * Get the executor service used to run the sequencers. 288 * 289 * @return the executor service 290 * @see #setExecutorService(ExecutorService) 291 */ 292 public ExecutorService getExecutorService() { 293 return this.executorService; 294 } 295 296 /** 297 * Set the executor service that should be used by this system. By default, the system is set up with a 298 * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}. 299 * 300 * @param executorService the executor service 301 * @see #getExecutorService() 302 * @see Executors#newCachedThreadPool() 303 * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory) 304 * @see Executors#newFixedThreadPool(int) 305 * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory) 306 * @see Executors#newScheduledThreadPool(int) 307 * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory) 308 * @see Executors#newSingleThreadExecutor() 309 * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory) 310 * @see Executors#newSingleThreadScheduledExecutor() 311 * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory) 312 */ 313 public void setExecutorService( ExecutorService executorService ) { 314 CheckArg.isNotNull(executorService, "executor service"); 315 if (this.getAdministrator().isStarted()) { 316 throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text()); 317 } 318 this.executorService = executorService; 319 } 320 321 /** 322 * Override this method to creates a different kind of default executor service. This method is called when the system is 323 * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}. 324 * <p> 325 * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}. 326 * </p> 327 * 328 * @return the executor service 329 */ 330 protected ExecutorService createDefaultExecutorService() { 331 return Executors.newSingleThreadExecutor(); 332 } 333 334 protected void startService() { 335 if (this.getExecutionContext() == null) { 336 throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text()); 337 } 338 if (this.executorService == null) { 339 this.executorService = createDefaultExecutorService(); 340 } 341 assert this.executorService != null; 342 assert this.sequencerSelector != null; 343 assert this.sequencerLibrary != null; 344 assert this.repositoryLibrary != null; 345 this.repositoryObserver = new RepositoryObserver(); 346 // Register the observer ... 347 this.repositoryLibrary.register(this.repositoryObserver); 348 } 349 350 protected void shutdownService() { 351 // Unregister our observer ... 352 if (this.repositoryObserver != null) this.repositoryObserver.unregister(); 353 // And shut down the executor service .. 354 if (this.executorService != null) { 355 this.executorService.shutdown(); 356 } 357 } 358 359 protected boolean isServiceTerminated() { 360 if (this.executorService != null) { 361 return this.executorService.isTerminated(); 362 } 363 return true; 364 } 365 366 protected boolean doAwaitTermination( long timeout, 367 TimeUnit unit ) throws InterruptedException { 368 if (this.executorService == null || this.executorService.isTerminated()) return true; 369 return this.executorService.awaitTermination(timeout, unit); 370 } 371 372 /** 373 * Get the sequencing selector used by this system. 374 * 375 * @return the sequencing selector 376 */ 377 public Selector getSequencerSelector() { 378 return this.sequencerSelector; 379 } 380 381 /** 382 * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used. 383 * 384 * @param sequencerSelector the selector 385 */ 386 public void setSequencerSelector( Selector sequencerSelector ) { 387 this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR; 388 } 389 390 /** 391 * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service} 392 * when it performs it's work on the enqueued {@link NetChange NetChange runnable objects}. 393 * 394 * @param change the change describing the node to be processed. 395 */ 396 protected void processChange( NetChange change ) { 397 final ExecutionContext context = this.getExecutionContext(); 398 final Logger logger = context.getLogger(getClass()); 399 assert logger != null; 400 401 try { 402 final String repositorySourceName = change.getRepositorySourceName(); 403 final String repositoryWorkspaceName = change.getRepositoryWorkspaceName(); 404 405 // Figure out which sequencers accept this path, 406 // and track which output nodes should be passed to each sequencer... 407 final Path nodePath = change.getPath(); 408 final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath); 409 Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>(); 410 List<Sequencer> allSequencers = this.sequencerLibrary.getInstances(); 411 List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size()); 412 for (Sequencer sequencer : allSequencers) { 413 final SequencerConfig config = sequencer.getConfiguration(); 414 for (SequencerPathExpression pathExpression : config.getPathExpressions()) { 415 for (Property property : change.getModifiedProperties()) { 416 Name propertyName = property.getName(); 417 String propertyNameStr = context.getValueFactories().getStringFactory().create(propertyName); 418 String path = nodePathStr + "/@" + propertyNameStr; 419 SequencerPathExpression.Matcher matcher = pathExpression.matcher(path); 420 if (matcher.matches()) { 421 // String selectedPath = matcher.getSelectedPath(); 422 RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(), 423 repositorySourceName, 424 repositoryWorkspaceName); 425 SequencerCall call = new SequencerCall(sequencer, propertyNameStr); 426 // Record the output path ... 427 Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call); 428 if (outputPaths == null) { 429 outputPaths = new HashSet<RepositoryNodePath>(); 430 sequencerCalls.put(call, outputPaths); 431 } 432 outputPaths.add(outputPath); 433 sequencers.add(sequencer); 434 break; 435 } 436 } 437 } 438 } 439 440 RepositorySource source = repositoryLibrary.getSource(repositorySourceName); 441 Graph graph = Graph.create(source, context); 442 Node node = null; 443 if (!sequencers.isEmpty()) { 444 445 // Find the changed node ... 446 node = graph.getNodeAt(nodePath); 447 448 // Figure out which sequencers should run ... 449 sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change); 450 } 451 if (sequencers.isEmpty()) { 452 this.statistics.recordNodeSkipped(); 453 if (logger.isDebugEnabled()) { 454 logger.trace("Skipping '{0}': no sequencers matched this condition", change); 455 } 456 } else { 457 // Run each of those sequencers ... 458 for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) { 459 460 final SequencerCall sequencerCall = entry.getKey(); 461 final Set<RepositoryNodePath> outputPaths = entry.getValue(); 462 final Sequencer sequencer = sequencerCall.getSequencer(); 463 final String sequencerName = sequencer.getConfiguration().getName(); 464 final String propertyName = sequencerCall.getSequencedPropertyName(); 465 466 // Get the paths to the nodes where the sequencer should write it's output ... 467 assert outputPaths != null && outputPaths.size() != 0; 468 469 // Create a new execution context for each sequencer 470 final SimpleProblems problems = new SimpleProblems(); 471 SequencerContext sequencerContext = new SequencerContext(context, graph); 472 try { 473 sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems); 474 sequencerContext.getDestination().submit(); 475 } catch (SequencerException e) { 476 logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change); 477 } 478 } 479 this.statistics.recordNodeSequenced(); 480 } 481 } catch (Throwable e) { 482 logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, change); 483 } 484 } 485 486 /** 487 * The statistics for the system. Each sequencing system has an instance of this class that is updated. 488 * 489 * @author Randall Hauch 490 */ 491 @ThreadSafe 492 public class Statistics { 493 494 private final AtomicLong numberOfNodesSequenced = new AtomicLong(0); 495 private final AtomicLong numberOfNodesSkipped = new AtomicLong(0); 496 private final AtomicLong startTime; 497 498 protected Statistics() { 499 startTime = new AtomicLong(System.currentTimeMillis()); 500 } 501 502 public Statistics reset() { 503 this.startTime.set(System.currentTimeMillis()); 504 this.numberOfNodesSequenced.set(0); 505 this.numberOfNodesSkipped.set(0); 506 return this; 507 } 508 509 /** 510 * @return the system time when the statistics were started 511 */ 512 public long getStartTime() { 513 return this.startTime.get(); 514 } 515 516 /** 517 * @return the number of nodes that were sequenced 518 */ 519 public long getNumberOfNodesSequenced() { 520 return this.numberOfNodesSequenced.get(); 521 } 522 523 /** 524 * @return the number of nodes that were skipped because no sequencers applied 525 */ 526 public long getNumberOfNodesSkipped() { 527 return this.numberOfNodesSkipped.get(); 528 } 529 530 protected void recordNodeSequenced() { 531 this.numberOfNodesSequenced.incrementAndGet(); 532 } 533 534 protected void recordNodeSkipped() { 535 this.numberOfNodesSkipped.incrementAndGet(); 536 } 537 } 538 539 @Immutable 540 protected class SequencerCall { 541 542 private final Sequencer sequencer; 543 private final String sequencerName; 544 private final String sequencedPropertyName; 545 private final int hc; 546 547 protected SequencerCall( Sequencer sequencer, 548 String sequencedPropertyName ) { 549 this.sequencer = sequencer; 550 this.sequencerName = sequencer.getConfiguration().getName(); 551 this.sequencedPropertyName = sequencedPropertyName; 552 this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName); 553 } 554 555 /** 556 * @return sequencer 557 */ 558 public Sequencer getSequencer() { 559 return this.sequencer; 560 } 561 562 /** 563 * @return sequencedPropertyName 564 */ 565 public String getSequencedPropertyName() { 566 return this.sequencedPropertyName; 567 } 568 569 /** 570 * {@inheritDoc} 571 */ 572 @Override 573 public int hashCode() { 574 return this.hc; 575 } 576 577 /** 578 * {@inheritDoc} 579 */ 580 @Override 581 public boolean equals( Object obj ) { 582 if (obj == this) return true; 583 if (obj instanceof SequencerCall) { 584 SequencerCall that = (SequencerCall)obj; 585 if (!this.sequencerName.equals(that.sequencerName)) return false; 586 if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false; 587 return true; 588 } 589 return false; 590 } 591 } 592 593 protected class RepositoryObserver extends NetChangeObserver { 594 protected RepositoryObserver() { 595 } 596 597 /** 598 * {@inheritDoc} 599 * 600 * @see org.jboss.dna.graph.observe.NetChangeObserver#notify(org.jboss.dna.graph.observe.NetChangeObserver.NetChange) 601 */ 602 @Override 603 protected void notify( final NetChange change ) { 604 // Only care about new nodes or nodes that have new/changed properies ... 605 if (change.includes(ChangeType.NODE_ADDED, ChangeType.PROPERTY_ADDED, ChangeType.PROPERTY_CHANGED)) { 606 try { 607 getExecutorService().execute(new Runnable() { 608 609 public void run() { 610 processChange(change); 611 } 612 }); 613 } catch (RejectedExecutionException e) { 614 // The executor service has been shut down, so do nothing with this set of changes 615 } 616 } 617 } 618 } 619 }