001    /*
002     * JBoss DNA (http://www.jboss.org/dna)
003     * See the COPYRIGHT.txt file distributed with this work for information
004     * regarding copyright ownership.  Some portions may be licensed
005     * to Red Hat, Inc. under one or more contributor license agreements.
006     * See the AUTHORS.txt file in the distribution for a full listing of 
007     * individual contributors. 
008     *
009     * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA
010     * is licensed to you under the terms of the GNU Lesser General Public License as
011     * published by the Free Software Foundation; either version 2.1 of
012     * the License, or (at your option) any later version.
013     *
014     * JBoss DNA is distributed in the hope that it will be useful,
015     * but WITHOUT ANY WARRANTY; without even the implied warranty of
016     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
017     * Lesser General Public License for more details.
018     *
019     * You should have received a copy of the GNU Lesser General Public
020     * License along with this software; if not, write to the Free
021     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
022     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
023     */
024    package org.jboss.dna.graph.connector;
025    
026    import java.util.Collection;
027    import java.util.HashSet;
028    import java.util.LinkedList;
029    import java.util.Set;
030    import java.util.concurrent.BlockingQueue;
031    import java.util.concurrent.LinkedBlockingQueue;
032    import java.util.concurrent.TimeUnit;
033    import java.util.concurrent.atomic.AtomicBoolean;
034    import java.util.concurrent.atomic.AtomicInteger;
035    import java.util.concurrent.atomic.AtomicLong;
036    import java.util.concurrent.locks.Condition;
037    import java.util.concurrent.locks.ReentrantLock;
038    import javax.transaction.xa.XAResource;
039    import net.jcip.annotations.GuardedBy;
040    import net.jcip.annotations.ThreadSafe;
041    import org.jboss.dna.common.util.CheckArg;
042    import org.jboss.dna.common.util.Logger;
043    import org.jboss.dna.graph.ExecutionContext;
044    import org.jboss.dna.graph.GraphI18n;
045    import org.jboss.dna.graph.cache.CachePolicy;
046    import org.jboss.dna.graph.request.Request;
047    
048    /**
049     * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations.
050     * 
051     * @author Randall Hauch
052     */
053    @ThreadSafe
054    public class RepositoryConnectionPool {
055    
056        /**
057         * The core pool size for default-constructed pools is {@value}.
058         */
059        public static final int DEFAULT_CORE_POOL_SIZE = 1;
060    
061        /**
062         * The maximum pool size for default-constructed pools is {@value}.
063         */
064        public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10;
065    
066        /**
067         * The keep-alive time for connections in default-constructed pools is {@value} seconds.
068         */
069        public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30;
070    
071        /**
072         * Permission that a caller must hold to shut the pool down; checked by shutdown() and shutdownNow().
073         */
074        private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread");
075    
076        /**
077         * The source that this pool uses to create new connections.
078         */
079        private final RepositorySource source;
080    
081        /**
082         * Lock that guards poolSize, corePoolSize, maximumPoolSize, runState, and the two connection collections below.
083         */
084        private final ReentrantLock mainLock = new ReentrantLock();
085    
086        /**
087         * Wait condition to support awaitTermination; it is signalled when runState becomes TERMINATED.
088         */
089        private final Condition termination = mainLock.newCondition();
090    
091        /**
092         * Queue containing all connections that are available for use.
093         */
094        @GuardedBy( "mainLock" )
095        private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>();
096    
097        /**
098         * The connections that are currently in use.
099         */
100        @GuardedBy( "mainLock" )
101        private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>();
102    
103        /**
104         * Timeout in nanoseconds for idle connections waiting to be used. As documented on setKeepAliveTime, it applies only to
105         * connections in excess of corePoolSize.
106         */
107        private volatile long keepAliveTime;
108    
109        /**
110         * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during
111         * updates.
112         */
113        @GuardedBy( "mainLock" )
114        private volatile int corePoolSize;
115    
116        /**
117         * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
118         */
119        @GuardedBy( "mainLock" )
120        private volatile int maximumPoolSize;
121    
122        /**
123         * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
124         */
125        @GuardedBy( "mainLock" )
126        private volatile int poolSize;
127    
128        /**
129         * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates.
130         */
131        @GuardedBy( "mainLock" )
132        private volatile int runState;
133    
134        // Special values for runState (the field's default value of 0 corresponds to RUNNING)
135        /** Normal, not-shutdown mode */
136        static final int RUNNING = 0;
137        /** Controlled shutdown mode */
138        static final int SHUTDOWN = 1;
139        /** Immediate shutdown mode */
140        static final int STOP = 2;
141        /** Final state */
142        static final int TERMINATED = 3;
143    
144        /**
145         * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method.
146         */
147        private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false);
148    
149        /**
150         * The time in nanoseconds that ping should wait before timing out and failing.
151         */
152        private final AtomicLong pingTimeout = new AtomicLong(0);
153    
154        /**
155         * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception.
156         */
157        private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10);
158    
159        private final AtomicLong totalConnectionsCreated = new AtomicLong(0); // value reported by getTotalConnectionsCreated()
160    
161        private final AtomicLong totalConnectionsUsed = new AtomicLong(0); // value reported by getTotalConnectionsUsed()
162    
163        private final Logger logger = Logger.getLogger(this.getClass()); // keyed by the runtime class of this instance
164    
/**
 * Create a pool that obtains its connections from the supplied {@link RepositorySource}, configured with the
 * {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, the {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool size},
 * and the {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}.
 * 
 * @param source the source for connections
 * @throws IllegalArgumentException if the source is null
 */
