001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.graph.connector; 025 026 import java.util.Collection; 027 import java.util.HashSet; 028 import java.util.LinkedList; 029 import java.util.Set; 030 import java.util.concurrent.BlockingQueue; 031 import java.util.concurrent.LinkedBlockingQueue; 032 import java.util.concurrent.TimeUnit; 033 import java.util.concurrent.atomic.AtomicBoolean; 034 import java.util.concurrent.atomic.AtomicInteger; 035 import java.util.concurrent.atomic.AtomicLong; 036 import java.util.concurrent.locks.Condition; 037 import java.util.concurrent.locks.ReentrantLock; 038 import javax.transaction.xa.XAResource; 039 import net.jcip.annotations.GuardedBy; 040 import net.jcip.annotations.ThreadSafe; 041 import org.jboss.dna.common.util.CheckArg; 042 import org.jboss.dna.common.util.Logger; 043 import org.jboss.dna.graph.ExecutionContext; 044 import org.jboss.dna.graph.GraphI18n; 045 import org.jboss.dna.graph.cache.CachePolicy; 046 import org.jboss.dna.graph.request.Request; 047 048 /** 049 * A reusable implementation of a managed pool of connections that is optimized for safe concurrent operations. 050 * 051 * @author Randall Hauch 052 */ 053 @ThreadSafe 054 public class RepositoryConnectionPool { 055 056 /** 057 * The core pool size for default-constructed pools is {@value} . 058 */ 059 public static final int DEFAULT_CORE_POOL_SIZE = 1; 060 061 /** 062 * The maximum pool size for default-constructed pools is {@value} . 063 */ 064 public static final int DEFAULT_MAXIMUM_POOL_SIZE = 10; 065 066 /** 067 * The keep-alive time for connections in default-constructed pools is {@value} seconds. 068 */ 069 public static final long DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS = 30; 070 071 /** 072 * Permission for checking shutdown 073 */ 074 private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); 075 076 /** 077 * The source that this pool uses to create new connections. 078 */ 079 private final RepositorySource source; 080 081 /** 082 * Lock held on updates to poolSize, corePoolSize, maximumPoolSize, and workers set. 083 */ 084 private final ReentrantLock mainLock = new ReentrantLock(); 085 086 /** 087 * Wait condition to support awaitTermination 088 */ 089 private final Condition termination = mainLock.newCondition(); 090 091 /** 092 * Set containing all connections that are available for use. 093 */ 094 @GuardedBy( "mainLock" ) 095 private final BlockingQueue<ConnectionWrapper> availableConnections = new LinkedBlockingQueue<ConnectionWrapper>(); 096 097 /** 098 * The connections that are currently in use. 099 */ 100 @GuardedBy( "mainLock" ) 101 private final Set<ConnectionWrapper> inUseConnections = new HashSet<ConnectionWrapper>(); 102 103 /** 104 * Timeout in nanoseconds for idle connections waiting to be used. Threads use this timeout only when there are more than 105 * corePoolSize present. Otherwise they wait forever to be used. 106 */ 107 private volatile long keepAliveTime; 108 109 /** 110 * The target pool size, updated only while holding mainLock, but volatile to allow concurrent readability even during 111 * updates. 112 */ 113 @GuardedBy( "mainLock" ) 114 private volatile int corePoolSize; 115 116 /** 117 * Maximum pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 118 */ 119 @GuardedBy( "mainLock" ) 120 private volatile int maximumPoolSize; 121 122 /** 123 * Current pool size, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 124 */ 125 @GuardedBy( "mainLock" ) 126 private volatile int poolSize; 127 128 /** 129 * Lifecycle state, updated only while holding mainLock but volatile to allow concurrent readability even during updates. 130 */ 131 @GuardedBy( "mainLock" ) 132 private volatile int runState; 133 134 // Special values for runState 135 /** Normal, not-shutdown mode */ 136 static final int RUNNING = 0; 137 /** Controlled shutdown mode */ 138 static final int SHUTDOWN = 1; 139 /** Immediate shutdown mode */ 140 static final int STOP = 2; 141 /** Final state */ 142 static final int TERMINATED = 3; 143 144 /** 145 * Flag specifying whether a connection should be validated before returning it from the {@link #getConnection()} method. 146 */ 147 private final AtomicBoolean validateConnectionBeforeUse = new AtomicBoolean(false); 148 149 /** 150 * The time in nanoseconds that ping should wait before timing out and failing. 151 */ 152 private final AtomicLong pingTimeout = new AtomicLong(0); 153 154 /** 155 * The number of times an attempt to obtain a connection should fail with invalid connections before throwing an exception. 156 */ 157 private final AtomicInteger maxFailedAttemptsBeforeError = new AtomicInteger(10); 158 159 private final AtomicLong totalConnectionsCreated = new AtomicLong(0); 160 161 private final AtomicLong totalConnectionsUsed = new AtomicLong(0); 162 163 private final Logger logger = Logger.getLogger(this.getClass()); 164 165 /** 166 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. This constructor 167 * uses the {@link #DEFAULT_CORE_POOL_SIZE default core pool size}, {@link #DEFAULT_MAXIMUM_POOL_SIZE default maximum pool 168 * size}, and {@link #DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS default keep-alive time (in seconds)}. 169 * 170 * @param source the source for connections 171 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid 172 */ 173 public RepositoryConnectionPool( RepositorySource source ) { 174 this(source, DEFAULT_CORE_POOL_SIZE, DEFAULT_MAXIMUM_POOL_SIZE, DEFAULT_KEEP_ALIVE_TIME_IN_SECONDS, TimeUnit.SECONDS); 175 } 176 177 /** 178 * Create the pool to use the supplied connection factory, which is typically a {@link RepositorySource}. 179 * 180 * @param source the source for connections 181 * @param corePoolSize the number of connections to keep in the pool, even if they are idle. 182 * @param maximumPoolSize the maximum number of connections to allow in the pool. 183 * @param keepAliveTime when the number of connection is greater than the core, this is the maximum time that excess idle 184 * connections will be kept before terminating. 185 * @param unit the time unit for the keepAliveTime argument. 186 * @throws IllegalArgumentException if the connection factory is null or any of the supplied arguments are invalid 187 */ 188 public RepositoryConnectionPool( RepositorySource source, 189 int corePoolSize, 190 int maximumPoolSize, 191 long keepAliveTime, 192 TimeUnit unit ) { 193 CheckArg.isNonNegative(corePoolSize, "corePoolSize"); 194 CheckArg.isPositive(maximumPoolSize, "maximumPoolSize"); 195 CheckArg.isNonNegative(keepAliveTime, "keepAliveTime"); 196 CheckArg.isNotNull(source, "source"); 197 if (maximumPoolSize < corePoolSize) { 198 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 199 } 200 this.source = source; 201 this.corePoolSize = corePoolSize; 202 this.maximumPoolSize = maximumPoolSize; 203 this.keepAliveTime = unit.toNanos(keepAliveTime); 204 this.setPingTimeout(100, TimeUnit.MILLISECONDS); 205 } 206 207 /** 208 * Get the {@link RepositorySource} that's used by this pool. 209 * 210 * @return the repository source; never null 211 */ 212 public final RepositorySource getRepositorySource() { 213 return source; 214 } 215 216 /** 217 * Get the name of this pool, which delegates to the connection factory. 218 * 219 * @return the name of the source 220 */ 221 protected String getSourceName() { 222 return source.getName(); 223 } 224 225 // ------------------------------------------------- 226 // Property settings ... 227 // ------------------------------------------------- 228 229 /** 230 * @return validateConnectionBeforeUse 231 */ 232 public boolean getValidateConnectionBeforeUse() { 233 return this.validateConnectionBeforeUse.get(); 234 } 235 236 /** 237 * @param validateConnectionBeforeUse Sets validateConnectionBeforeUse to the specified value. 238 */ 239 public void setValidateConnectionBeforeUse( boolean validateConnectionBeforeUse ) { 240 this.validateConnectionBeforeUse.set(validateConnectionBeforeUse); 241 } 242 243 /** 244 * @return pingTimeout 245 */ 246 public long getPingTimeoutInNanos() { 247 return this.pingTimeout.get(); 248 } 249 250 /** 251 * @param pingTimeout the time to wait for a ping to complete 252 * @param unit the time unit of the time argument 253 */ 254 public void setPingTimeout( long pingTimeout, 255 TimeUnit unit ) { 256 CheckArg.isNonNegative(pingTimeout, "time"); 257 this.pingTimeout.set(unit.toNanos(pingTimeout)); 258 } 259 260 /** 261 * @return maxFailedAttemptsBeforeError 262 */ 263 public int getMaxFailedAttemptsBeforeError() { 264 return this.maxFailedAttemptsBeforeError.get(); 265 } 266 267 /** 268 * @param maxFailedAttemptsBeforeError Sets maxFailedAttemptsBeforeError to the specified value. 269 */ 270 public void setMaxFailedAttemptsBeforeError( int maxFailedAttemptsBeforeError ) { 271 this.maxFailedAttemptsBeforeError.set(maxFailedAttemptsBeforeError); 272 } 273 274 /** 275 * Sets the time limit for which connections may remain idle before being closed. If there are more than the core number of 276 * connections currently in the pool, after waiting this amount of time without being used, excess threads will be terminated. 277 * This overrides any value set in the constructor. 278 * 279 * @param time the time to wait. A time value of zero will cause excess connections to terminate immediately after being 280 * returned. 281 * @param unit the time unit of the time argument 282 * @throws IllegalArgumentException if time less than zero 283 * @see #getKeepAliveTime 284 */ 285 public void setKeepAliveTime( long time, 286 TimeUnit unit ) { 287 CheckArg.isNonNegative(time, "time"); 288 this.keepAliveTime = unit.toNanos(time); 289 } 290 291 /** 292 * Returns the connection keep-alive time, which is the amount of time which connections in excess of the core pool size may 293 * remain idle before being closed. 294 * 295 * @param unit the desired time unit of the result 296 * @return the time limit 297 * @see #setKeepAliveTime 298 */ 299 public long getKeepAliveTime( TimeUnit unit ) { 300 assert unit != null; 301 return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS); 302 } 303 304 /** 305 * @return maximumPoolSize 306 */ 307 public int getMaximumPoolSize() { 308 return this.maximumPoolSize; 309 } 310 311 /** 312 * Sets the maximum allowed number of connections. This overrides any value set in the constructor. If the new value is 313 * smaller than the current value, excess existing but unused connections will be closed. 314 * 315 * @param maximumPoolSize the new maximum 316 * @throws IllegalArgumentException if maximumPoolSize less than zero or the {@link #getCorePoolSize() core pool size} 317 * @see #getMaximumPoolSize 318 */ 319 public void setMaximumPoolSize( int maximumPoolSize ) { 320 CheckArg.isPositive(maximumPoolSize, "maximum pool size"); 321 if (maximumPoolSize < corePoolSize) { 322 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 323 } 324 final ReentrantLock mainLock = this.mainLock; 325 try { 326 mainLock.lock(); 327 int extra = this.maximumPoolSize - maximumPoolSize; 328 this.maximumPoolSize = maximumPoolSize; 329 if (extra > 0 && poolSize > maximumPoolSize) { 330 // Drain the extra connections from those available ... 331 drainUnusedConnections(extra); 332 } 333 } finally { 334 mainLock.unlock(); 335 } 336 } 337 338 /** 339 * Returns the core number of connections. 340 * 341 * @return the core number of connections 342 * @see #setCorePoolSize(int) 343 */ 344 public int getCorePoolSize() { 345 return this.corePoolSize; 346 } 347 348 /** 349 * Sets the core number of connections. This overrides any value set in the constructor. If the new value is smaller than the 350 * current value, excess existing and unused connections will be closed. If larger, new connections will, if needed, be 351 * created. 352 * 353 * @param corePoolSize the new core size 354 * @throws RepositorySourceException if there was an error obtaining the new connection 355 * @throws InterruptedException if the thread was interrupted during the operation 356 * @throws IllegalArgumentException if <tt>corePoolSize</tt> less than zero 357 * @see #getCorePoolSize() 358 */ 359 public void setCorePoolSize( int corePoolSize ) throws RepositorySourceException, InterruptedException { 360 CheckArg.isNonNegative(corePoolSize, "core pool size"); 361 if (maximumPoolSize < corePoolSize) { 362 throw new IllegalArgumentException(GraphI18n.maximumPoolSizeMayNotBeSmallerThanCorePoolSize.text()); 363 } 364 final ReentrantLock mainLock = this.mainLock; 365 try { 366 mainLock.lock(); 367 int extra = this.corePoolSize - corePoolSize; 368 this.corePoolSize = corePoolSize; 369 if (extra < 0) { 370 // Add connections ... 371 addConnectionsIfUnderCorePoolSize(); 372 } else if (extra > 0 && poolSize > corePoolSize) { 373 // Drain the extra connections from those available ... 374 drainUnusedConnections(extra); 375 } 376 } finally { 377 mainLock.unlock(); 378 } 379 } 380 381 // ------------------------------------------------- 382 // Statistics ... 383 // ------------------------------------------------- 384 385 /** 386 * Returns the current number of connections in the pool, including those that are checked out (in use) and those that are not 387 * being used. 388 * 389 * @return the number of connections 390 */ 391 public int getPoolSize() { 392 return poolSize; 393 } 394 395 /** 396 * Returns the approximate number of connections that are currently checked out from the pool. 397 * 398 * @return the number of checked-out connections 399 */ 400 public int getInUseCount() { 401 final ReentrantLock mainLock = this.mainLock; 402 try { 403 mainLock.lock(); 404 return this.inUseConnections.size(); 405 } finally { 406 mainLock.unlock(); 407 } 408 } 409 410 /** 411 * Get the total number of connections that have been created by this pool. 412 * 413 * @return the total number of connections created by this pool 414 */ 415 public long getTotalConnectionsCreated() { 416 return this.totalConnectionsCreated.get(); 417 } 418 419 /** 420 * Get the total number of times connections have been {@link #getConnection()} used. 421 * 422 * @return the total number 423 */ 424 public long getTotalConnectionsUsed() { 425 return this.totalConnectionsUsed.get(); 426 } 427 428 // ------------------------------------------------- 429 // State management methods ... 430 // ------------------------------------------------- 431 432 /** 433 * Starts a core connection, causing it to idly wait for use. This overrides the default policy of starting core connections 434 * only when they are {@link #getConnection() needed}. This method will return <tt>false</tt> if all core connections have 435 * already been started. 436 * 437 * @return true if a connection was started 438 * @throws RepositorySourceException if there was an error obtaining the new connection 439 * @throws InterruptedException if the thread was interrupted during the operation 440 */ 441 public boolean prestartCoreConnection() throws RepositorySourceException, InterruptedException { 442 final ReentrantLock mainLock = this.mainLock; 443 try { 444 mainLock.lock(); 445 return addConnectionIfUnderCorePoolSize(); 446 } finally { 447 mainLock.unlock(); 448 } 449 } 450 451 /** 452 * Starts all core connections, causing them to idly wait for use. This overrides the default policy of starting core 453 * connections only when they are {@link #getConnection() needed}. 454 * 455 * @return the number of connections started. 456 * @throws RepositorySourceException if there was an error obtaining the new connection 457 * @throws InterruptedException if the thread was interrupted during the operation 458 */ 459 public int prestartAllCoreConnections() throws RepositorySourceException, InterruptedException { 460 final ReentrantLock mainLock = this.mainLock; 461 try { 462 mainLock.lock(); 463 return addConnectionsIfUnderCorePoolSize(); 464 } finally { 465 mainLock.unlock(); 466 } 467 } 468 469 /** 470 * Initiates an orderly shutdown of the pool in which connections that are currently in use are allowed to be used and closed 471 * as normal, but no new connections will be created. Invocation has no additional effect if already shut down. 472 * <p> 473 * Once the pool has been shutdown, it may not be used to {@link #getConnection() get connections}. 474 * </p> 475 * 476 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller 477 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>, 478 * or the security manager's <tt>checkAccess</tt> method denies access. 479 * @see #shutdownNow() 480 */ 481 public void shutdown() { 482 // Fail if caller doesn't have modifyThread permission. We 483 // explicitly check permissions directly because we can't trust 484 // implementations of SecurityManager to correctly override 485 // the "check access" methods such that our documented 486 // security policy is implemented. 487 SecurityManager security = System.getSecurityManager(); 488 if (security != null) java.security.AccessController.checkPermission(shutdownPerm); 489 490 this.logger.debug("Shutting down repository connection pool for {0}", getSourceName()); 491 boolean fullyTerminated = false; 492 final ReentrantLock mainLock = this.mainLock; 493 try { 494 mainLock.lock(); 495 int state = this.runState; 496 if (state == RUNNING) { 497 // don't override shutdownNow 498 this.runState = SHUTDOWN; 499 } 500 501 // Kill the maintenance thread ... 502 503 // Remove and close all available connections ... 504 if (!this.availableConnections.isEmpty()) { 505 // Drain the extra connections from those available ... 506 drainUnusedConnections(this.availableConnections.size()); 507 } 508 509 // If there are no connections being used, trigger full termination now ... 510 if (this.inUseConnections.isEmpty()) { 511 fullyTerminated = true; 512 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 513 runState = TERMINATED; 514 termination.signalAll(); 515 this.logger.debug("Terminated repository connection pool for {0}", getSourceName()); 516 } 517 // Otherwise the last connection that is closed will transition the runState to TERMINATED ... 518 } finally { 519 mainLock.unlock(); 520 } 521 if (fullyTerminated) terminated(); 522 } 523 524 /** 525 * Attempts to close all connections in the pool, including those connections currently in use, and prevent the use of other 526 * connections. 527 * 528 * @throws SecurityException if a security manager exists and shutting down this pool may manipulate threads that the caller 529 * is not permitted to modify because it does not hold {@link java.lang.RuntimePermission}<tt>("modifyThread")</tt>, 530 * or the security manager's <tt>checkAccess</tt> method denies access. 531 * @see #shutdown() 532 */ 533 public void shutdownNow() { 534 // Almost the same code as shutdown() 535 SecurityManager security = System.getSecurityManager(); 536 if (security != null) java.security.AccessController.checkPermission(shutdownPerm); 537 538 this.logger.debug("Shutting down (immediately) repository connection pool for {0}", getSourceName()); 539 boolean fullyTerminated = false; 540 final ReentrantLock mainLock = this.mainLock; 541 try { 542 mainLock.lock(); 543 int state = this.runState; 544 if (state != TERMINATED) { 545 // don't override shutdownNow 546 this.runState = STOP; 547 } 548 549 // Kill the maintenance thread ... 550 551 // Remove and close all available connections ... 552 if (!this.availableConnections.isEmpty()) { 553 // Drain the extra connections from those available ... 554 drainUnusedConnections(this.availableConnections.size()); 555 } 556 557 // If there are connections being used, close them now ... 558 if (!this.inUseConnections.isEmpty()) { 559 for (ConnectionWrapper connectionInUse : this.inUseConnections) { 560 this.logger.trace("Closing repository connection to {0}", getSourceName()); 561 connectionInUse.getOriginal().close(); 562 } 563 this.poolSize -= this.inUseConnections.size(); 564 // The last connection that is closed will transition the runState to TERMINATED ... 565 } else { 566 // There are no connections in use, so trigger full termination now ... 567 fullyTerminated = true; 568 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 569 runState = TERMINATED; 570 termination.signalAll(); 571 this.logger.debug("Terminated repository connection pool for {0}", getSourceName()); 572 } 573 574 } finally { 575 mainLock.unlock(); 576 } 577 if (fullyTerminated) terminated(); 578 } 579 580 /** 581 * Return whether this connection pool is running and is able to {@link #getConnection() provide connections}. Note that this 582 * method is effectively <code>!isShutdown()</code>. 583 * 584 * @return true if this pool is running, or false otherwise 585 * @see #isShutdown() 586 * @see #isTerminated() 587 * @see #isTerminating() 588 */ 589 public boolean isRunning() { 590 return runState == RUNNING; 591 } 592 593 /** 594 * Return whether this connection pool is in the process of shutting down or has already been shut down. A result of 595 * <code>true</code> signals that the pool may no longer be used. Note that this method is effectively 596 * <code>!isRunning()</code>. 597 * 598 * @return true if this pool has been shut down, or false otherwise 599 * @see #isShutdown() 600 * @see #isTerminated() 601 * @see #isTerminating() 602 */ 603 public boolean isShutdown() { 604 return runState != RUNNING; 605 } 606 607 /** 608 * Returns true if this pool is in the process of terminating after {@link #shutdown()} or {@link #shutdownNow()} has been 609 * called but has not completely terminated. This method may be useful for debugging. A return of <tt>true</tt> reported a 610 * sufficient period after shutdown may indicate that submitted tasks have ignored or suppressed interruption, causing this 611 * executor not to properly terminate. 612 * 613 * @return true if terminating but not yet terminated, or false otherwise 614 * @see #isTerminated() 615 */ 616 public boolean isTerminating() { 617 return runState == STOP; 618 } 619 620 /** 621 * Return true if this pool has completed its termination and no longer has any open connections. 622 * 623 * @return true if terminated, or false otherwise 624 * @see #isTerminating() 625 */ 626 public boolean isTerminated() { 627 return runState == TERMINATED; 628 } 629 630 /** 631 * Method that can be called after {@link #shutdown()} or {@link #shutdownNow()} to wait until all connections in use at the 632 * time those methods were called have been closed normally. This method accepts a maximum time duration, after which it will 633 * return even if all connections have not been closed. 634 * 635 * @param timeout the maximum time to wait for all connections to be closed and returned to the pool 636 * @param unit the time unit for <code>timeout</code> 637 * @return true if the pool was terminated in the supplied time (or was already terminated), or false if the timeout occurred 638 * before all the connections were closed 639 * @throws InterruptedException if the thread was interrupted 640 */ 641 public boolean awaitTermination( long timeout, 642 TimeUnit unit ) throws InterruptedException { 643 this.logger.trace("Awaiting termination"); 644 long nanos = unit.toNanos(timeout); 645 final ReentrantLock mainLock = this.mainLock; 646 try { 647 mainLock.lock(); 648 for (;;) { 649 // this.logger.trace("---> Run state = {0}; condition = {1}, {2} open", runState, termination, poolSize); 650 if (runState == TERMINATED) return true; 651 if (nanos <= 0) return false; 652 nanos = termination.awaitNanos(nanos); 653 //this.logger.trace("---> Done waiting: run state = {0}; condition = {1}, {2} open",runState,termination,poolSize) 654 // ; 655 } 656 } finally { 657 mainLock.unlock(); 658 this.logger.trace("Finished awaiting termination"); 659 } 660 } 661 662 /** 663 * Method invoked when the pool has terminated. Default implementation does nothing. Note: To properly nest multiple 664 * overridings, subclasses should generally invoke <tt>super.terminated</tt> within this method. 665 */ 666 protected void terminated() { 667 } 668 669 /** 670 * Invokes <tt>shutdown</tt> when this pool is no longer referenced. 671 */ 672 @Override 673 protected void finalize() { 674 shutdown(); 675 } 676 677 // ------------------------------------------------- 678 // Connection management methods ... 679 // ------------------------------------------------- 680 681 /** 682 * Get a connection from the pool. This method either returns an unused connection if one is available, creates a connection 683 * if there is still room in the pool, or blocks until a connection becomes available if the pool already contains the maximum 684 * number of connections and all connections are currently being used. 685 * 686 * @return a connection 687 * @throws RepositorySourceException if there is a problem obtaining a connection 688 * @throws IllegalStateException if the factory is not in a state to create or return connections 689 */ 690 public RepositoryConnection getConnection() throws RepositorySourceException { 691 int attemptsAllowed = this.maxFailedAttemptsBeforeError.get(); 692 ConnectionWrapper connection = null; 693 // Do this until we get a good connection ... 694 int attemptsRemaining = attemptsAllowed; 695 while (connection == null && attemptsRemaining > 0) { 696 --attemptsRemaining; 697 ReentrantLock mainLock = this.mainLock; 698 try { 699 mainLock.lock(); 700 // If we're shutting down the pool, then just close the connection ... 701 if (this.runState != RUNNING) { 702 throw new IllegalStateException(GraphI18n.repositoryConnectionPoolIsNotRunning.text()); 703 } 704 // If there are fewer total connections than the core size ... 705 if (this.poolSize < this.corePoolSize) { 706 // Immediately create a wrapped connection and return it ... 707 connection = newWrappedConnection(); 708 } 709 // Peek to see if there is a connection available ... 710 else if (this.availableConnections.peek() != null) { 711 // There is, so take it and return it ... 712 try { 713 connection = this.availableConnections.take(); 714 } catch (InterruptedException e) { 715 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName()); 716 Thread.interrupted(); 717 throw new RepositorySourceException(getSourceName(), e); 718 } 719 } 720 // There is no connection available. If there are fewer total connections than the maximum size ... 721 else if (this.poolSize < this.maximumPoolSize) { 722 // Immediately create a wrapped connection and return it ... 723 connection = newWrappedConnection(); 724 } 725 if (connection != null) { 726 this.inUseConnections.add(connection); 727 } 728 } finally { 729 mainLock.unlock(); 730 } 731 if (connection == null) { 732 // There are not enough connections, so wait in line for the next available connection ... 733 this.logger.trace("Waiting for a repository connection from pool {0}", getSourceName()); 734 try { 735 connection = this.availableConnections.take(); 736 } catch (InterruptedException e) { 737 this.logger.trace("Cancelled obtaining a repository connection from pool {0}", getSourceName()); 738 Thread.interrupted(); 739 throw new RepositorySourceException(getSourceName(), e); 740 } 741 mainLock = this.mainLock; 742 mainLock.lock(); 743 try { 744 if (connection != null) { 745 this.inUseConnections.add(connection); 746 } 747 } finally { 748 mainLock.unlock(); 749 } 750 this.logger.trace("Recieved a repository connection from pool {0}", getSourceName()); 751 } 752 if (connection != null && this.validateConnectionBeforeUse.get()) { 753 try { 754 connection = validateConnection(connection); 755 } catch (InterruptedException e) { 756 this.logger.trace("Cancelled validating a repository connection obtained from pool {0}", getSourceName()); 757 returnConnection(connection); 758 Thread.interrupted(); 759 throw new RepositorySourceException(getSourceName(), e); 760 } 761 } 762 } 763 if (connection == null) { 764 // We were unable to obtain a usable connection, so fail ... 765 throw new RepositorySourceException(GraphI18n.unableToObtainValidRepositoryAfterAttempts.text(attemptsAllowed)); 766 } 767 this.totalConnectionsUsed.incrementAndGet(); 768 return connection; 769 } 770 771 /** 772 * This method is automatically called by the {@link ConnectionWrapper} when it is {@link ConnectionWrapper#close() closed}. 773 * 774 * @param wrapper the wrapper to the connection that is being returned to the pool 775 */ 776 protected void returnConnection( ConnectionWrapper wrapper ) { 777 assert wrapper != null; 778 ConnectionWrapper wrapperToClose = null; 779 final ReentrantLock mainLock = this.mainLock; 780 try { 781 mainLock.lock(); 782 // Remove the connection from the in-use set ... 783 boolean removed = this.inUseConnections.remove(wrapper); 784 assert removed; 785 786 // If we're shutting down the pool, then just close the connection ... 787 if (this.runState != RUNNING) { 788 wrapperToClose = wrapper; 789 } 790 // If there are more connections than the maximum size... 791 else if (this.poolSize > this.maximumPoolSize) { 792 // Immediately close this connection ... 793 wrapperToClose = wrapper; 794 } 795 // Attempt to make the connection available (this should generally work, unless there is an upper limit 796 // to the number of available connections) ... 797 else if (!this.availableConnections.offer(new ConnectionWrapper(wrapper.getOriginal()))) { 798 // The pool of available connection is full, so release it ... 799 wrapperToClose = wrapper; 800 } 801 } finally { 802 mainLock.unlock(); 803 } 804 // Close the connection if we're supposed to (do it outside of the main lock)... 805 if (wrapperToClose != null) { 806 closeConnection(wrapperToClose); 807 } 808 } 809 810 /** 811 * Validate the supplied connection, returning the connection if valid or null if the connection is not valid. 812 * 813 * @param connection the connection to be validated; may not be null 814 * @return the validated connection, or null if the connection did not validate and was removed from the pool 815 * @throws InterruptedException if the thread is interrupted while validating the connection 816 */ 817 protected ConnectionWrapper validateConnection( ConnectionWrapper connection ) throws InterruptedException { 818 assert connection != null; 819 ConnectionWrapper invalidConnection = null; 820 try { 821 if (!connection.ping(this.pingTimeout.get(), TimeUnit.NANOSECONDS)) { 822 invalidConnection = connection; 823 } 824 } finally { 825 if (invalidConnection != null) { 826 connection = null; 827 returnConnection(invalidConnection); 828 } 829 } 830 return connection; 831 } 832 833 /** 834 * Obtain a new connection wrapped in a {@link ConnectionWrapper}. This method does not check whether creating the new 835 * connection would violate the {@link #maximumPoolSize maximum pool size} nor does it add the new connection to the 836 * {@link #availableConnections available connections} (as the caller may want it immediately), but it does increment the 837 * {@link #poolSize pool size}. 838 * 839 * @return the connection wrapper with a new connection 840 * @throws RepositorySourceException if there was an error obtaining the new connection 841 */ 842 @GuardedBy( "mainLock" ) 843 protected ConnectionWrapper newWrappedConnection() throws RepositorySourceException { 844 RepositoryConnection connection = this.source.getConnection(); 845 ++this.poolSize; 846 this.totalConnectionsCreated.incrementAndGet(); 847 return new ConnectionWrapper(connection); 848 } 849 850 /** 851 * Close a connection that is in the pool but no longer in the {@link #availableConnections available connections}. This 852 * method does decrement the {@link #poolSize pool size}. 853 * 854 * @param wrapper the wrapper for the connection to be closed 855 */ 856 protected void closeConnection( ConnectionWrapper wrapper ) { 857 assert wrapper != null; 858 RepositoryConnection original = wrapper.getOriginal(); 859 assert original != null; 860 try { 861 this.logger.debug("Closing repository connection to {0} ({1} open connections remain)", getSourceName(), poolSize); 862 original.close(); 863 } finally { 864 final ReentrantLock mainLock = this.mainLock; 865 try { 866 mainLock.lock(); 867 // No matter what reduce the pool size count 868 --this.poolSize; 869 // And if shutting down and this was the last connection being used... 870 if ((runState == SHUTDOWN || runState == STOP) && this.poolSize <= 0) { 871 // then signal anybody that has called "awaitTermination(...)" 872 this.logger.trace("Signalling termination of repository connection pool for {0}", getSourceName()); 873 this.runState = TERMINATED; 874 this.termination.signalAll(); 875 this.logger.trace("Terminated repository connection pool for {0}", getSourceName()); 876 877 // fall through to call terminate() outside of lock. 878 } 879 } finally { 880 mainLock.unlock(); 881 } 882 } 883 } 884 885 @GuardedBy( "mainLock" ) 886 protected int drainUnusedConnections( int count ) { 887 if (count <= 0) return 0; 888 this.logger.trace("Draining up to {0} unused repository connections to {1}", count, getSourceName()); 889 // Drain the extra connections from those available ... 890 Collection<ConnectionWrapper> extraConnections = new LinkedList<ConnectionWrapper>(); 891 this.availableConnections.drainTo(extraConnections, count); 892 for (ConnectionWrapper connection : extraConnections) { 893 this.logger.trace("Closing repository connection to {0}", getSourceName()); 894 connection.getOriginal().close(); 895 } 896 int numClosed = extraConnections.size(); 897 this.poolSize -= numClosed; 898 this.logger.trace("Drained {0} unused connections ({1} open connections remain)", numClosed, poolSize); 899 return numClosed; 900 } 901 902 @GuardedBy( "mainLock" ) 903 protected boolean addConnectionIfUnderCorePoolSize() throws RepositorySourceException { 904 // Add connection ... 905 if (this.poolSize < this.corePoolSize) { 906 this.availableConnections.offer(newWrappedConnection()); 907 this.logger.trace("Added connection to {0} in undersized pool", getSourceName()); 908 return true; 909 } 910 return false; 911 } 912 913 @GuardedBy( "mainLock" ) 914 protected int addConnectionsIfUnderCorePoolSize() throws RepositorySourceException { 915 // Add connections ... 916 int n = 0; 917 while (this.poolSize < this.corePoolSize) { 918 this.availableConnections.offer(newWrappedConnection()); 919 ++n; 920 } 921 this.logger.trace("Added {0} connection(s) to {1} in undersized pool", n, getSourceName()); 922 return n; 923 } 924 925 protected class ConnectionWrapper implements RepositoryConnection { 926 927 private final RepositoryConnection original; 928 private final long timeCreated; 929 private long lastUsed; 930 private boolean closed = false; 931 932 protected ConnectionWrapper( RepositoryConnection connection ) { 933 assert connection != null; 934 this.original = connection; 935 this.timeCreated = System.currentTimeMillis(); 936 } 937 938 /** 939 * @return original 940 */ 941 protected RepositoryConnection getOriginal() { 942 return this.original; 943 } 944 945 /** 946 * @return lastUsed 947 */ 948 public long getTimeLastUsed() { 949 return this.lastUsed; 950 } 951 952 /** 953 * @return timeCreated 954 */ 955 public long getTimeCreated() { 956 return this.timeCreated; 957 } 958 959 /** 960 * {@inheritDoc} 961 */ 962 public String getSourceName() { 963 return this.original.getSourceName(); 964 } 965 966 /** 967 * {@inheritDoc} 968 */ 969 public XAResource getXAResource() { 970 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 971 return this.original.getXAResource(); 972 } 973 974 /** 975 * {@inheritDoc} 976 */ 977 public CachePolicy getDefaultCachePolicy() { 978 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 979 return this.original.getDefaultCachePolicy(); 980 } 981 982 /** 983 * {@inheritDoc} 984 * 985 * @see org.jboss.dna.graph.connector.RepositoryConnection#execute(org.jboss.dna.graph.ExecutionContext, 986 * org.jboss.dna.graph.request.Request) 987 */ 988 public void execute( ExecutionContext context, 989 Request request ) throws RepositorySourceException { 990 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 991 this.original.execute(context, request); 992 } 993 994 /** 995 * {@inheritDoc} 996 */ 997 public boolean ping( long time, 998 TimeUnit unit ) throws InterruptedException { 999 if (closed) throw new IllegalStateException(GraphI18n.closedConnectionMayNotBeUsed.text()); 1000 return this.original.ping(time, unit); 1001 } 1002 1003 /** 1004 * {@inheritDoc} 1005 */ 1006 public void close() { 1007 if (!closed) { 1008 this.lastUsed = System.currentTimeMillis(); 1009 this.original.close(); 1010 this.closed = true; 1011 returnConnection(this); 1012 } 1013 } 1014 1015 /** 1016 * {@inheritDoc} 1017 */ 1018 public void setListener( RepositorySourceListener listener ) { 1019 if (!closed) this.original.setListener(listener); 1020 } 1021 1022 } 1023 1024 }