001    /*
002     * JBoss DNA (http://www.jboss.org/dna)
003     * See the COPYRIGHT.txt file distributed with this work for information
004     * regarding copyright ownership.  Some portions may be licensed
005     * to Red Hat, Inc. under one or more contributor license agreements.
006     * See the AUTHORS.txt file in the distribution for a full listing of 
007     * individual contributors. 
008     *
009     * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010     * is licensed to you under the terms of the GNU Lesser General Public License as
011     * published by the Free Software Foundation; either version 2.1 of
012     * the License, or (at your option) any later version.
013     *
014     * JBoss DNA is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017     * Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public
020     * License along with this software; if not, write to the Free
021     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023     */
024    package org.jboss.dna.repository.sequencer;
025    
026    import java.util.ArrayList;
027    import java.util.HashMap;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Executors;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicLong;
037    import javax.jcr.Node;
038    import javax.jcr.Repository;
039    import javax.jcr.RepositoryException;
040    import javax.jcr.Session;
041    import javax.jcr.observation.Event;
042    import net.jcip.annotations.Immutable;
043    import net.jcip.annotations.ThreadSafe;
044    import org.jboss.dna.common.collection.SimpleProblems;
045    import org.jboss.dna.common.component.ClassLoaderFactory;
046    import org.jboss.dna.common.component.ComponentLibrary;
047    import org.jboss.dna.common.component.StandardClassLoaderFactory;
048    import org.jboss.dna.common.util.CheckArg;
049    import org.jboss.dna.common.util.HashCode;
050    import org.jboss.dna.common.util.Logger;
051    import org.jboss.dna.repository.RepositoryI18n;
052    import org.jboss.dna.repository.observation.NodeChange;
053    import org.jboss.dna.repository.observation.NodeChangeListener;
054    import org.jboss.dna.repository.observation.NodeChanges;
055    import org.jboss.dna.repository.service.AbstractServiceAdministrator;
056    import org.jboss.dna.repository.service.AdministeredService;
057    import org.jboss.dna.repository.service.ServiceAdministrator;
058    import org.jboss.dna.repository.util.JcrExecutionContext;
059    import org.jboss.dna.repository.util.RepositoryNodePath;
060    
061    /**
062     * A sequencing system is used to monitor changes in the content of {@link Repository JCR repositories} and to sequence the
063     * content to extract or to generate structured information.
064     * 
065     * @author Randall Hauch
066     * @author John Verhaeg
067     */
068    public class SequencingService implements AdministeredService, NodeChangeListener {
069    
070        /**
071         * Interface used to select the set of {@link Sequencer} instances that should be run.
072         * 
073         * @author Randall Hauch
074         */
075        public static interface Selector {
076    
077            /**
078             * Select the sequencers that should be used to sequence the supplied node.
079             * 
080             * @param sequencers the list of all sequencers available at the moment; never null
081             * @param node the node to be sequenced; never null
082             * @param nodeChange the set of node changes; never null
083             * @return the list of sequencers that should be used; may not be null
084             */
085            List<Sequencer> selectSequencers( List<Sequencer> sequencers,
086                                              Node node,
087                                              NodeChange nodeChange );
088        }
089    
090        /**
091         * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or
092         * logger) supplied.
093         * 
094         * @author Randall Hauch
095         */
096        protected static class DefaultSelector implements Selector {
097    
098            public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
099                                                     Node node,
100                                                     NodeChange nodeChange ) {
101                return sequencers;
102            }
103        }
104    
105        /**
106         * Interface used to determine whether a {@link NodeChange} should be processed.
107         * 
108         * @author Randall Hauch
109         */
110        public static interface NodeFilter {
111    
112            /**
113             * Determine whether the node represented by the supplied change should be submitted for sequencing.
114             * 
115             * @param nodeChange the node change event
116             * @return true if the node should be submitted for sequencing, or false if the change should be ignored
117             */
118            boolean accept( NodeChange nodeChange );
119        }
120    
121        /**
122         * The default filter implementation, which accepts only new nodes or nodes that have new or changed properties.
123         * 
124         * @author Randall Hauch
125         */
126        protected static class DefaultNodeFilter implements NodeFilter {
127    
128            public boolean accept( NodeChange nodeChange ) {
129                // Only care about new nodes or nodes that have new/changed properies ...
130                return nodeChange.includesEventTypes(Event.NODE_ADDED, Event.PROPERTY_ADDED, Event.PROPERTY_CHANGED);
131            }
132        }
133    
134        /**
135         * The default {@link Selector} that considers every {@link Sequencer} to be used for every node.
136         * 
137         * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector)
138         */
139        public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
140        /**
141         * The default {@link NodeFilter} that accepts new nodes or nodes that have new/changed properties.
142         * 
143         * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector)
144         */
145        public static final NodeFilter DEFAULT_NODE_FILTER = new DefaultNodeFilter();
146    
147        /**
148         * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class
149         * loader} (if not null) or component library's class loader.
150         */
151        protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
152                                                                                                               SequencingService.class.getClassLoader());
153    
154        /**
155         * The administrative component for this service.
156         * 
157         * @author Randall Hauch
158         */
159        protected class Administrator extends AbstractServiceAdministrator {
160    
161            protected Administrator() {
162                super(RepositoryI18n.sequencingServiceName, State.PAUSED);
163            }
164    
165            /**
166             * {@inheritDoc}
167             */
168            @Override
169            protected void doStart( State fromState ) {
170                super.doStart(fromState);
171                startService();
172            }
173    
174            /**
175             * {@inheritDoc}
176             */
177            @Override
178            protected void doShutdown( State fromState ) {
179                super.doShutdown(fromState);
180                shutdownService();
181            }
182    
183            /**
184             * {@inheritDoc}
185             */
186            @Override
187            protected boolean doCheckIsTerminated() {
188                return isServiceTerminated();
189            }
190    
191            /**
192             * {@inheritDoc}
193             */
194            public boolean awaitTermination( long timeout,
195                                             TimeUnit unit ) throws InterruptedException {
196                return doAwaitTermination(timeout, unit);
197            }
198    
199        }
200    
201        private JcrExecutionContext executionContext;
202        private SequencerLibrary sequencerLibrary = new SequencerLibrary();
203        private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
204        private NodeFilter nodeFilter = DEFAULT_NODE_FILTER;
205        private ExecutorService executorService;
206        private final Statistics statistics = new Statistics();
207        private final Administrator administrator = new Administrator();
208    
209        /**
210         * Create a new sequencing system, configured with no sequencers and not monitoring any workspaces. Upon construction, the
211         * system is {@link ServiceAdministrator#isPaused() paused} and must be configured and then
212         * {@link ServiceAdministrator#start() started}.
213         */
214        public SequencingService() {
215            this.sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
216        }
217    
218        /**
219         * Return the administrative component for this service.
220         * 
221         * @return the administrative component; never null
222         */
223        public ServiceAdministrator getAdministrator() {
224            return this.administrator;
225        }
226    
227        /**
228         * Get the statistics for this system.
229         * 
230         * @return statistics
231         */
232        public Statistics getStatistics() {
233            return this.statistics;
234        }
235    
236        /**
237         * @return sequencerLibrary
238         */
239        protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
240            return this.sequencerLibrary;
241        }
242    
243        /**
244         * Add the configuration for a sequencer, or update any existing one that represents the
245         * {@link SequencerConfig#equals(Object) same configuration}
246         * 
247         * @param config the new configuration
248         * @return true if the sequencer was added, or false if there already was an existing and
249         *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
250         * @throws IllegalArgumentException if <code>config</code> is null
251         * @see #updateSequencer(SequencerConfig)
252         * @see #removeSequencer(SequencerConfig)
253         */
254        public boolean addSequencer( SequencerConfig config ) {
255            return this.sequencerLibrary.add(config);
256        }
257    
258        /**
259         * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching
260         * configuration}.
261         * 
262         * @param config the updated (or new) configuration
263         * @return true if the sequencer was updated, or false if there already was an existing and
264         *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
265         * @throws IllegalArgumentException if <code>config</code> is null
266         * @see #addSequencer(SequencerConfig)
267         * @see #removeSequencer(SequencerConfig)
268         */
269        public boolean updateSequencer( SequencerConfig config ) {
270            return this.sequencerLibrary.update(config);
271        }
272    
273        /**
274         * Remove the configuration for a sequencer.
275         * 
276         * @param config the configuration to be removed
277         * @return true if the sequencer was removed, or false if there was no existing sequencer
278         * @throws IllegalArgumentException if <code>config</code> is null
279         * @see #addSequencer(SequencerConfig)
280         * @see #updateSequencer(SequencerConfig)
281         */
282        public boolean removeSequencer( SequencerConfig config ) {
283            return this.sequencerLibrary.remove(config);
284        }
285    
286        /**
287         * @return executionContext
288         */
289        public JcrExecutionContext getExecutionContext() {
290            return this.executionContext;
291        }
292    
293        /**
294         * @param executionContext Sets executionContext to the specified value.
295         */
296        public void setExecutionContext( JcrExecutionContext executionContext ) {
297            CheckArg.isNotNull(executionContext, "execution context");
298            if (this.getAdministrator().isStarted()) {
299                throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
300            }
301            this.executionContext = executionContext;
302            this.sequencerLibrary.setClassLoaderFactory(executionContext);
303        }
304    
305        /**
306         * Get the executor service used to run the sequencers.
307         * 
308         * @return the executor service
309         * @see #setExecutorService(ExecutorService)
310         */
311        public ExecutorService getExecutorService() {
312            return this.executorService;
313        }
314    
315        /**
316         * Set the executor service that should be used by this system. By default, the system is set up with a
317         * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}.
318         * 
319         * @param executorService the executor service
320         * @see #getExecutorService()
321         * @see Executors#newCachedThreadPool()
322         * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
323         * @see Executors#newFixedThreadPool(int)
324         * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
325         * @see Executors#newScheduledThreadPool(int)
326         * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)
327         * @see Executors#newSingleThreadExecutor()
328         * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
329         * @see Executors#newSingleThreadScheduledExecutor()
330         * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory)
331         */
332        public void setExecutorService( ExecutorService executorService ) {
333            CheckArg.isNotNull(executorService, "executor service");
334            if (this.getAdministrator().isStarted()) {
335                throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
336            }
337            this.executorService = executorService;
338        }
339    
340        /**
341         * Override this method to creates a different kind of default executor service. This method is called when the system is
342         * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}.
343         * <p>
344         * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}.
345         * </p>
346         * 
347         * @return the executor service
348         */
349        protected ExecutorService createDefaultExecutorService() {
350            return Executors.newSingleThreadExecutor();
351        }
352    
353        protected void startService() {
354            if (this.getExecutionContext() == null) {
355                throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
356            }
357            if (this.executorService == null) {
358                this.executorService = createDefaultExecutorService();
359            }
360            assert this.executorService != null;
361            assert this.sequencerSelector != null;
362            assert this.nodeFilter != null;
363            assert this.sequencerLibrary != null;
364        }
365    
366        protected void shutdownService() {
367            if (this.executorService != null) {
368                this.executorService.shutdown();
369            }
370        }
371    
372        protected boolean isServiceTerminated() {
373            if (this.executorService != null) {
374                return this.executorService.isTerminated();
375            }
376            return true;
377        }
378    
379        protected boolean doAwaitTermination( long timeout,
380                                              TimeUnit unit ) throws InterruptedException {
381            if (this.executorService == null || this.executorService.isTerminated()) return true;
382            return this.executorService.awaitTermination(timeout, unit);
383        }
384    
385        /**
386         * Get the sequencing selector used by this system.
387         * 
388         * @return the sequencing selector
389         */
390        public Selector getSequencerSelector() {
391            return this.sequencerSelector;
392        }
393    
394        /**
395         * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used.
396         * 
397         * @param sequencerSelector the selector
398         */
399        public void setSequencerSelector( Selector sequencerSelector ) {
400            this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
401        }
402    
403        /**
404         * Get the node filter used by this system.
405         * 
406         * @return the node filter
407         */
408        public NodeFilter getNodeFilter() {
409            return this.nodeFilter;
410        }
411    
412        /**
413         * Set the filter that checks which nodes are to be sequenced, or null if the {@link #DEFAULT_NODE_FILTER default node filter}
414         * should be used.
415         * 
416         * @param nodeFilter the new node filter
417         */
418        public void setNodeFilter( NodeFilter nodeFilter ) {
419            this.nodeFilter = nodeFilter != null ? nodeFilter : DEFAULT_NODE_FILTER;
420        }
421    
422        /**
423         * {@inheritDoc}
424         */
425        public void onNodeChanges( NodeChanges changes ) {
426            NodeFilter filter = this.getNodeFilter();
427            for (final NodeChange changedNode : changes) {
428                // Only care about new nodes or nodes that have new/changed properies ...
429                if (filter.accept(changedNode)) {
430                    try {
431                        this.executorService.execute(new Runnable() {
432    
433                            public void run() {
434                                processChangedNode(changedNode);
435                            }
436                        });
437                    } catch (RejectedExecutionException e) {
438                        // The executor service has been shut down, so do nothing with this set of changes
439                    }
440                }
441            }
442        }
443    
444        /**
445         * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service}
446         * when it performs it's work on the enqueued {@link NodeChange NodeChange runnable objects}.
447         * 
448         * @param changedNode the node to be processed.
449         */
450        protected void processChangedNode( NodeChange changedNode ) {
451            final JcrExecutionContext context = this.getExecutionContext();
452            final Logger logger = context.getLogger(getClass());
453            assert logger != null;
454            try {
455                final String repositoryWorkspaceName = changedNode.getRepositoryWorkspaceName();
456                Session session = null;
457                try {
458                    // Figure out which sequencers accept this path,
459                    // and track which output nodes should be passed to each sequencer...
460                    final String nodePath = changedNode.getAbsolutePath();
461                    Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
462                    List<Sequencer> allSequencers = this.sequencerLibrary.getInstances();
463                    List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size());
464                    for (Sequencer sequencer : allSequencers) {
465                        final SequencerConfig config = sequencer.getConfiguration();
466                        for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
467                            for (String propertyName : changedNode.getModifiedProperties()) {
468                                String path = nodePath + "/@" + propertyName;
469                                SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
470                                if (matcher.matches()) {
471                                    // String selectedPath = matcher.getSelectedPath();
472                                    RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
473                                                                                             repositoryWorkspaceName);
474                                    SequencerCall call = new SequencerCall(sequencer, propertyName);
475                                    // Record the output path ...
476                                    Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
477                                    if (outputPaths == null) {
478                                        outputPaths = new HashSet<RepositoryNodePath>();
479                                        sequencerCalls.put(call, outputPaths);
480                                    }
481                                    outputPaths.add(outputPath);
482                                    sequencers.add(sequencer);
483                                    break;
484                                }
485                            }
486                        }
487                    }
488    
489                    Node node = null;
490                    if (!sequencers.isEmpty()) {
491                        // Create a session that we'll use for all sequencing ...
492                        session = context.getSessionFactory().createSession(repositoryWorkspaceName);
493    
494                        // Find the changed node ...
495                        String relPath = changedNode.getAbsolutePath().replaceAll("^/+", "");
496                        node = session.getRootNode().getNode(relPath);
497    
498                        // Figure out which sequencers should run ...
499                        sequencers = this.sequencerSelector.selectSequencers(sequencers, node, changedNode);
500                    }
501                    if (sequencers.isEmpty()) {
502                        this.statistics.recordNodeSkipped();
503                        if (logger.isDebugEnabled()) {
504                            logger.trace("Skipping '{0}': no sequencers matched this condition", changedNode);
505                        }
506                    } else {
507                        // Run each of those sequencers ...
508                        for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
509                            final SequencerCall sequencerCall = entry.getKey();
510                            final Set<RepositoryNodePath> outputPaths = entry.getValue();
511                            final Sequencer sequencer = sequencerCall.getSequencer();
512                            final String sequencerName = sequencer.getConfiguration().getName();
513                            final String propertyName = sequencerCall.getSequencedPropertyName();
514    
515                            // Get the paths to the nodes where the sequencer should write it's output ...
516                            assert outputPaths != null && outputPaths.size() != 0;
517    
518                            // Create a new execution context for each sequencer
519                            final SimpleProblems problems = new SimpleProblems();
520                            JcrExecutionContext sequencerContext = context.clone();
521                            try {
522                                sequencer.execute(node, propertyName, changedNode, outputPaths, sequencerContext, problems);
523                            } catch (RepositoryException e) {
524                                logger.error(e, RepositoryI18n.errorInRepositoryWhileSequencingNode, sequencerName, changedNode);
525                            } catch (SequencerException e) {
526                                logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, changedNode);
527                            } finally {
528                                try {
529                                    // Save the changes made by each sequencer ...
530                                    if (session != null) session.save();
531                                } finally {
532                                    // And always close the context.
533                                    // This closes all sessions that may have been created by the sequencer.
534                                    sequencerContext.close();
535                                }
536                            }
537                        }
538                        this.statistics.recordNodeSequenced();
539                    }
540                } finally {
541                    if (session != null) session.logout();
542                }
543            } catch (RepositoryException e) {
544                logger.error(e, RepositoryI18n.errorInRepositoryWhileFindingSequencersToRunAgainstNode, changedNode);
545            } catch (Throwable e) {
546                logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, changedNode);
547            }
548        }
549    
550        /**
551         * The statistics for the system. Each sequencing system has an instance of this class that is updated.
552         * 
553         * @author Randall Hauch
554         */
555        @ThreadSafe
556        public class Statistics {
557    
558            private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
559            private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
560            private final AtomicLong startTime;
561    
562            protected Statistics() {
563                startTime = new AtomicLong(System.currentTimeMillis());
564            }
565    
566            public Statistics reset() {
567                this.startTime.set(System.currentTimeMillis());
568                this.numberOfNodesSequenced.set(0);
569                this.numberOfNodesSkipped.set(0);
570                return this;
571            }
572    
573            /**
574             * @return the system time when the statistics were started
575             */
576            public long getStartTime() {
577                return this.startTime.get();
578            }
579    
580            /**
581             * @return the number of nodes that were sequenced
582             */
583            public long getNumberOfNodesSequenced() {
584                return this.numberOfNodesSequenced.get();
585            }
586    
587            /**
588             * @return the number of nodes that were skipped because no sequencers applied
589             */
590            public long getNumberOfNodesSkipped() {
591                return this.numberOfNodesSkipped.get();
592            }
593    
594            protected void recordNodeSequenced() {
595                this.numberOfNodesSequenced.incrementAndGet();
596            }
597    
598            protected void recordNodeSkipped() {
599                this.numberOfNodesSkipped.incrementAndGet();
600            }
601        }
602    
603        @Immutable
604        protected class SequencerCall {
605    
606            private final Sequencer sequencer;
607            private final String sequencerName;
608            private final String sequencedPropertyName;
609            private final int hc;
610    
611            protected SequencerCall( Sequencer sequencer,
612                                     String sequencedPropertyName ) {
613                this.sequencer = sequencer;
614                this.sequencerName = sequencer.getConfiguration().getName();
615                this.sequencedPropertyName = sequencedPropertyName;
616                this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
617            }
618    
619            /**
620             * @return sequencer
621             */
622            public Sequencer getSequencer() {
623                return this.sequencer;
624            }
625    
626            /**
627             * @return sequencedPropertyName
628             */
629            public String getSequencedPropertyName() {
630                return this.sequencedPropertyName;
631            }
632    
633            /**
634             * {@inheritDoc}
635             */
636            @Override
637            public int hashCode() {
638                return this.hc;
639            }
640    
641            /**
642             * {@inheritDoc}
643             */
644            @Override
645            public boolean equals( Object obj ) {
646                if (obj == this) return true;
647                if (obj instanceof SequencerCall) {
648                    SequencerCall that = (SequencerCall)obj;
649                    if (!this.sequencerName.equals(that.sequencerName)) return false;
650                    if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
651                    return true;
652                }
653                return false;
654            }
655        }
656    }