public RepositoryConnectionPool( RepositorySource source ) {
    // Delegate to the full constructor; all validation happens there ...
    this(source,
         DEFAULT_CORE_POOL_SIZE,
         DEFAULT_MAXIMUM_POOL_SIZE,
         DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS,
         TimeUnit.SECONDS);
}
176    
/**
 * Create a pool that obtains its connections from the supplied {@link RepositorySource}. The ping timeout of a new pool
 * is 100 milliseconds.
 * 
 * @param source the source for connections
 * @param corePoolSize the number of connections to keep in the pool, even if they are idle.
 * @param maximumPoolSize the maximum number of connections to allow in the pool.
 * @param keepAliveTime when the number of connections is greater than the core, this is the maximum time that excess idle
 *        connections will be kept before terminating.
 * @param unit the time unit for the keepAliveTime argument.
 * @throws IllegalArgumentException if the source or the time unit is null, if the core pool size or keep-alive time is
 *         negative, if the maximum pool size is not positive, or if the maximum pool size is smaller than the core pool size
 */
public RepositoryConnectionPool( RepositorySource source,
                                 int corePoolSize,
                                 int maximumPoolSize,
                                 long keepAliveTime,
                                 TimeUnit unit ) {
    CheckArg.isNonNegative(corePoolSize, "corePoolSize");
    CheckArg.isPositive(maximumPoolSize, "maximumPoolSize");
    CheckArg.isNonNegative(keepAliveTime, "keepAliveTime");
    CheckArg.isNotNull(source, "source");
    // Reject a null unit up front, so the caller gets the documented exception rather than a NullPointerException ...
    CheckArg.isNotNull(unit, "unit");
    if (maximumPoolSize < corePoolSize) {
        throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
    }
    this.source = source;
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    // Set the default ping timeout directly instead of through the overridable setPingTimeout(...), so that no
    // subclass code runs against a partially-constructed pool ...
    this.pingTimeout.set(TimeUnit.MILLISECONDS.toNanos(100));
}
206    
207        /**
208         * Get the {@link RepositorySource} that's used by this pool.
209         * 
210         * @return the repository source; never null
211         */
212        public final RepositorySource getRepositorySource() {
213            return source;
214        }
215    
216        /**
217         * Get the name of this pool, which delegates to the connection factory.
218         * 
219         * @return the name of the source
220         */
221        protected String getSourceName() {
222            return source.getName();
223        }
224    
225        // -------------------------------------------------
226        // Property settings ...
227        // -------------------------------------------------
228    
229        /**
230         * @return validateConnectionBeforeUse
231         */
232        public boolean getValidateConnectionBeforeUse() {
233            return this.validateConnectionBeforeUse.get();
234        }
235    
236        /**
237         * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value.
238         */
239        public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) {
240            this.validateConnectionBeforeUse.set(validateConnectionBeforeUse);
241        }
242    
243        /**
244         * @return pingTimeout
245         */
246        public long getPingTimeoutInNanos() {
247            return this.pingTimeout.get();
248        }
249    
250        /**
251         * @param pingTimeout the time to wait for a ping to complete
252         * @param unit the time unit of the time argument
253         */
254        public void setPingTimeout( long pingTimeout,
255                                    TimeUnit unit ) {
256            CheckArg.isNonNegative(pingTimeout, "time");
257            this.pingTimeout.set(unit.toNanos(pingTimeout));
258        }
259    
260        /**
261         * @return maxFailedAttemptsBeforeError
262         */
263        public int getMaxFailedAttemptsBeforeError() {
264            return this.maxFailedAttemptsBeforeError.get();
265        }
266    
267        /**
268         * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value.
269         */
270        public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) {
271            this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError);
272        }
273    
274        /**
275         * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of
276         * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated.
277         * This overrides any value set in the constructor.
278         * 
279         * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being
280         *        returned.
281         * @param unit the time unit of the time argument
282         * @throws IllegalArgumentException if time less than zero
283         * @see #getKeepAliveTime
284         */
285        public void setKeepAliveTime( long time,
286                                      TimeUnit unit ) {
287            CheckArg.isNonNegative(time, "time");
288            this.keepAliveTime = unit.toNanos(time);
289        }
290    
291        /**
292         * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may
293         * remain idle before being closed.
294         * 
295         * @param unit the desired time unit of the result
296         * @return the time limit
297         * @see #setKeepAliveTime
298         */
299        public long getKeepAliveTime( TimeUnit unit ) {
300            assert unit != null;
301            return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
302        }
303    
304        /**
305         * @return maximumPoolSize
306         */
307        public int getMaximumPoolSize() {
308            return this.maximumPoolSize;
309        }
310    
311        /**
312         * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is
313         * smaller than the current value, excess existing but unused connections will be closed.
314         * 
315         * @param maximumPoolSize the new maximum
316         * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size}
317         * @see #getMaximumPoolSize
318         */
319        public void setMaximumPoolSize( int maximumPoolSize ) {
320            CheckArg.isPositive(maximumPoolSize, "maximum pool size");
321            if (maximumPoolSize < corePoolSize) {
322                throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
323            }
324            final ReentrantLock mainLock = this.mainLock;
325            try {
326                mainLock.lock();
327                int extra = this.maximumPoolSize - maximumPoolSize;
328                this.maximumPoolSize = maximumPoolSize;
329                if (extra > 0 && poolSize > maximumPoolSize) {
330                    // Drain the extra connections from those available ...
331                    drainUnusedConnections(extra);
332                }
333            } finally {
334                mainLock.unlock();
335            }
336        }
337    
338        /**
339         * Returns the core number of connections.
340         * 
341         * @return the core number of connections
342         * @see #setCorePoolSize(int)
343         */
344        public int getCorePoolSize() {
345            return this.corePoolSize;
346        }
347    
348        /**
349         * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the
350         * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be
351         * created.
352         * 
353         * @param corePoolSize the new core size
354         * @throws RepositorySourceException if there was an error obtaining the new connection
355         * @throws InterruptedException if the thread was interrupted during the operation
356         * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero
357         * @see #getCorePoolSize()
358         */
359        public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException {
360            CheckArg.isNonNegative(corePoolSize, "core pool size");
361            if (maximumPoolSize < corePoolSize) {
362                throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text());
363            }
364            final ReentrantLock mainLock = this.mainLock;
365            try {
366                mainLock.lock();
367                int extra = this.corePoolSize - corePoolSize;
368                this.corePoolSize = corePoolSize;
369                if (extra < 0) {
370                    // Add connections ...
371                    addConnectionsIfUnderCorePoolSize();
372                } else if (extra > 0 && poolSize > corePoolSize) {
373                    // Drain the extra connections from those available ...
374                    drainUnusedConnections(extra);
375                }
376            } finally {
377                mainLock.unlock();
378            }
379        }
380    
381        // -------------------------------------------------
382        // Statistics ...
383        // -------------------------------------------------
384    
385        /**
386         * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not
387         * being used.
388         * 
389         * @return the number of connections
390         */
391        public int getPoolSize() {
392            return poolSize;
393        }
394    
395        /**
396         * Returns the approximate number of connections that are currently checked out from the pool.
397         * 
398         * @return the number of checked-out connections
399         */
400        public int getInUseCount() {
401            final ReentrantLock mainLock = this.mainLock;
402            try {
403                mainLock.lock();
404                return this.inUseConnections.size();
405            } finally {
406                mainLock.unlock();
407            }
408        }
409    
410        /**
411         * Get the total number of connections that have been created by this pool.
412         * 
413         * @return the total number of connections created by this pool
414         */
415        public long getTotalConnectionsCreated() {
416            return this.totalConnectionsCreated.get();
417        }
418    
419        /**
420         * Get the total number of times connections have been {@link #getConnection()} used.
421         * 
422         * @return the total number
423         */
424        public long getTotalConnectionsUsed() {
425            return this.totalConnectionsUsed.get();
426        }
427    
428        // -------------------------------------------------
429        // State management methods ...
430        // -------------------------------------------------
431    
432        /**
433         * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections
434         * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have
435         * already been started.
436         * 
437         * @return true if a connection was started
438         * @throws RepositorySourceException if there was an error obtaining the new connection
439         * @throws InterruptedException if the thread was interrupted during the operation
440         */
441        public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException {
442            final ReentrantLock mainLock = this.mainLock;
443            try {
444                mainLock.lock();
445                return addConnectionIfUnderCorePoolSize();
446            } finally {
447                mainLock.unlock();
448            }
449        }
450    
451        /**
452         * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core
453         * connections only when they are {@link #getConnection() needed}.
454         * 
455         * @return the number of connections started.
456         * @throws RepositorySourceException if there was an error obtaining the new connection
457         * @throws InterruptedException if the thread was interrupted during the operation
458         */
459        public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException {
460            final ReentrantLock mainLock = this.mainLock;
461            try {
462                mainLock.lock();
463                return addConnectionsIfUnderCorePoolSize();
464            } finally {
465                mainLock.unlock();
466            }
467        }
468    
469        /**
470         * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed
471         * as normal, but no new connections will be created. Invocation has no additional effect if already shut down.
472         * <p>
473         * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}.
474         * </p>
475         * 
476         * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
477         *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
478         *         or the security manager's <tt>checkAccess</tt> method denies access.
479         * @see #shutdownNow()
480         */
481        public void shutdown() {
482            // Fail if caller doesn't have modifyThread permission. We
483            // explicitly check permissions directly because we can't trust
484            // implementations of SecurityManager to correctly override
485            // the "check access" methods such that our documented
486            // security policy is implemented.
487            SecurityManager security = System.getSecurityManager();
488            if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
489    
490            this.logger.debug("Shutting down repository connection pool for {0}", getSourceName());
491            boolean fullyTerminated = false;
492            final ReentrantLock mainLock = this.mainLock;
493            try {
494                mainLock.lock();
495                int state = this.runState;
496                if (state == RUNNING) {
497                    // don't override shutdownNow
498                    this.runState = SHUTDOWN;
499                }
500    
501                // Kill the maintenance thread ...
502    
503                // Remove and close all available connections ...
504                if (!this.availableConnections.isEmpty()) {
505                    // Drain the extra connections from those available ...
506                    drainUnusedConnections(this.availableConnections.size());
507                }
508    
509                // If there are no connections being used, trigger full termination now ...
510                if (this.inUseConnections.isEmpty()) {
511                    fullyTerminated = true;
512                    this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
513                    runState = TERMINATED;
514                    termination.signalAll();
515                    this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
516                }
517                // Otherwise the last connection that is closed will transition the runState to TERMINATED ...
518            } finally {
519                mainLock.unlock();
520            }
521            if (fullyTerminated) terminated();
522        }
523    
524        /**
525         * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other
526         * connections.
527         * 
528         * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller
529         *         is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>,
530         *         or the security manager's <tt>checkAccess</tt> method denies access.
531         * @see #shutdown()
532         */
533        public void shutdownNow() {
534            // Almost the same code as shutdown()
535            SecurityManager security = System.getSecurityManager();
536            if (security != null) java.security.AccessController.checkPermission(shutdownPerm);
537    
538            this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName());
539            boolean fullyTerminated = false;
540            final ReentrantLock mainLock = this.mainLock;
541            try {
542                mainLock.lock();
543                int state = this.runState;
544                if (state != TERMINATED) {
545                    // don't override shutdownNow
546                    this.runState = STOP;
547                }
548    
549                // Kill the maintenance thread ...
550    
551                // Remove and close all available connections ...
552                if (!this.availableConnections.isEmpty()) {
553                    // Drain the extra connections from those available ...
554                    drainUnusedConnections(this.availableConnections.size());
555                }
556    
557                // If there are connections being used, close them now ...
558                if (!this.inUseConnections.isEmpty()) {
559                    for (ConnectionWrapper connectionInUse : this.inUseConnections) {
560                        this.logger.trace("Closing repository connection to {0}", getSourceName());
561                        connectionInUse.getOriginal().close();
562                    }
563                    this.poolSize -= this.inUseConnections.size();
564                    // The last connection that is closed will transition the runState to TERMINATED ...
565                } else {
566                    // There are no connections in use, so trigger full termination now ...
567                    fullyTerminated = true;
568                    this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
569                    runState = TERMINATED;
570                    termination.signalAll();
571                    this.logger.debug("Terminated repository connection pool for {0}", getSourceName());
572                }
573    
574            } finally {
575                mainLock.unlock();
576            }
577            if (fullyTerminated) terminated();
578        }
579    
580        /**
581         * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this
582         * method is effectively <code>!isShutdown()</code>.
583         * 
584         * @return true if this pool is running, or false otherwise
585         * @see #isShutdown()
586         * @see #isTerminated()
587         * @see #isTerminating()
588         */
589        public boolean isRunning() {
590            return runState == RUNNING;
591        }
592    
593        /**
594         * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of
595         * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively
596         * <code>!isRunning()</code>.
597         * 
598         * @return true if this pool has been shut down, or false otherwise
599         * @see #isShutdown()
600         * @see #isTerminated()
601         * @see #isTerminating()
602         */
603        public boolean isShutdown() {
604            return runState != RUNNING;
605        }
606    
607        /**
608         * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been
609         * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a
610         * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this
611         * executor not to properly terminate.
612         * 
613         * @return true if terminating but not yet terminated, or false otherwise
614         * @see #isTerminated()
615         */
616        public boolean isTerminating() {
617            return runState == STOP;
618        }
619    
620        /**
621         * Return true if this pool has completed its termination and no longer has any open connections.
622         * 
623         * @return true if terminated, or false otherwise
624         * @see #isTerminating()
625         */
626        public boolean isTerminated() {
627            return runState == TERMINATED;
628        }
629    
630        /**
631         * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the
632         * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will
633         * return even if all connections have not been closed.
634         * 
635         * @param timeout the maximum time to wait for all connections to be closed and returned to the pool
636         * @param unit the time unit for <code>timeout</code>
637         * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred
638         *         before all the connections were closed
639         * @throws InterruptedException if the thread was interrupted
640         */
641        public boolean awaitTermination( long timeout,
642                                         TimeUnit unit ) throws InterruptedException {
643            this.logger.trace("Awaiting termination");
644            long nanos = unit.toNanos(timeout);
645            final ReentrantLock mainLock = this.mainLock;
646            try {
647                mainLock.lock();
648                for (;;) {
649                    // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize);
650                    if (runState == TERMINATED) return true;
651                    if (nanos <= 0) return false;
652                    nanos = termination.awaitNanos(nanos);
653                    //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize)
654                    // ;
655                }
656            } finally {
657                mainLock.unlock();
658                this.logger.trace("Finished awaiting termination");
659            }
660        }
661    
662        /**
663         * Method invoked when the pool has terminated; shutdown() and shutdownNow() call it after releasing the main lock. The
664         * default implementation does nothing. To properly nest overridings, subclasses should invoke <tt>super.terminated</tt>.
665         */
666        protected void terminated() { // intentionally empty: extension point for subclasses
667        }
668    
669        /**
670         * Invokes <tt>shutdown</tt> when this pool is no longer referenced.
671         */
672        @Override
673        protected void finalize() {
674            shutdown();
675        }
676    
677        // -------------------------------------------------
678        // Connection management methods ...
679        // -------------------------------------------------
680    
681        /**
682         * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection
683         * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum
684         * number of connections and all connections are currently being used.
685         * 
686         * @return a connection
687         * @throws RepositorySourceException if there is a problem obtaining a connection
688         * @throws IllegalStateException if the factory is not in a state to create or return connections
689         */
690        public RepositoryConnection getConnection() throws RepositorySourceException {
691            int attemptsAllowed = this.maxFailedAttemptsBeforeError.get();
692            ConnectionWrapper connection = null;
693            // Do this until we get a good connection ...
694            int attemptsRemaining = attemptsAllowed;
695            while (connection == null && attemptsRemaining > 0) {
696                --attemptsRemaining;
697                ReentrantLock mainLock = this.mainLock;
698                try {
699                    mainLock.lock();
700                    // If we're shutting down the pool, then just close the connection ...
701                    if (this.runState != RUNNING) {
702                        throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text());
703                    }
704                    // If there are fewer total connections than the core size ...
705                    if (this.poolSize < this.corePoolSize) {
706                        // Immediately create a wrapped connection and return it ...
707                        connection = newWrappedConnection();
708                    }
709                    // Peek to see if there is a connection available ...
710                    else if (this.availableConnections.peek() != null) {
711                        // There is, so take it and return it ...
712                        try {
713                            connection = this.availableConnections.take();
714                        } catch (InterruptedException e) {
715                            this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
716                            Thread.interrupted();
717                            throw new RepositorySourceException(getSourceName(), e);
718                        }
719                    }
720                    // There is no connection available. If there are fewer total connections than the maximum size ...
721                    else if (this.poolSize < this.maximumPoolSize) {
722                        // Immediately create a wrapped connection and return it ...
723                        connection = newWrappedConnection();
724                    }
725                    if (connection != null) {
726                        this.inUseConnections.add(connection);
727                    }
728                } finally {
729                    mainLock.unlock();
730                }
731                if (connection == null) {
732                    // There are not enough connections, so wait in line for the next available connection ...
733                    this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName());
734                    try {
735                        connection = this.availableConnections.take();
736                    } catch (InterruptedException e) {
737                        this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName());
738                        Thread.interrupted();
739                        throw new RepositorySourceException(getSourceName(), e);
740                    }
741                    mainLock = this.mainLock;
742                    mainLock.lock();
743                    try {
744                        if (connection != null) {
745                            this.inUseConnections.add(connection);
746                        }
747                    } finally {
748                        mainLock.unlock();
749                    }
750                    this.logger.trace("Recieved a repository connection from pool {0}", getSourceName());
751                }
752                if (connection != null && this.validateConnectionBeforeUse.get()) {
753                    try {
754                        connection = validateConnection(connection);
755                    } catch (InterruptedException e) {
756                        this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName());
757                        returnConnection(connection);
758                        Thread.interrupted();
759                        throw new RepositorySourceException(getSourceName(), e);
760                    }
761                }
762            }
763            if (connection == null) {
764                // We were unable to obtain a usable connection, so fail ...
765                throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed));
766            }
767            this.totalConnectionsUsed.incrementAndGet();
768            return connection;
769        }
770    
771        /**
772         * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}.
773         * 
774         * @param wrapper the wrapper to the connection that is being returned to the pool
775         */
776        protected void returnConnection( ConnectionWrapper wrapper ) {
777            assert wrapper != null;
778            ConnectionWrapper wrapperToClose = null;
779            final ReentrantLock mainLock = this.mainLock;
780            try {
781                mainLock.lock();
782                // Remove the connection from the in-use set ...
783                boolean removed = this.inUseConnections.remove(wrapper);
784                assert removed;
785    
786                // If we're shutting down the pool, then just close the connection ...
787                if (this.runState != RUNNING) {
788                    wrapperToClose = wrapper;
789                }
790                // If there are more connections than the maximum size...
791                else if (this.poolSize > this.maximumPoolSize) {
792                    // Immediately close this connection ...
793                    wrapperToClose = wrapper;
794                }
795                // Attempt to make the connection available (this should generally work, unless there is an upper limit
796                // to the number of available connections) ...
797                else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) {
798                    // The pool of available connection is full, so release it ...
799                    wrapperToClose = wrapper;
800                }
801            } finally {
802                mainLock.unlock();
803            }
804            // Close the connection if we're supposed to (do it outside of the main lock)...
805            if (wrapperToClose != null) {
806                closeConnection(wrapperToClose);
807            }
808        }
809    
810        /**
811         * Validate the supplied connection, returning the connection if valid or null if the connection is not valid.
812         * 
813         * @param connection the connection to be validated; may not be null
814         * @return the validated connection, or null if the connection did not validate and was removed from the pool
815         * @throws InterruptedException if the thread is interrupted while validating the connection
816         */
817        protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException {
818            assert connection != null;
819            ConnectionWrapper invalidConnection = null;
820            try {
821                if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) {
822                    invalidConnection = connection;
823                }
824            } finally {
825                if (invalidConnection != null) {
826                    connection = null;
827                    returnConnection(invalidConnection);
828                }
829            }
830            return connection;
831        }
832    
833        /**
834         * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new
835         * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the
836         * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the
837         * {@link #poolSize pool size}.
838         * 
839         * @return the connection wrapper with a new connection
840         * @throws RepositorySourceException if there was an error obtaining the new connection
841         */
842        @GuardedBy( "mainLock" )
843        protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException {
844            RepositoryConnection connection = this.source.getConnection();
845            ++this.poolSize;
846            this.totalConnectionsCreated.incrementAndGet();
847            return new ConnectionWrapper(connection);
848        }
849    
850        /**
851         * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This
852         * method does decrement the {@link #poolSize pool size}.
853         * 
854         * @param wrapper the wrapper for the connection to be closed
855         */
856        protected void closeConnection( ConnectionWrapper wrapper ) {
857            assert wrapper != null;
858            RepositoryConnection original = wrapper.getOriginal();
859            assert original != null;
860            try {
861                this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize);
862                original.close();
863            } finally {
864                final ReentrantLock mainLock = this.mainLock;
865                try {
866                    mainLock.lock();
867                    // No matter what reduce the pool size count
868                    --this.poolSize;
869                    // And if shutting down and this was the last connection being used...
870                    if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) {
871                        // then signal anybody that has called "awaitTermination(...)"
872                        this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName());
873                        this.runState = TERMINATED;
874                        this.termination.signalAll();
875                        this.logger.trace("Terminated repository connection pool for {0}", getSourceName());
876    
877                        // fall through to call terminate() outside of lock.
878                    }
879                } finally {
880                    mainLock.unlock();
881                }
882            }
883        }
884    
885        @GuardedBy( "mainLock" )
886        protected int drainUnusedConnections( int count ) {
887            if (count <= 0) return 0;
888            this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName());
889            // Drain the extra connections from those available ...
890            Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>();
891            this.availableConnections.drainTo(extraConnections, count);
892            for (ConnectionWrapper connection : extraConnections) {
893                this.logger.trace("Closing repository connection to {0}", getSourceName());
894                connection.getOriginal().close();
895            }
896            int numClosed = extraConnections.size();
897            this.poolSize -= numClosed;
898            this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize);
899            return numClosed;
900        }
901    
902        @GuardedBy( "mainLock" )
903        protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException {
904            // Add connection ...
905            if (this.poolSize < this.corePoolSize) {
906                this.availableConnections.offer(newWrappedConnection());
907                this.logger.trace("Added connection to {0} in undersized pool", getSourceName());
908                return true;
909            }
910            return false;
911        }
912    
913        @GuardedBy( "mainLock" )
914        protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException {
915            // Add connections ...
916            int n = 0;
917            while (this.poolSize < this.corePoolSize) {
918                this.availableConnections.offer(newWrappedConnection());
919                ++n;
920            }
921            this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName());
922            return n;
923        }
924    
925        protected class ConnectionWrapper implements RepositoryConnection {
926    
927            private final RepositoryConnection original;
928            private final long timeCreated;
929            private long lastUsed;
930            private boolean closed = false;
931    
932            protected ConnectionWrapper( RepositoryConnection connection ) {
933                assert connection != null;
934                this.original = connection;
935                this.timeCreated = System.currentTimeMillis();
936            }
937    
938            /**
939             * @return original
940             */
941            protected RepositoryConnection getOriginal() {
942                return this.original;
943            }
944    
945            /**
946             * @return lastUsed
947             */
948            public long getTimeLastUsed() {
949                return this.lastUsed;
950            }
951    
952            /**
953             * @return timeCreated
954             */
955            public long getTimeCreated() {
956                return this.timeCreated;
957            }
958    
959            /**
960             * {@inheritDoc}
961             */
962            public String getSourceName() {
963                return this.original.getSourceName();
964            }
965    
966            /**
967             * {@inheritDoc}
968             */
969            public XAResource getXAResource() {
970                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
971                return this.original.getXAResource();
972            }
973    
974            /**
975             * {@inheritDoc}
976             */
977            public CachePolicy getDefaultCachePolicy() {
978                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
979                return this.original.getDefaultCachePolicy();
980            }
981    
982            /**
983             * {@inheritDoc}
984             * 
985             * @see org.jboss.dna.graph.connector.RepositoryConnection#execute(org.jboss.dna.graph.ExecutionContext,
986             *      org.jboss.dna.graph.request.Request)
987             */
988            public void execute( ExecutionContext context,
989                                 Request request ) throws RepositorySourceException {
990                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
991                this.original.execute(context, request);
992            }
993    
994            /**
995             * {@inheritDoc}
996             */
997            public boolean ping( long time,
998                                 TimeUnit unit ) throws InterruptedException {
999                if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text());
1000                return this.original.ping(time, unit);
1001            }
1002    
1003            /**
1004             * {@inheritDoc}
1005             */
1006            public void close() {
1007                if (!closed) {
1008                    this.lastUsed = System.currentTimeMillis();
1009                    this.original.close();
1010                    this.closed = true;
1011                    returnConnection(this);
1012                }
1013            }
1014    
1015            /**
1016             * {@inheritDoc}
1017             */
1018            public void setListener( RepositorySourceListener listener ) {
1019                if (!closed) this.original.setListener(listener);
1020            }
1021    
1022        }
1023    
1024    }