001    /*
002     * JBoss DNA (http://www.jboss.org/dna)
003     * See the COPYRIGHT.txt file distributed with this work for information
004     * regarding copyright ownership.  Some portions may be licensed
005     * to Red Hat, Inc. under one or more contributor license agreements.
006     * See the AUTHORS.txt file in the distribution for a full listing of 
007     * individual contributors. 
008     *
009     * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010     * is licensed to you under the terms of the GNU Lesser General Public License as
011     * published by the Free Software Foundation; either version 2.1 of
012     * the License, or (at your option) any later version.
013     *
014     * JBoss DNA is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017     * Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public
020     * License along with this software; if not, write to the Free
021     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023     */
024    package org.jboss.dna.repository.sequencer;
025    
026    import java.util.ArrayList;
027    import java.util.HashMap;
028    import java.util.HashSet;
029    import java.util.List;
030    import java.util.Map;
031    import java.util.Set;
032    import java.util.concurrent.ExecutorService;
033    import java.util.concurrent.Executors;
034    import java.util.concurrent.RejectedExecutionException;
035    import java.util.concurrent.TimeUnit;
036    import java.util.concurrent.atomic.AtomicLong;
037    import net.jcip.annotations.Immutable;
038    import net.jcip.annotations.ThreadSafe;
039    import org.jboss.dna.common.collection.SimpleProblems;
040    import org.jboss.dna.common.component.ClassLoaderFactory;
041    import org.jboss.dna.common.component.ComponentLibrary;
042    import org.jboss.dna.common.component.StandardClassLoaderFactory;
043    import org.jboss.dna.common.util.CheckArg;
044    import org.jboss.dna.common.util.HashCode;
045    import org.jboss.dna.common.util.Logger;
046    import org.jboss.dna.graph.ExecutionContext;
047    import org.jboss.dna.graph.Graph;
048    import org.jboss.dna.graph.Node;
049    import org.jboss.dna.graph.connector.RepositorySource;
050    import org.jboss.dna.graph.observe.ChangeObserver;
051    import org.jboss.dna.graph.observe.NetChangeObserver;
052    import org.jboss.dna.graph.observe.NetChangeObserver.NetChange;
053    import org.jboss.dna.graph.property.Name;
054    import org.jboss.dna.graph.property.Path;
055    import org.jboss.dna.graph.property.Property;
056    import org.jboss.dna.repository.RepositoryI18n;
057    import org.jboss.dna.repository.RepositoryLibrary;
058    import org.jboss.dna.repository.service.AbstractServiceAdministrator;
059    import org.jboss.dna.repository.service.AdministeredService;
060    import org.jboss.dna.repository.service.ServiceAdministrator;
061    import org.jboss.dna.repository.util.RepositoryNodePath;
062    
063    /**
064     * A sequencing system is used to monitor changes in the content of DNA repositories and to sequence the content to extract or to
065     * generate structured information.
066     * 
067     * @author Randall Hauch
068     * @author John Verhaeg
069     */
070    public class SequencingService implements AdministeredService {
071    
072        /**
073         * Interface used to select the set of {@link Sequencer} instances that should be run.
074         * 
075         * @author Randall Hauch
076         */
077        public static interface Selector {
078    
079            /**
080             * Select the sequencers that should be used to sequence the supplied node.
081             * 
082             * @param sequencers the list of all sequencers available at the moment; never null
083             * @param node the node to be sequenced; never null
084             * @param nodeChange the set of node changes; never null
085             * @return the list of sequencers that should be used; may not be null
086             */
087            List<Sequencer> selectSequencers( List<Sequencer> sequencers,
088                                              Node node,
089                                              NetChange nodeChange );
090        }
091    
092        /**
093         * The default {@link Selector} implementation that selects every sequencer every time it's called, regardless of the node (or
094         * logger) supplied.
095         * 
096         * @author Randall Hauch
097         */
098        protected static class DefaultSelector implements Selector {
099    
100            public List<Sequencer> selectSequencers( List<Sequencer> sequencers,
101                                                     Node node,
102                                                     NetChange nodeChange ) {
103                return sequencers;
104            }
105        }
106    
107        /**
108         * The default {@link Selector} that considers every {@link Sequencer} to be used for every node.
109         * 
110         * @see SequencingService#setSequencerSelector(org.jboss.dna.repository.sequencer.SequencingService.Selector)
111         */
112        public static final Selector DEFAULT_SEQUENCER_SELECTOR = new DefaultSelector();
113    
114        /**
115         * Class loader factory instance that always returns the {@link Thread#getContextClassLoader() current thread's context class
116         * loader} (if not null) or component library's class loader.
117         */
118        protected static final ClassLoaderFactory DEFAULT_CLASSLOADER_FACTORY = new StandardClassLoaderFactory(
119                                                                                                               SequencingService.class.getClassLoader());
120    
121        /**
122         * The administrative component for this service.
123         * 
124         * @author Randall Hauch
125         */
126        protected class Administrator extends AbstractServiceAdministrator {
127    
128            protected Administrator() {
129                super(RepositoryI18n.sequencingServiceName, State.PAUSED);
130            }
131    
132            /**
133             * {@inheritDoc}
134             */
135            @Override
136            protected void doStart( State fromState ) {
137                super.doStart(fromState);
138                startService();
139            }
140    
141            /**
142             * {@inheritDoc}
143             */
144            @Override
145            protected void doShutdown( State fromState ) {
146                super.doShutdown(fromState);
147                shutdownService();
148            }
149    
150            /**
151             * {@inheritDoc}
152             */
153            @Override
154            protected boolean doCheckIsTerminated() {
155                return isServiceTerminated();
156            }
157    
158            /**
159             * {@inheritDoc}
160             */
161            public boolean awaitTermination( long timeout,
162                                             TimeUnit unit ) throws InterruptedException {
163                return doAwaitTermination(timeout, unit);
164            }
165    
166        }
167    
    // Context supplying the logger and value factories used while sequencing; the service refuses to start while this is null
    private ExecutionContext executionContext;
    // Holds the sequencer configurations and the sequencer instances created from them
    private SequencerLibrary sequencerLibrary = new SequencerLibrary();
    // Decides which matching sequencers are run; never null (a null setter argument restores the default)
    private Selector sequencerSelector = DEFAULT_SEQUENCER_SELECTOR;
    // Runs the sequencing work; created on start via createDefaultExecutorService() if none was set
    private ExecutorService executorService;
    // Used to look up repository sources and to register the change observer
    private RepositoryLibrary repositoryLibrary;
    // Observer created and registered with the repository library when the service is started
    private ChangeObserver repositoryObserver;
    // Counters for sequenced and skipped nodes
    private final Statistics statistics = new Statistics();
    // Administrative component controlling this service's lifecycle; starts out paused
    private final Administrator administrator = new Administrator();
176    
177        /**
178         * Create a new sequencing system, configured with no sequencers and not monitoring any workspaces. Upon construction, the
179         * system is {@link ServiceAdministrator#isPaused() paused} and must be configured and then
180         * {@link ServiceAdministrator#start() started}.
181         */
182        public SequencingService() {
183            this.sequencerLibrary.setClassLoaderFactory(DEFAULT_CLASSLOADER_FACTORY);
184        }
185    
186        /**
187         * Return the administrative component for this service.
188         * 
189         * @return the administrative component; never null
190         */
191        public ServiceAdministrator getAdministrator() {
192            return this.administrator;
193        }
194    
195        /**
196         * Get the statistics for this system.
197         * 
198         * @return statistics
199         */
200        public Statistics getStatistics() {
201            return this.statistics;
202        }
203    
204        /**
205         * @return sequencerLibrary
206         */
207        protected ComponentLibrary<Sequencer, SequencerConfig> getSequencerLibrary() {
208            return this.sequencerLibrary;
209        }
210    
211        /**
212         * Add the configuration for a sequencer, or update any existing one that represents the
213         * {@link SequencerConfig#equals(Object) same configuration}
214         * 
215         * @param config the new configuration
216         * @return true if the sequencer was added, or false if there already was an existing and
217         *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
218         * @throws IllegalArgumentException if <code>config</code> is null
219         * @see #updateSequencer(SequencerConfig)
220         * @see #removeSequencer(SequencerConfig)
221         */
222        public boolean addSequencer( SequencerConfig config ) {
223            return this.sequencerLibrary.add(config);
224        }
225    
226        /**
227         * Update the configuration for a sequencer, or add it if there is no {@link SequencerConfig#equals(Object) matching
228         * configuration}.
229         * 
230         * @param config the updated (or new) configuration
231         * @return true if the sequencer was updated, or false if there already was an existing and
232         *         {@link SequencerConfig#hasChanged(SequencerConfig) unchanged} sequencer configuration
233         * @throws IllegalArgumentException if <code>config</code> is null
234         * @see #addSequencer(SequencerConfig)
235         * @see #removeSequencer(SequencerConfig)
236         */
237        public boolean updateSequencer( SequencerConfig config ) {
238            return this.sequencerLibrary.update(config);
239        }
240    
241        /**
242         * Remove the configuration for a sequencer.
243         * 
244         * @param config the configuration to be removed
245         * @return true if the sequencer was removed, or false if there was no existing sequencer
246         * @throws IllegalArgumentException if <code>config</code> is null
247         * @see #addSequencer(SequencerConfig)
248         * @see #updateSequencer(SequencerConfig)
249         */
250        public boolean removeSequencer( SequencerConfig config ) {
251            return this.sequencerLibrary.remove(config);
252        }
253    
254        /**
255         * @return executionContext
256         */
257        public ExecutionContext getExecutionContext() {
258            return this.executionContext;
259        }
260    
261        /**
262         * @param executionContext Sets executionContext to the specified value.
263         */
264        public void setExecutionContext( ExecutionContext executionContext ) {
265            CheckArg.isNotNull(executionContext, "execution context");
266            if (this.getAdministrator().isStarted()) {
267                throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
268            }
269            this.executionContext = executionContext;
270            this.sequencerLibrary.setClassLoaderFactory(executionContext);
271        }
272    
273        /**
274         * Get the repository library to be used for repository lookup
275         * 
276         * @return the repository library
277         */
278        public RepositoryLibrary getRepositoryLibrary() {
279            return this.repositoryLibrary;
280        }
281    
282        public void setRepositoryLibrary( RepositoryLibrary repositoryLibrary ) {
283            this.repositoryLibrary = repositoryLibrary;
284        }
285    
286        /**
287         * Get the executor service used to run the sequencers.
288         * 
289         * @return the executor service
290         * @see #setExecutorService(ExecutorService)
291         */
292        public ExecutorService getExecutorService() {
293            return this.executorService;
294        }
295    
296        /**
297         * Set the executor service that should be used by this system. By default, the system is set up with a
298         * {@link Executors#newSingleThreadExecutor() executor that uses a single thread}.
299         * 
300         * @param executorService the executor service
301         * @see #getExecutorService()
302         * @see Executors#newCachedThreadPool()
303         * @see Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)
304         * @see Executors#newFixedThreadPool(int)
305         * @see Executors#newFixedThreadPool(int, java.util.concurrent.ThreadFactory)
306         * @see Executors#newScheduledThreadPool(int)
307         * @see Executors#newScheduledThreadPool(int, java.util.concurrent.ThreadFactory)
308         * @see Executors#newSingleThreadExecutor()
309         * @see Executors#newSingleThreadExecutor(java.util.concurrent.ThreadFactory)
310         * @see Executors#newSingleThreadScheduledExecutor()
311         * @see Executors#newSingleThreadScheduledExecutor(java.util.concurrent.ThreadFactory)
312         */
313        public void setExecutorService( ExecutorService executorService ) {
314            CheckArg.isNotNull(executorService, "executor service");
315            if (this.getAdministrator().isStarted()) {
316                throw new IllegalStateException(RepositoryI18n.unableToChangeExecutionContextWhileRunning.text());
317            }
318            this.executorService = executorService;
319        }
320    
321        /**
322         * Override this method to creates a different kind of default executor service. This method is called when the system is
323         * {@link #startService() started} without an executor service being {@link #setExecutorService(ExecutorService) set}.
324         * <p>
325         * This method creates a {@link Executors#newSingleThreadExecutor() single-threaded executor}.
326         * </p>
327         * 
328         * @return the executor service
329         */
330        protected ExecutorService createDefaultExecutorService() {
331            return Executors.newSingleThreadExecutor();
332        }
333    
334        protected void startService() {
335            if (this.getExecutionContext() == null) {
336                throw new IllegalStateException(RepositoryI18n.unableToStartSequencingServiceWithoutExecutionContext.text());
337            }
338            if (this.executorService == null) {
339                this.executorService = createDefaultExecutorService();
340            }
341            assert this.executorService != null;
342            assert this.sequencerSelector != null;
343            assert this.sequencerLibrary != null;
344            assert this.repositoryLibrary != null;
345            this.repositoryObserver = new RepositoryObserver();
346            // Register the observer ...
347            this.repositoryLibrary.register(this.repositoryObserver);
348        }
349    
350        protected void shutdownService() {
351            // Unregister our observer ...
352            if (this.repositoryObserver != null) this.repositoryObserver.unregister();
353            // And shut down the executor service ..
354            if (this.executorService != null) {
355                this.executorService.shutdown();
356            }
357        }
358    
359        protected boolean isServiceTerminated() {
360            if (this.executorService != null) {
361                return this.executorService.isTerminated();
362            }
363            return true;
364        }
365    
366        protected boolean doAwaitTermination( long timeout,
367                                              TimeUnit unit ) throws InterruptedException {
368            if (this.executorService == null || this.executorService.isTerminated()) return true;
369            return this.executorService.awaitTermination(timeout, unit);
370        }
371    
372        /**
373         * Get the sequencing selector used by this system.
374         * 
375         * @return the sequencing selector
376         */
377        public Selector getSequencerSelector() {
378            return this.sequencerSelector;
379        }
380    
381        /**
382         * Set the sequencer selector, or null if the {@link #DEFAULT_SEQUENCER_SELECTOR default sequencer selector} should be used.
383         * 
384         * @param sequencerSelector the selector
385         */
386        public void setSequencerSelector( Selector sequencerSelector ) {
387            this.sequencerSelector = sequencerSelector != null ? sequencerSelector : DEFAULT_SEQUENCER_SELECTOR;
388        }
389    
390        /**
391         * Do the work of processing by sequencing the node. This method is called by the {@link #executorService executor service}
392         * when it performs it's work on the enqueued {@link NetChange NetChange runnable objects}.
393         * 
394         * @param change the change describing the node to be processed.
395         */
396        protected void processChange( NetChange change ) {
397            final ExecutionContext context = this.getExecutionContext();
398            final Logger logger = context.getLogger(getClass());
399            assert logger != null;
400    
401            try {
402                final String repositorySourceName = change.getRepositorySourceName();
403                final String repositoryWorkspaceName = change.getRepositoryWorkspaceName();
404    
405                // Figure out which sequencers accept this path,
406                // and track which output nodes should be passed to each sequencer...
407                final Path nodePath = change.getPath();
408                final String nodePathStr = context.getValueFactories().getStringFactory().create(nodePath);
409                Map<SequencerCall, Set<RepositoryNodePath>> sequencerCalls = new HashMap<SequencerCall, Set<RepositoryNodePath>>();
410                List<Sequencer> allSequencers = this.sequencerLibrary.getInstances();
411                List<Sequencer> sequencers = new ArrayList<Sequencer>(allSequencers.size());
412                for (Sequencer sequencer : allSequencers) {
413                    final SequencerConfig config = sequencer.getConfiguration();
414                    for (SequencerPathExpression pathExpression : config.getPathExpressions()) {
415                        for (Property property : change.getModifiedProperties()) {
416                            Name propertyName = property.getName();
417                            String propertyNameStr = context.getValueFactories().getStringFactory().create(propertyName);
418                            String path = nodePathStr + "/@" + propertyNameStr;
419                            SequencerPathExpression.Matcher matcher = pathExpression.matcher(path);
420                            if (matcher.matches()) {
421                                // String selectedPath = matcher.getSelectedPath();
422                                RepositoryNodePath outputPath = RepositoryNodePath.parse(matcher.getOutputPath(),
423                                                                                         repositorySourceName,
424                                                                                         repositoryWorkspaceName);
425                                SequencerCall call = new SequencerCall(sequencer, propertyNameStr);
426                                // Record the output path ...
427                                Set<RepositoryNodePath> outputPaths = sequencerCalls.get(call);
428                                if (outputPaths == null) {
429                                    outputPaths = new HashSet<RepositoryNodePath>();
430                                    sequencerCalls.put(call, outputPaths);
431                                }
432                                outputPaths.add(outputPath);
433                                sequencers.add(sequencer);
434                                break;
435                            }
436                        }
437                    }
438                }
439    
440                RepositorySource source = repositoryLibrary.getSource(repositorySourceName);
441                Graph graph = Graph.create(source, context);
442                Node node = null;
443                if (!sequencers.isEmpty()) {
444    
445                    // Find the changed node ...
446                    node = graph.getNodeAt(nodePath);
447    
448                    // Figure out which sequencers should run ...
449                    sequencers = this.sequencerSelector.selectSequencers(sequencers, node, change);
450                }
451                if (sequencers.isEmpty()) {
452                    this.statistics.recordNodeSkipped();
453                    if (logger.isDebugEnabled()) {
454                        logger.trace("Skipping '{0}': no sequencers matched this condition", change);
455                    }
456                } else {
457                    // Run each of those sequencers ...
458                    for (Map.Entry<SequencerCall, Set<RepositoryNodePath>> entry : sequencerCalls.entrySet()) {
459    
460                        final SequencerCall sequencerCall = entry.getKey();
461                        final Set<RepositoryNodePath> outputPaths = entry.getValue();
462                        final Sequencer sequencer = sequencerCall.getSequencer();
463                        final String sequencerName = sequencer.getConfiguration().getName();
464                        final String propertyName = sequencerCall.getSequencedPropertyName();
465    
466                        // Get the paths to the nodes where the sequencer should write it's output ...
467                        assert outputPaths != null && outputPaths.size() != 0;
468    
469                        // Create a new execution context for each sequencer
470                        final SimpleProblems problems = new SimpleProblems();
471                        SequencerContext sequencerContext = new SequencerContext(context, graph);
472                        try {
473                            sequencer.execute(node, propertyName, change, outputPaths, sequencerContext, problems);
474                            sequencerContext.getDestination().submit();
475                        } catch (SequencerException e) {
476                            logger.error(e, RepositoryI18n.errorWhileSequencingNode, sequencerName, change);
477                        }
478                    }
479                    this.statistics.recordNodeSequenced();
480                }
481            } catch (Throwable e) {
482                logger.error(e, RepositoryI18n.errorFindingSequencersToRunAgainstNode, change);
483            }
484        }
485    
486        /**
487         * The statistics for the system. Each sequencing system has an instance of this class that is updated.
488         * 
489         * @author Randall Hauch
490         */
491        @ThreadSafe
492        public class Statistics {
493    
494            private final AtomicLong numberOfNodesSequenced = new AtomicLong(0);
495            private final AtomicLong numberOfNodesSkipped = new AtomicLong(0);
496            private final AtomicLong startTime;
497    
498            protected Statistics() {
499                startTime = new AtomicLong(System.currentTimeMillis());
500            }
501    
502            public Statistics reset() {
503                this.startTime.set(System.currentTimeMillis());
504                this.numberOfNodesSequenced.set(0);
505                this.numberOfNodesSkipped.set(0);
506                return this;
507            }
508    
509            /**
510             * @return the system time when the statistics were started
511             */
512            public long getStartTime() {
513                return this.startTime.get();
514            }
515    
516            /**
517             * @return the number of nodes that were sequenced
518             */
519            public long getNumberOfNodesSequenced() {
520                return this.numberOfNodesSequenced.get();
521            }
522    
523            /**
524             * @return the number of nodes that were skipped because no sequencers applied
525             */
526            public long getNumberOfNodesSkipped() {
527                return this.numberOfNodesSkipped.get();
528            }
529    
530            protected void recordNodeSequenced() {
531                this.numberOfNodesSequenced.incrementAndGet();
532            }
533    
534            protected void recordNodeSkipped() {
535                this.numberOfNodesSkipped.incrementAndGet();
536            }
537        }
538    
539        @Immutable
540        protected class SequencerCall {
541    
542            private final Sequencer sequencer;
543            private final String sequencerName;
544            private final String sequencedPropertyName;
545            private final int hc;
546    
547            protected SequencerCall( Sequencer sequencer,
548                                     String sequencedPropertyName ) {
549                this.sequencer = sequencer;
550                this.sequencerName = sequencer.getConfiguration().getName();
551                this.sequencedPropertyName = sequencedPropertyName;
552                this.hc = HashCode.compute(this.sequencerName, this.sequencedPropertyName);
553            }
554    
555            /**
556             * @return sequencer
557             */
558            public Sequencer getSequencer() {
559                return this.sequencer;
560            }
561    
562            /**
563             * @return sequencedPropertyName
564             */
565            public String getSequencedPropertyName() {
566                return this.sequencedPropertyName;
567            }
568    
569            /**
570             * {@inheritDoc}
571             */
572            @Override
573            public int hashCode() {
574                return this.hc;
575            }
576    
577            /**
578             * {@inheritDoc}
579             */
580            @Override
581            public boolean equals( Object obj ) {
582                if (obj == this) return true;
583                if (obj instanceof SequencerCall) {
584                    SequencerCall that = (SequencerCall)obj;
585                    if (!this.sequencerName.equals(that.sequencerName)) return false;
586                    if (!this.sequencedPropertyName.equals(that.sequencedPropertyName)) return false;
587                    return true;
588                }
589                return false;
590            }
591        }
592    
593        protected class RepositoryObserver extends NetChangeObserver {
594            protected RepositoryObserver() {
595            }
596    
597            /**
598             * {@inheritDoc}
599             * 
600             * @see org.jboss.dna.graph.observe.NetChangeObserver#notify(org.jboss.dna.graph.observe.NetChangeObserver.NetChange)
601             */
602            @Override
603            protected void notify( final NetChange change ) {
604                // Only care about new nodes or nodes that have new/changed properies ...
605                if (change.includes(ChangeType.NODE_ADDED, ChangeType.PROPERTY_ADDED, ChangeType.PROPERTY_CHANGED)) {
606                    try {
607                        getExecutorService().execute(new Runnable() {
608    
609                            public void run() {
610                                processChange(change);
611                            }
612                        });
613                    } catch (RejectedExecutionException e) {
614                        // The executor service has been shut down, so do nothing with this set of changes
615                    }
616                }
617            }
618        }
619    }