001    /*
002     * JBoss, Home of Professional Open Source.
003     * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004     * as indicated by the @author tags. See the copyright.txt file in the
005     * distribution for a full listing of individual contributors. 
006     *
007     * This is free software; you can redistribute it and/or modify it
008     * under the terms of the GNU Lesser General Public License as
009     * published by the Free Software Foundation; either version 2.1 of
010     * the License, or (at your option) any later version.
011     *
012     * This software is distributed in the hope that it will be useful,
013     * but WITHOUT ANY WARRANTY; without even the implied warranty of
014     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015     * Lesser General Public License for more details.
016     *
017     * You should have received a copy of the GNU Lesser General Public
018     * License along with this software; if not, write to the Free
019     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021     */
022    package org.jboss.dna.repository;
023    
024    import java.util.Collection;
025    import java.util.Collections;
026    import java.util.HashSet;
027    import java.util.LinkedList;
028    import java.util.List;
029    import java.util.Set;
030    import java.util.concurrent.CopyOnWriteArrayList;
031    import java.util.concurrent.TimeUnit;
032    import java.util.concurrent.locks.ReadWriteLock;
033    import java.util.concurrent.locks.ReentrantReadWriteLock;
034    import net.jcip.annotations.ThreadSafe;
035    import org.jboss.dna.common.util.CheckArg;
036    import org.jboss.dna.graph.BasicExecutionContextFactory;
037    import org.jboss.dna.graph.ExecutionContext;
038    import org.jboss.dna.graph.ExecutionContextFactory;
039    import org.jboss.dna.graph.connectors.RepositoryConnection;
040    import org.jboss.dna.graph.connectors.RepositoryConnectionFactory;
041    import org.jboss.dna.graph.connectors.RepositoryConnectionPool;
042    import org.jboss.dna.graph.connectors.RepositoryContext;
043    import org.jboss.dna.graph.connectors.RepositorySource;
044    import org.jboss.dna.repository.services.AbstractServiceAdministrator;
045    import org.jboss.dna.repository.services.ServiceAdministrator;
046    
047    /**
048     * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for
049     * each.
050     * 
051     * @author Randall Hauch
052     */
053    @ThreadSafe
054    public class RepositoryLibrary implements RepositoryConnectionFactory {
055    
056        /**
057         * The administrative component for this service.
058         * 
059         * @author Randall Hauch
060         */
061        protected class Administrator extends AbstractServiceAdministrator {
062    
063            protected Administrator() {
064                super(RepositoryI18n.federationServiceName, State.STARTED);
065            }
066    
067            /**
068             * {@inheritDoc}
069             */
070            @Override
071            protected void doStart( State fromState ) {
072                super.doStart(fromState);
073                RepositoryLibrary.this.start();
074            }
075    
076            /**
077             * {@inheritDoc}
078             */
079            @Override
080            protected void doShutdown( State fromState ) {
081                super.doShutdown(fromState);
082                RepositoryLibrary.this.shutdown();
083            }
084    
085            /**
086             * {@inheritDoc}
087             */
088            public boolean awaitTermination( long timeout,
089                                             TimeUnit unit ) throws InterruptedException {
090                return RepositoryLibrary.this.awaitTermination(timeout, unit);
091            }
092    
093            /**
094             * {@inheritDoc}
095             */
096            @Override
097            protected boolean doCheckIsTerminated() {
098                return RepositoryLibrary.this.isTerminated();
099            }
100    
101        }
102    
103        private final ServiceAdministrator administrator = new Administrator();
104        private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock();
105        private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>();
106        private RepositoryConnectionFactory delegate;
107        private final ExecutionContextFactory executionContextFactory;
108        private final RepositoryContext repositoryContext;
109    
110        /**
111         * Create a new manager instance.
112         */
113        public RepositoryLibrary() {
114            this(new BasicExecutionContextFactory(), null);
115        }
116    
117        /**
118         * Create a new manager instance.
119         * 
120         * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
121         *        this manager; may be null if there is no delegate
122         */
123        public RepositoryLibrary( RepositoryConnectionFactory delegate ) {
124            this(new BasicExecutionContextFactory(), delegate);
125        }
126    
127        /**
128         * Create a new manager instance.
129         * 
130         * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances
131         * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
132         */
133        public RepositoryLibrary( ExecutionContextFactory executionContextFactory ) {
134            this(executionContextFactory, null);
135        }
136    
137        /**
138         * Create a new manager instance.
139         * 
140         * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances
141         * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
142         *        this manager; may be null if there is no delegate
143         * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null
144         */
145        public RepositoryLibrary( ExecutionContextFactory executionContextFactory,
146                                  RepositoryConnectionFactory delegate ) {
147            CheckArg.isNotNull(executionContextFactory, "executionContextFactory");
148            this.delegate = delegate;
149            this.executionContextFactory = executionContextFactory;
150            this.repositoryContext = new RepositoryContext() {
151                /**
152                 * {@inheritDoc}
153                 * 
154                 * @see org.jboss.dna.graph.connectors.RepositoryContext#getExecutionContextFactory()
155                 */
156                public ExecutionContextFactory getExecutionContextFactory() {
157                    return RepositoryLibrary.this.getExecutionContextFactory();
158                }
159    
160                /**
161                 * {@inheritDoc}
162                 * 
163                 * @see org.jboss.dna.graph.connectors.RepositoryContext#getRepositoryConnectionFactory()
164                 */
165                public RepositoryConnectionFactory getRepositoryConnectionFactory() {
166                    return RepositoryLibrary.this;
167                }
168            };
169        }
170    
171        /**
172         * @return executionContextFactory
173         */
174        public ExecutionContextFactory getExecutionContextFactory() {
175            return executionContextFactory;
176        }
177    
178        /**
179         * Get the delegate connection factory.
180         * 
181         * @return the connection factory to which this instance should delegate in the event that a source is not found in this
182         *         manager, or null if there is no delegate
183         */
184        public RepositoryConnectionFactory getDelegate() {
185            return delegate;
186        }
187    
188        /**
189         * Set the delegate connection factory.
190         * 
191         * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in
192         *        this manager; may be null if there is no delegate
193         */
194        public void setDelegate( RepositoryConnectionFactory delegate ) {
195            this.delegate = delegate;
196        }
197    
198        /**
199         * @return administrator
200         */
201        public ServiceAdministrator getAdministrator() {
202            return this.administrator;
203        }
204    
205        /**
206         * Utility method called by the administrator.
207         */
208        protected void start() {
209            // Do not establish connections to the pools; these will be established as needed
210    
211        }
212    
213        /**
214         * Utility method called by the administrator.
215         */
216        protected void shutdown() {
217            // Close all connections to the pools. This is done inside the pools write lock.
218            try {
219                this.sourcesLock.readLock().lock();
220                for (RepositoryConnectionPool pool : this.pools) {
221                    pool.shutdown();
222                }
223            } finally {
224                this.sourcesLock.readLock().unlock();
225            }
226        }
227    
228        /**
229         * Utility method called by the administrator.
230         * 
231         * @param timeout
232         * @param unit
233         * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout
234         *         occurred before all the connections were closed
235         * @throws InterruptedException
236         */
237        protected boolean awaitTermination( long timeout,
238                                            TimeUnit unit ) throws InterruptedException {
239            // Check whether all source pools are shut down. This is done inside the pools write lock.
240            try {
241                this.sourcesLock.readLock().lock();
242                for (RepositoryConnectionPool pool : this.pools) {
243                    if (!pool.awaitTermination(timeout, unit)) return false;
244                }
245                return true;
246            } finally {
247                this.sourcesLock.readLock().unlock();
248            }
249        }
250    
251        /**
252         * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()}
253         * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have
254         * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of
255         * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed
256         * interruption, causing this repository not to properly terminate.
257         * 
258         * @return true if terminating but not yet terminated, or false otherwise
259         * @see #isTerminated()
260         */
261        public boolean isTerminating() {
262            try {
263                this.sourcesLock.readLock().lock();
264                for (RepositoryConnectionPool pool : this.pools) {
265                    if (pool.isTerminating()) return true;
266                }
267                return false;
268            } finally {
269                this.sourcesLock.readLock().unlock();
270            }
271        }
272    
273        /**
274         * Return true if this federated repository has completed its termination and no longer has any open connections.
275         * 
276         * @return true if terminated, or false otherwise
277         * @see #isTerminating()
278         */
279        public boolean isTerminated() {
280            try {
281                this.sourcesLock.readLock().lock();
282                for (RepositoryConnectionPool pool : this.pools) {
283                    if (!pool.isTerminated()) return false;
284                }
285                return true;
286            } finally {
287                this.sourcesLock.readLock().unlock();
288            }
289        }
290    
291        /**
292         * Get an unmodifiable collection of {@link RepositorySource} names.
293         * 
294         * @return the pools
295         */
296        public Collection<String> getSourceNames() {
297            Set<String> sourceNames = new HashSet<String>();
298            for (RepositoryConnectionPool pool : this.pools) {
299                sourceNames.add(pool.getRepositorySource().getName());
300            }
301            return Collections.unmodifiableCollection(sourceNames);
302        }
303    
304        /**
305         * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance.
306         * 
307         * @return the pools
308         */
309        public Collection<RepositorySource> getSources() {
310            List<RepositorySource> sources = new LinkedList<RepositorySource>();
311            for (RepositoryConnectionPool pool : this.pools) {
312                sources.add(pool.getRepositorySource());
313            }
314            return Collections.unmodifiableCollection(sources);
315        }
316    
317        /**
318         * Get the RepositorySource with the specified name managed by this instance.
319         * 
320         * @param sourceName the name of the source
321         * @return the source, or null if no such source exists in this instance
322         */
323        public RepositorySource getSource( String sourceName ) {
324            try {
325                this.sourcesLock.readLock().lock();
326                for (RepositoryConnectionPool existingPool : this.pools) {
327                    RepositorySource source = existingPool.getRepositorySource();
328                    if (source.getName().equals(sourceName)) return source;
329                }
330            } finally {
331                this.sourcesLock.readLock().unlock();
332            }
333            return null;
334        }
335    
336        /**
337         * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance.
338         * 
339         * @param sourceName the name of the source
340         * @return the pool, or null if no such pool exists in this instance
341         */
342        public RepositoryConnectionPool getConnectionPool( String sourceName ) {
343            try {
344                this.sourcesLock.readLock().lock();
345                for (RepositoryConnectionPool existingPool : this.pools) {
346                    RepositorySource source = existingPool.getRepositorySource();
347                    if (source.getName().equals(sourceName)) return existingPool;
348                }
349            } finally {
350                this.sourcesLock.readLock().unlock();
351            }
352            return null;
353        }
354    
355        /**
356         * Add the supplied federated source. This method returns false if the source is null.
357         * 
358         * @param source the source to add
359         * @return true if the source is added, or false if the reference is null or if there is already an existing source with the
360         *         supplied name.
361         */
362        public boolean addSource( RepositorySource source ) {
363            if (source == null) return false;
364            try {
365                this.sourcesLock.writeLock().lock();
366                for (RepositoryConnectionPool existingPool : this.pools) {
367                    if (existingPool.getRepositorySource().getName().equals(source.getName())) return false;
368                }
369                source.initialize(repositoryContext);
370                RepositoryConnectionPool pool = new RepositoryConnectionPool(source);
371                this.pools.add(pool);
372                return true;
373            } finally {
374                this.sourcesLock.writeLock().unlock();
375            }
376        }
377    
378        /**
379         * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call
380         * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and
381         * closed normally, but preventing further connections from being used.
382         * <p>
383         * This method can safely be called while the federation repository is in use.
384         * </p>
385         * 
386         * @param source the source to be removed
387         * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
388         *        should not wait at all
389         * @param unit the time unit to be used for <code>timeToAwait</code>
390         * @return true if the source was removed, or false if the source was not a source for this repository.
391         * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
392         */
393        public boolean removeSource( RepositorySource source,
394                                     long timeToAwait,
395                                     TimeUnit unit ) throws InterruptedException {
396            // Use the name; don't use the object equality ...
397            return removeSource(source.getName(), timeToAwait, unit) != null;
398        }
399    
400        /**
401         * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source
402         * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further
403         * connections from being used.
404         * 
405         * @param name the name of the source to be removed
406         * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call
407         *        should not wait at all
408         * @param unit the time unit to be used for <code>timeToAwait</code>
409         * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could
410         *         be found
411         * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections
412         */
413        public RepositorySource removeSource( String name,
414                                              long timeToAwait,
415                                              TimeUnit unit ) throws InterruptedException {
416            try {
417                this.sourcesLock.writeLock().lock();
418                for (RepositoryConnectionPool existingPool : this.pools) {
419                    if (existingPool.getRepositorySource().getName().equals(name)) {
420                        // Shut down the source ...
421                        existingPool.shutdown();
422                        if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit);
423                    }
424                    return existingPool.getRepositorySource();
425                }
426            } finally {
427                this.sourcesLock.writeLock().unlock();
428            }
429            return null;
430        }
431    
432        /**
433         * {@inheritDoc}
434         * 
435         * @see org.jboss.dna.graph.connectors.RepositoryConnectionFactory#createConnection(java.lang.String)
436         */
437        public RepositoryConnection createConnection( String sourceName ) {
438            try {
439                this.sourcesLock.readLock().lock();
440                for (RepositoryConnectionPool existingPool : this.pools) {
441                    RepositorySource source = existingPool.getRepositorySource();
442                    if (source.getName().equals(sourceName)) return existingPool.getConnection();
443                }
444                RepositoryConnectionFactory delegate = this.delegate;
445                if (delegate != null) {
446                    return delegate.createConnection(sourceName);
447                }
448            } finally {
449                this.sourcesLock.readLock().unlock();
450            }
451            return null;
452        }
453    }