001 /* 002 * JBoss, Home of Professional Open Source. 003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors 004 * as indicated by the @author tags. See the copyright.txt file in the 005 * distribution for a full listing of individual contributors. 006 * 007 * This is free software; you can redistribute it and/or modify it 008 * under the terms of the GNU Lesser General Public License as 009 * published by the Free Software Foundation; either version 2.1 of 010 * the License, or (at your option) any later version. 011 * 012 * This software is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * You should have received a copy of the GNU Lesser General Public 018 * License along with this software; if not, write to the Free 019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 021 */ 022 package org.jboss.dna.repository; 023 024 import java.util.Collection; 025 import java.util.Collections; 026 import java.util.HashSet; 027 import java.util.LinkedList; 028 import java.util.List; 029 import java.util.Set; 030 import java.util.concurrent.CopyOnWriteArrayList; 031 import java.util.concurrent.TimeUnit; 032 import java.util.concurrent.locks.ReadWriteLock; 033 import java.util.concurrent.locks.ReentrantReadWriteLock; 034 import net.jcip.annotations.ThreadSafe; 035 import org.jboss.dna.common.util.CheckArg; 036 import org.jboss.dna.graph.BasicExecutionContextFactory; 037 import org.jboss.dna.graph.ExecutionContext; 038 import org.jboss.dna.graph.ExecutionContextFactory; 039 import org.jboss.dna.graph.connectors.RepositoryConnection; 040 import org.jboss.dna.graph.connectors.RepositoryConnectionFactory; 041 import org.jboss.dna.graph.connectors.RepositoryConnectionPool; 042 import org.jboss.dna.graph.connectors.RepositoryContext; 043 import org.jboss.dna.graph.connectors.RepositorySource; 044 import org.jboss.dna.repository.services.AbstractServiceAdministrator; 045 import org.jboss.dna.repository.services.ServiceAdministrator; 046 047 /** 048 * A library of {@link RepositorySource} instances and the {@link RepositoryConnectionPool} used to manage the connections for 049 * each. 050 * 051 * @author Randall Hauch 052 */ 053 @ThreadSafe 054 public class RepositoryLibrary implements RepositoryConnectionFactory { 055 056 /** 057 * The administrative component for this service. 058 * 059 * @author Randall Hauch 060 */ 061 protected class Administrator extends AbstractServiceAdministrator { 062 063 protected Administrator() { 064 super(RepositoryI18n.federationServiceName, State.STARTED); 065 } 066 067 /** 068 * {@inheritDoc} 069 */ 070 @Override 071 protected void doStart( State fromState ) { 072 super.doStart(fromState); 073 RepositoryLibrary.this.start(); 074 } 075 076 /** 077 * {@inheritDoc} 078 */ 079 @Override 080 protected void doShutdown( State fromState ) { 081 super.doShutdown(fromState); 082 RepositoryLibrary.this.shutdown(); 083 } 084 085 /** 086 * {@inheritDoc} 087 */ 088 public boolean awaitTermination( long timeout, 089 TimeUnit unit ) throws InterruptedException { 090 return RepositoryLibrary.this.awaitTermination(timeout, unit); 091 } 092 093 /** 094 * {@inheritDoc} 095 */ 096 @Override 097 protected boolean doCheckIsTerminated() { 098 return RepositoryLibrary.this.isTerminated(); 099 } 100 101 } 102 103 private final ServiceAdministrator administrator = new Administrator(); 104 private final ReadWriteLock sourcesLock = new ReentrantReadWriteLock(); 105 private final CopyOnWriteArrayList<RepositoryConnectionPool> pools = new CopyOnWriteArrayList<RepositoryConnectionPool>(); 106 private RepositoryConnectionFactory delegate; 107 private final ExecutionContextFactory executionContextFactory; 108 private final RepositoryContext repositoryContext; 109 110 /** 111 * Create a new manager instance. 112 */ 113 public RepositoryLibrary() { 114 this(new BasicExecutionContextFactory(), null); 115 } 116 117 /** 118 * Create a new manager instance. 119 * 120 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in 121 * this manager; may be null if there is no delegate 122 */ 123 public RepositoryLibrary( RepositoryConnectionFactory delegate ) { 124 this(new BasicExecutionContextFactory(), delegate); 125 } 126 127 /** 128 * Create a new manager instance. 129 * 130 * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances 131 * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null 132 */ 133 public RepositoryLibrary( ExecutionContextFactory executionContextFactory ) { 134 this(executionContextFactory, null); 135 } 136 137 /** 138 * Create a new manager instance. 139 * 140 * @param executionContextFactory the execution context factory, used by sources to create {@link ExecutionContext} instances 141 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in 142 * this manager; may be null if there is no delegate 143 * @throws IllegalArgumentException if the <code>executionContextFactory</code> reference is null 144 */ 145 public RepositoryLibrary( ExecutionContextFactory executionContextFactory, 146 RepositoryConnectionFactory delegate ) { 147 CheckArg.isNotNull(executionContextFactory, "executionContextFactory"); 148 this.delegate = delegate; 149 this.executionContextFactory = executionContextFactory; 150 this.repositoryContext = new RepositoryContext() { 151 /** 152 * {@inheritDoc} 153 * 154 * @see org.jboss.dna.graph.connectors.RepositoryContext#getExecutionContextFactory() 155 */ 156 public ExecutionContextFactory getExecutionContextFactory() { 157 return RepositoryLibrary.this.getExecutionContextFactory(); 158 } 159 160 /** 161 * {@inheritDoc} 162 * 163 * @see org.jboss.dna.graph.connectors.RepositoryContext#getRepositoryConnectionFactory() 164 */ 165 public RepositoryConnectionFactory getRepositoryConnectionFactory() { 166 return RepositoryLibrary.this; 167 } 168 }; 169 } 170 171 /** 172 * @return executionContextFactory 173 */ 174 public ExecutionContextFactory getExecutionContextFactory() { 175 return executionContextFactory; 176 } 177 178 /** 179 * Get the delegate connection factory. 180 * 181 * @return the connection factory to which this instance should delegate in the event that a source is not found in this 182 * manager, or null if there is no delegate 183 */ 184 public RepositoryConnectionFactory getDelegate() { 185 return delegate; 186 } 187 188 /** 189 * Set the delegate connection factory. 190 * 191 * @param delegate the connection factory to which this instance should delegate in the event that a source is not found in 192 * this manager; may be null if there is no delegate 193 */ 194 public void setDelegate( RepositoryConnectionFactory delegate ) { 195 this.delegate = delegate; 196 } 197 198 /** 199 * @return administrator 200 */ 201 public ServiceAdministrator getAdministrator() { 202 return this.administrator; 203 } 204 205 /** 206 * Utility method called by the administrator. 207 */ 208 protected void start() { 209 // Do not establish connections to the pools; these will be established as needed 210 211 } 212 213 /** 214 * Utility method called by the administrator. 215 */ 216 protected void shutdown() { 217 // Close all connections to the pools. This is done inside the pools write lock. 218 try { 219 this.sourcesLock.readLock().lock(); 220 for (RepositoryConnectionPool pool : this.pools) { 221 pool.shutdown(); 222 } 223 } finally { 224 this.sourcesLock.readLock().unlock(); 225 } 226 } 227 228 /** 229 * Utility method called by the administrator. 230 * 231 * @param timeout 232 * @param unit 233 * @return true if all pools were terminated in the supplied time (or were already terminated), or false if the timeout 234 * occurred before all the connections were closed 235 * @throws InterruptedException 236 */ 237 protected boolean awaitTermination( long timeout, 238 TimeUnit unit ) throws InterruptedException { 239 // Check whether all source pools are shut down. This is done inside the pools write lock. 240 try { 241 this.sourcesLock.readLock().lock(); 242 for (RepositoryConnectionPool pool : this.pools) { 243 if (!pool.awaitTermination(timeout, unit)) return false; 244 } 245 return true; 246 } finally { 247 this.sourcesLock.readLock().unlock(); 248 } 249 } 250 251 /** 252 * Returns true if this federated repository is in the process of terminating after {@link ServiceAdministrator#shutdown()} 253 * has been called on the {@link #getAdministrator() administrator}, but the federated repository has connections that have 254 * not yet normally been {@link RepositoryConnection#close() closed}. This method may be useful for debugging. A return of 255 * <tt>true</tt> reported a sufficient period after shutdown may indicate that connection users have ignored or suppressed 256 * interruption, causing this repository not to properly terminate. 257 * 258 * @return true if terminating but not yet terminated, or false otherwise 259 * @see #isTerminated() 260 */ 261 public boolean isTerminating() { 262 try { 263 this.sourcesLock.readLock().lock(); 264 for (RepositoryConnectionPool pool : this.pools) { 265 if (pool.isTerminating()) return true; 266 } 267 return false; 268 } finally { 269 this.sourcesLock.readLock().unlock(); 270 } 271 } 272 273 /** 274 * Return true if this federated repository has completed its termination and no longer has any open connections. 275 * 276 * @return true if terminated, or false otherwise 277 * @see #isTerminating() 278 */ 279 public boolean isTerminated() { 280 try { 281 this.sourcesLock.readLock().lock(); 282 for (RepositoryConnectionPool pool : this.pools) { 283 if (!pool.isTerminated()) return false; 284 } 285 return true; 286 } finally { 287 this.sourcesLock.readLock().unlock(); 288 } 289 } 290 291 /** 292 * Get an unmodifiable collection of {@link RepositorySource} names. 293 * 294 * @return the pools 295 */ 296 public Collection<String> getSourceNames() { 297 Set<String> sourceNames = new HashSet<String>(); 298 for (RepositoryConnectionPool pool : this.pools) { 299 sourceNames.add(pool.getRepositorySource().getName()); 300 } 301 return Collections.unmodifiableCollection(sourceNames); 302 } 303 304 /** 305 * Get an unmodifiable collection of {@link RepositorySource} instances managed by this instance. 306 * 307 * @return the pools 308 */ 309 public Collection<RepositorySource> getSources() { 310 List<RepositorySource> sources = new LinkedList<RepositorySource>(); 311 for (RepositoryConnectionPool pool : this.pools) { 312 sources.add(pool.getRepositorySource()); 313 } 314 return Collections.unmodifiableCollection(sources); 315 } 316 317 /** 318 * Get the RepositorySource with the specified name managed by this instance. 319 * 320 * @param sourceName the name of the source 321 * @return the source, or null if no such source exists in this instance 322 */ 323 public RepositorySource getSource( String sourceName ) { 324 try { 325 this.sourcesLock.readLock().lock(); 326 for (RepositoryConnectionPool existingPool : this.pools) { 327 RepositorySource source = existingPool.getRepositorySource(); 328 if (source.getName().equals(sourceName)) return source; 329 } 330 } finally { 331 this.sourcesLock.readLock().unlock(); 332 } 333 return null; 334 } 335 336 /** 337 * Get the connection pool managing the {@link RepositorySource} with the specified name managed by this instance. 338 * 339 * @param sourceName the name of the source 340 * @return the pool, or null if no such pool exists in this instance 341 */ 342 public RepositoryConnectionPool getConnectionPool( String sourceName ) { 343 try { 344 this.sourcesLock.readLock().lock(); 345 for (RepositoryConnectionPool existingPool : this.pools) { 346 RepositorySource source = existingPool.getRepositorySource(); 347 if (source.getName().equals(sourceName)) return existingPool; 348 } 349 } finally { 350 this.sourcesLock.readLock().unlock(); 351 } 352 return null; 353 } 354 355 /** 356 * Add the supplied federated source. This method returns false if the source is null. 357 * 358 * @param source the source to add 359 * @return true if the source is added, or false if the reference is null or if there is already an existing source with the 360 * supplied name. 361 */ 362 public boolean addSource( RepositorySource source ) { 363 if (source == null) return false; 364 try { 365 this.sourcesLock.writeLock().lock(); 366 for (RepositoryConnectionPool existingPool : this.pools) { 367 if (existingPool.getRepositorySource().getName().equals(source.getName())) return false; 368 } 369 source.initialize(repositoryContext); 370 RepositoryConnectionPool pool = new RepositoryConnectionPool(source); 371 this.pools.add(pool); 372 return true; 373 } finally { 374 this.sourcesLock.writeLock().unlock(); 375 } 376 } 377 378 /** 379 * Remove from this federated repository the supplied source (or a source with the same name as that supplied). This call 380 * shuts down the connections in the source in an orderly fashion, allowing those connection currently in use to be used and 381 * closed normally, but preventing further connections from being used. 382 * <p> 383 * This method can safely be called while the federation repository is in use. 384 * </p> 385 * 386 * @param source the source to be removed 387 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call 388 * should not wait at all 389 * @param unit the time unit to be used for <code>timeToAwait</code> 390 * @return true if the source was removed, or false if the source was not a source for this repository. 391 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections 392 */ 393 public boolean removeSource( RepositorySource source, 394 long timeToAwait, 395 TimeUnit unit ) throws InterruptedException { 396 // Use the name; don't use the object equality ... 397 return removeSource(source.getName(), timeToAwait, unit) != null; 398 } 399 400 /** 401 * Remove from this federated repository the source with the supplied name. This call shuts down the connections in the source 402 * in an orderly fashion, allowing those connection currently in use to be used and closed normally, but preventing further 403 * connections from being used. 404 * 405 * @param name the name of the source to be removed 406 * @param timeToAwait the amount of time to wait while all of the source's connections are closed, or non-positive if the call 407 * should not wait at all 408 * @param unit the time unit to be used for <code>timeToAwait</code> 409 * @return the source with the supplied name that was removed, or null if no existing source matching the supplied name could 410 * be found 411 * @throws InterruptedException if the thread is interrupted while awaiting closing of the connections 412 */ 413 public RepositorySource removeSource( String name, 414 long timeToAwait, 415 TimeUnit unit ) throws InterruptedException { 416 try { 417 this.sourcesLock.writeLock().lock(); 418 for (RepositoryConnectionPool existingPool : this.pools) { 419 if (existingPool.getRepositorySource().getName().equals(name)) { 420 // Shut down the source ... 421 existingPool.shutdown(); 422 if (timeToAwait > 0L) existingPool.awaitTermination(timeToAwait, unit); 423 } 424 return existingPool.getRepositorySource(); 425 } 426 } finally { 427 this.sourcesLock.writeLock().unlock(); 428 } 429 return null; 430 } 431 432 /** 433 * {@inheritDoc} 434 * 435 * @see org.jboss.dna.graph.connectors.RepositoryConnectionFactory#createConnection(java.lang.String) 436 */ 437 public RepositoryConnection createConnection( String sourceName ) { 438 try { 439 this.sourcesLock.readLock().lock(); 440 for (RepositoryConnectionPool existingPool : this.pools) { 441 RepositorySource source = existingPool.getRepositorySource(); 442 if (source.getName().equals(sourceName)) return existingPool.getConnection(); 443 } 444 RepositoryConnectionFactory delegate = this.delegate; 445 if (delegate != null) { 446 return delegate.createConnection(sourceName); 447 } 448 } finally { 449 this.sourcesLock.readLock().unlock(); 450 } 451 return null; 452 } 453 }