001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.connector.federation; 025 026 import java.util.ArrayList; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.Iterator; 032 import java.util.LinkedList; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Set; 036 import java.util.UUID; 037 import java.util.concurrent.TimeUnit; 038 import net.jcip.annotations.Immutable; 039 import net.jcip.annotations.NotThreadSafe; 040 import org.jboss.dna.common.i18n.I18n; 041 import org.jboss.dna.common.util.CheckArg; 042 import org.jboss.dna.common.util.Logger; 043 import org.jboss.dna.connector.federation.contribution.Contribution; 044 import org.jboss.dna.connector.federation.merge.FederatedNode; 045 import org.jboss.dna.connector.federation.merge.MergePlan; 046 import org.jboss.dna.graph.DnaLexicon; 047 import org.jboss.dna.graph.ExecutionContext; 048 import org.jboss.dna.graph.JcrLexicon; 049 import org.jboss.dna.graph.Location; 050 import org.jboss.dna.graph.NodeConflictBehavior; 051 import org.jboss.dna.graph.cache.CachePolicy; 052 import org.jboss.dna.graph.connector.RepositoryConnection; 053 import org.jboss.dna.graph.connector.RepositoryConnectionFactory; 054 import org.jboss.dna.graph.connector.RepositorySource; 055 import org.jboss.dna.graph.connector.RepositorySourceException; 056 import org.jboss.dna.graph.property.DateTime; 057 import org.jboss.dna.graph.property.Name; 058 import org.jboss.dna.graph.property.Path; 059 import org.jboss.dna.graph.property.PathFactory; 060 import org.jboss.dna.graph.property.PathNotFoundException; 061 import org.jboss.dna.graph.property.Property; 062 import org.jboss.dna.graph.property.PropertyFactory; 063 import org.jboss.dna.graph.request.CloneWorkspaceRequest; 064 import org.jboss.dna.graph.request.CompositeRequest; 065 import org.jboss.dna.graph.request.CopyBranchRequest; 066 import org.jboss.dna.graph.request.CreateNodeRequest; 067 import org.jboss.dna.graph.request.CreateWorkspaceRequest; 068 import org.jboss.dna.graph.request.DeleteBranchRequest; 069 import org.jboss.dna.graph.request.DestroyWorkspaceRequest; 070 import org.jboss.dna.graph.request.GetWorkspacesRequest; 071 import org.jboss.dna.graph.request.InvalidWorkspaceException; 072 import org.jboss.dna.graph.request.MoveBranchRequest; 073 import org.jboss.dna.graph.request.ReadAllChildrenRequest; 074 import org.jboss.dna.graph.request.ReadAllPropertiesRequest; 075 import org.jboss.dna.graph.request.ReadNodeRequest; 076 import org.jboss.dna.graph.request.Request; 077 import org.jboss.dna.graph.request.UnsupportedRequestException; 078 import org.jboss.dna.graph.request.UpdatePropertiesRequest; 079 import org.jboss.dna.graph.request.VerifyWorkspaceRequest; 080 import org.jboss.dna.graph.request.processor.RequestProcessor; 081 082 /** 083 * @author Randall Hauch 084 */ 085 @NotThreadSafe 086 public class FederatingRequestProcessor extends RequestProcessor { 087 088 private static final Set<Name> HIDDEN_PROPERTIES = Collections.singleton(DnaLexicon.MERGE_PLAN); 089 090 private final Map<String, FederatedWorkspace> workspaces; 091 private final FederatedWorkspace defaultWorkspace; 092 private final RepositoryConnectionFactory connectionFactory; 093 /** The set of all connections, including the cache connection */ 094 private final Map<String, RepositoryConnection> connectionsBySourceName; 095 protected final PathFactory pathFactory; 096 private Logger logger; 097 098 /** 099 * Create a command executor that federates (merges) the information from multiple sources described by the source projections 100 * for the particular workspace specified by the request(s). The request processor will use the {@link Projection cache 101 * projection} of each {@link FederatedWorkspace workspace} to identify the {@link Projection#getSourceName() repository 102 * source} for the cache as well as the {@link Projection#getRules() rules} for how the paths are mapped in the cache. This 103 * cache will be consulted first for the requested information, and will be kept up to date as changes are made to the 104 * federated information. 105 * 106 * @param context the execution context in which the executor will be run; may not be null 107 * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty 108 * @param workspaces the configuration for each workspace, keyed by workspace name; may not be null 109 * @param defaultWorkspace the default workspace; null if there is no default 110 * @param connectionFactory the factory for {@link RepositoryConnection} instances 111 */ 112 public FederatingRequestProcessor( ExecutionContext context, 113 String sourceName, 114 Map<String, FederatedWorkspace> workspaces, 115 FederatedWorkspace defaultWorkspace, 116 RepositoryConnectionFactory connectionFactory ) { 117 super(sourceName, context); 118 CheckArg.isNotEmpty(workspaces, "workspaces"); 119 CheckArg.isNotNull(connectionFactory, "connectionFactory"); 120 this.workspaces = workspaces; 121 this.connectionFactory = connectionFactory; 122 this.logger = context.getLogger(getClass()); 123 this.connectionsBySourceName = new HashMap<String, RepositoryConnection>(); 124 this.defaultWorkspace = defaultWorkspace; // may be null 125 this.pathFactory = context.getValueFactories().getPathFactory(); 126 } 127 128 protected DateTime getCurrentTimeInUtc() { 129 return getExecutionContext().getValueFactories().getDateFactory().createUtc(); 130 } 131 132 /** 133 * {@inheritDoc} 134 * 135 * @see RequestProcessor#close() 136 */ 137 @Override 138 public void close() { 139 try { 140 super.close(); 141 } finally { 142 // Make sure to close ALL open connections ... 143 for (RepositoryConnection connection : connectionsBySourceName.values()) { 144 if (connection == null) continue; 145 try { 146 connection.close(); 147 } catch (Throwable t) { 148 logger.debug("Error while closing connection to {0}", connection.getSourceName()); 149 } 150 } 151 connectionsBySourceName.clear(); 152 } 153 } 154 155 protected RepositoryConnection getConnectionToCacheFor( FederatedWorkspace workspace ) throws RepositorySourceException { 156 return getConnection(workspace.getCacheProjection()); 157 } 158 159 protected RepositoryConnection getConnection( Projection projection ) throws RepositorySourceException { 160 String sourceName = projection.getSourceName(); 161 RepositoryConnection connection = connectionsBySourceName.get(sourceName); 162 if (connection == null) { 163 connection = connectionFactory.createConnection(sourceName); 164 connectionsBySourceName.put(sourceName, connection); 165 } 166 return connection; 167 } 168 169 protected Set<String> getOpenConnections() { 170 return connectionsBySourceName.keySet(); 171 } 172 173 /** 174 * Utility to obtain the federated workspace referenced by the request. This method supports using the default workspace if 175 * the workspace name is null. If no such workspace, the request is marked with an appropriate error. 176 * 177 * @param request the request; may not be null 178 * @param workspaceName the name of the workspace; may be null if the default workspace should be used 179 * @return the federated workspace, or null if none was found 180 */ 181 protected FederatedWorkspace getWorkspace( Request request, 182 String workspaceName ) { 183 FederatedWorkspace workspace = null; 184 if (workspaceName == null) { 185 if (defaultWorkspace != null) return defaultWorkspace; 186 // There is no default, so record the error ... 187 String msg = FederationI18n.noDefaultWorkspace.text(getSourceName()); 188 request.setError(new InvalidWorkspaceException(msg)); 189 } 190 workspace = workspaces.get(workspaceName); 191 if (workspace == null) { 192 // There is no workspace with this name, so record an error ... 193 String msg = FederationI18n.workspaceDoesNotExist.text(getSourceName(), workspaceName); 194 request.setError(new InvalidWorkspaceException(msg)); 195 } 196 return workspace; 197 } 198 199 /** 200 * {@inheritDoc} 201 * 202 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllChildrenRequest) 203 */ 204 @Override 205 public void process( ReadAllChildrenRequest request ) { 206 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 207 if (workspace == null) return; 208 ReadNodeRequest nodeInfo = getNode(request.of(), workspace); 209 if (nodeInfo.hasError()) return; 210 for (Location child : nodeInfo.getChildren()) { 211 request.addChild(child); 212 } 213 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 214 } 215 216 /** 217 * {@inheritDoc} 218 * 219 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllPropertiesRequest) 220 */ 221 @Override 222 public void process( ReadAllPropertiesRequest request ) { 223 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 224 if (workspace == null) return; 225 ReadNodeRequest nodeInfo = getNode(request.at(), workspace); 226 if (nodeInfo.hasError()) return; 227 for (Property property : nodeInfo.getProperties()) { 228 if (HIDDEN_PROPERTIES.contains(property.getName())) continue; 229 request.addProperty(property); 230 } 231 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 232 } 233 234 /** 235 * {@inheritDoc} 236 * 237 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadNodeRequest) 238 */ 239 @Override 240 public void process( ReadNodeRequest request ) { 241 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 242 if (workspace == null) return; 243 ReadNodeRequest nodeInfo = getNode(request.at(), workspace); 244 if (nodeInfo.hasError()) return; 245 for (Property property : nodeInfo.getProperties()) { 246 if (HIDDEN_PROPERTIES.contains(property.getName())) continue; 247 request.addProperty(property); 248 } 249 for (Location child : nodeInfo.getChildren()) { 250 request.addChild(child); 251 } 252 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 253 } 254 255 /** 256 * {@inheritDoc} 257 * 258 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateNodeRequest) 259 */ 260 @Override 261 public void process( CreateNodeRequest request ) { 262 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 263 if (workspace == null) return; 264 265 // Can push this down if and only if the entire request is within a single federated source ... 266 SingleProjection projection = asSingleProjection(workspace, request.under(), request); 267 if (projection == null) return; 268 269 // Push down the request ... 270 Location parentLocation = Location.create(projection.pathInSource); 271 String workspaceName = projection.projection.getWorkspaceName(); 272 CreateNodeRequest sourceRequest = new CreateNodeRequest(parentLocation, workspaceName, request.named(), 273 request.properties()); 274 execute(sourceRequest, projection.projection); 275 276 // Copy/transform the results ... 277 if (sourceRequest.hasError()) { 278 request.setError(sourceRequest.getError()); 279 } else { 280 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode())); 281 } 282 } 283 284 /** 285 * {@inheritDoc} 286 * 287 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DeleteBranchRequest) 288 */ 289 @Override 290 public void process( DeleteBranchRequest request ) { 291 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 292 if (workspace == null) return; 293 294 // Can push this down if and only if the entire request is within a single federated source ... 295 SingleProjection projection = asSingleProjection(workspace, request.at(), request); 296 if (projection == null) return; 297 298 // Push down the request ... 299 Location sourceLocation = Location.create(projection.pathInSource); 300 String workspaceName = projection.projection.getWorkspaceName(); 301 DeleteBranchRequest sourceRequest = new DeleteBranchRequest(sourceLocation, workspaceName); 302 execute(sourceRequest, projection.projection); 303 304 // Copy/transform the results ... 305 if (sourceRequest.hasError()) { 306 request.setError(sourceRequest.getError()); 307 } else { 308 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode())); 309 } 310 311 // Delete in the cache ... 312 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.at(), workspace.getCacheProjection() 313 .getWorkspaceName()); 314 executeInCache(cacheRequest, workspace); 315 } 316 317 /** 318 * {@inheritDoc} 319 * 320 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CopyBranchRequest) 321 */ 322 @Override 323 public void process( CopyBranchRequest request ) { 324 FederatedWorkspace fromWorkspace = getWorkspace(request, request.fromWorkspace()); 325 if (fromWorkspace == null) return; 326 FederatedWorkspace intoWorkspace = getWorkspace(request, request.intoWorkspace()); 327 if (intoWorkspace == null) return; 328 if (!fromWorkspace.equals(intoWorkspace)) { 329 // Otherwise there wasn't a single projection with a single path ... 330 String msg = FederationI18n.unableToPerformOperationSpanningWorkspaces.text(fromWorkspace.getName(), 331 intoWorkspace.getName()); 332 request.setError(new UnsupportedRequestException(msg)); 333 } 334 335 // Can push this down if and only if the entire request is within a single federated source ... 336 SingleProjection fromProjection = asSingleProjection(fromWorkspace, request.from(), request); 337 if (fromProjection == null) return; 338 SingleProjection intoProjection = asSingleProjection(intoWorkspace, request.into(), request); 339 if (intoProjection == null) return; 340 if (!intoProjection.projection.equals(fromProjection.projection)) { 341 // Otherwise there wasn't a single projection with a single path ... 342 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(), 343 request.into(), 344 fromWorkspace.getName(), 345 fromProjection.projection.getRules(), 346 intoProjection.projection.getRules()); 347 request.setError(new UnsupportedRequestException(msg)); 348 } 349 350 // Push down the request ... 351 Location fromLocation = Location.create(fromProjection.pathInSource); 352 Location intoLocation = Location.create(intoProjection.pathInSource); 353 String workspaceName = fromProjection.projection.getWorkspaceName(); 354 CopyBranchRequest sourceRequest = new CopyBranchRequest(fromLocation, workspaceName, intoLocation, workspaceName, 355 request.desiredName(), request.conflictBehavior()); 356 execute(sourceRequest, fromProjection.projection); 357 358 // Copy/transform the results ... 359 if (sourceRequest.hasError()) { 360 request.setError(sourceRequest.getError()); 361 } else { 362 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()), 363 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter())); 364 } 365 366 // Delete from the cache the parent of the new location ... 367 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.into(), fromWorkspace.getCacheProjection() 368 .getWorkspaceName()); 369 executeInCache(cacheRequest, fromWorkspace); 370 } 371 372 /** 373 * {@inheritDoc} 374 * 375 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.MoveBranchRequest) 376 */ 377 @Override 378 public void process( MoveBranchRequest request ) { 379 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 380 if (workspace == null) return; 381 382 // Can push this down if and only if the entire request is within a single federated source ... 383 SingleProjection fromProjection = asSingleProjection(workspace, request.from(), request); 384 if (fromProjection == null) return; 385 SingleProjection intoProjection = asSingleProjection(workspace, request.into(), request); 386 if (intoProjection == null) return; 387 if (!intoProjection.projection.equals(fromProjection.projection)) { 388 // Otherwise there wasn't a single projection with a single path ... 389 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(), 390 request.into(), 391 workspace.getName(), 392 fromProjection.projection.getRules(), 393 intoProjection.projection.getRules()); 394 request.setError(new UnsupportedRequestException(msg)); 395 } 396 397 // Push down the request ... 398 Location fromLocation = Location.create(fromProjection.pathInSource); 399 Location intoLocation = Location.create(intoProjection.pathInSource); 400 String workspaceName = fromProjection.projection.getWorkspaceName(); 401 MoveBranchRequest sourceRequest = new MoveBranchRequest(fromLocation, intoLocation, workspaceName, request.desiredName(), 402 request.conflictBehavior()); 403 execute(sourceRequest, fromProjection.projection); 404 405 // Copy/transform the results ... 406 if (sourceRequest.hasError()) { 407 request.setError(sourceRequest.getError()); 408 } else { 409 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()), 410 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter())); 411 } 412 // Delete from the cache ... 413 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.from(), workspace.getCacheProjection() 414 .getWorkspaceName()); 415 executeInCache(cacheRequest, workspace); 416 } 417 418 /** 419 * {@inheritDoc} 420 * 421 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.UpdatePropertiesRequest) 422 */ 423 @Override 424 public void process( UpdatePropertiesRequest request ) { 425 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 426 if (workspace == null) return; 427 428 // Can push this down if and only if the entire request is within a single federated source ... 429 SingleProjection projection = asSingleProjection(workspace, request.on(), request); 430 if (projection == null) return; 431 432 // Push down the request ... 433 Location sourceLocation = Location.create(projection.pathInSource); 434 String workspaceName = projection.projection.getWorkspaceName(); 435 UpdatePropertiesRequest sourceRequest = new UpdatePropertiesRequest(sourceLocation, workspaceName, request.properties()); 436 execute(sourceRequest, projection.projection); 437 438 // Copy/transform the results ... 439 if (sourceRequest.hasError()) { 440 request.setError(sourceRequest.getError()); 441 } else { 442 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode())); 443 } 444 445 // Update the cache ... 446 UpdatePropertiesRequest cacheRequest = new UpdatePropertiesRequest(request.on(), workspace.getCacheProjection() 447 .getWorkspaceName(), 448 request.properties()); 449 executeInCache(cacheRequest, workspace); 450 } 451 452 /** 453 * {@inheritDoc} 454 * 455 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.VerifyWorkspaceRequest) 456 */ 457 @Override 458 public void process( VerifyWorkspaceRequest request ) { 459 FederatedWorkspace workspace = getWorkspace(request, request.workspaceName()); 460 if (workspace != null) { 461 request.setActualWorkspaceName(workspace.getName()); 462 Location root = Location.create(pathFactory.createRootPath()); 463 ReadNodeRequest nodeInfo = getNode(root, workspace); 464 if (nodeInfo.hasError()) return; 465 request.setActualRootLocation(nodeInfo.getActualLocationOfNode()); 466 } 467 } 468 469 /** 470 * {@inheritDoc} 471 * 472 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.GetWorkspacesRequest) 473 */ 474 @Override 475 public void process( GetWorkspacesRequest request ) { 476 request.setAvailableWorkspaceNames(workspaces.keySet()); 477 super.setCacheableInfo(request); 478 } 479 480 /** 481 * {@inheritDoc} 482 * 483 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateWorkspaceRequest) 484 */ 485 @Override 486 public void process( CreateWorkspaceRequest request ) { 487 throw new UnsupportedOperationException(); 488 } 489 490 /** 491 * {@inheritDoc} 492 * 493 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CloneWorkspaceRequest) 494 */ 495 @Override 496 public void process( CloneWorkspaceRequest request ) { 497 throw new UnsupportedOperationException(); 498 } 499 500 /** 501 * {@inheritDoc} 502 * 503 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DestroyWorkspaceRequest) 504 */ 505 @Override 506 public void process( DestroyWorkspaceRequest request ) { 507 throw new UnsupportedOperationException(); 508 } 509 510 @Immutable 511 protected class SingleProjection { 512 protected final Projection projection; 513 protected final Path pathInSource; 514 protected final Location federatedLocation; 515 516 protected SingleProjection( Projection projection, 517 Path pathInSource, 518 Location federatedLocation ) { 519 this.projection = projection; 520 this.federatedLocation = federatedLocation; 521 this.pathInSource = pathInSource; 522 } 523 524 protected Location convertToRepository( Location sourceLocation ) { 525 assert sourceLocation != null; 526 if (sourceLocation.hasPath()) { 527 Set<Path> paths = projection.getPathsInRepository(sourceLocation.getPath(), pathFactory); 528 assert paths.size() == 1; 529 return sourceLocation.with(paths.iterator().next()); 530 } 531 return sourceLocation; 532 } 533 } 534 535 protected SingleProjection asSingleProjection( FederatedWorkspace federatedWorkspace, 536 Location location, 537 Request request ) { 538 // Check the cache for this location ... 539 ReadNodeRequest nodeInfo = getNode(location, federatedWorkspace); 540 if (nodeInfo.hasError()) { 541 request.setError(nodeInfo.getError()); 542 return null; 543 } 544 Location actualLocation = nodeInfo.getActualLocationOfNode(); 545 Path pathInRepository = actualLocation.getPath(); 546 assert pathInRepository != null; 547 548 // Get the merge plan for the node ... 549 MergePlan plan = getMergePlan(nodeInfo); 550 assert plan != null; 551 if (plan.getRealContributionCount() == 1) { 552 for (Contribution contribution : plan) { 553 if (contribution.isEmpty() || contribution.isPlaceholder()) continue; 554 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) { 555 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory); 556 if (paths.size() == 1) { 557 return new SingleProjection(projection, paths.iterator().next(), actualLocation); 558 } 559 } 560 } 561 } 562 563 // Otherwise there wasn't a single projection with a single path ... 564 StringBuilder projections = new StringBuilder(); 565 boolean first = true; 566 for (Contribution contribution : plan) { 567 if (contribution.isPlaceholder() || contribution.isEmpty()) continue; 568 if (first) first = false; 569 else projections.append(", "); 570 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) { 571 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory); 572 if (paths.size() == 1) { 573 projections.append(FederationI18n.pathInProjection.text(paths.iterator().next(), projection.getRules())); 574 } else { 575 projections.append(FederationI18n.pathInProjection.text(paths, projection.getRules())); 576 } 577 } 578 } 579 String msg = FederationI18n.unableToPerformOperationUnlessLocationIsFromSingleProjection.text(location, 580 federatedWorkspace.getName(), 581 projections); 582 request.setError(new UnsupportedRequestException(msg)); 583 return null; 584 } 585 586 protected void execute( Request request, 587 Projection projection ) { 588 RepositoryConnection connection = getConnection(projection); 589 connection.execute(getExecutionContext(), request); 590 // Don't need to close, as we'll close all connections when this processor is closed 591 } 592 593 protected void executeInCache( Request request, 594 FederatedWorkspace workspace ) { 595 RepositoryConnection connection = getConnectionToCacheFor(workspace); 596 connection.execute(getExecutionContext(), request); 597 // Don't need to close, as we'll close all connections when this processor is closed 598 } 599 600 /** 601 * Get the node information from the underlying sources or, if possible, from the cache. 602 * 603 * @param location the location of the node to be returned 604 * @param workspace the federated workspace configuration; may be null 605 * @return the node information 606 * @throws RepositorySourceException 607 */ 608 protected ReadNodeRequest getNode( Location location, 609 FederatedWorkspace workspace ) throws RepositorySourceException { 610 // Check the cache first ... 611 ReadNodeRequest fromCache = new ReadNodeRequest(location, workspace.getCacheProjection().getWorkspaceName()); 612 executeInCache(fromCache, workspace); 613 614 // Look at the cache results from the cache for problems, or if found a plan in the cache look 615 // at the contributions. We'll be putting together the set of source names for which we need to 616 // get the contributions. 617 Set<String> sourceNames = null; 618 List<Contribution> contributions = new LinkedList<Contribution>(); 619 620 if (fromCache.hasError()) { 621 Throwable error = fromCache.getError(); 622 if (!(error instanceof PathNotFoundException)) return fromCache; 623 624 // The path was not found in the cache, so since we don't know whether the ancestors are federated 625 // from multiple source nodes, we need to populate the cache starting with the lowest ancestor 626 // that already exists in the cache. 627 PathNotFoundException notFound = (PathNotFoundException)fromCache.getError(); 628 Path lowestExistingAncestor = notFound.getLowestAncestorThatDoesExist(); 629 if (location.hasPath()) { 630 Path path = location.getPath(); 631 Path ancestor = path.getParent(); 632 if (!ancestor.equals(lowestExistingAncestor)) { 633 // Load the nodes along the path below the existing ancestor, down to (but excluding) the desired path 634 Path pathToLoad = ancestor; 635 while (!pathToLoad.equals(lowestExistingAncestor)) { 636 Location locationToLoad = Location.create(pathToLoad); 637 loadContributionsFromSources(locationToLoad, workspace, null, contributions); // sourceNames may be 638 // null or empty 639 FederatedNode mergedNode = createFederatedNode(locationToLoad, workspace, fromCache, contributions, true); 640 if (mergedNode == null) { 641 // No source had a contribution ... 642 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 643 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(path, ancestor))); 644 return fromCache; 645 } 646 MergePlan mergePlan = mergedNode.getMergePlan(); 647 if (mergePlan != null) { 648 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN, 649 (Object)mergePlan); 650 fromCache.addProperty(mergePlanProperty); 651 } 652 contributions.clear(); 653 // Move to the next child along the path ... 654 pathToLoad = pathToLoad.getParent(); 655 } 656 } 657 658 } 659 660 // At this point, all ancestors exist ... 661 } else { 662 // There is no error, so look for the merge plan ... 663 MergePlan mergePlan = getMergePlan(fromCache); 664 if (mergePlan != null) { 665 // We found the merge plan, so check whether it's still valid ... 666 final DateTime now = getCurrentTimeInUtc(); 667 if (!mergePlan.isExpired(now)) { 668 // It is still valid, so check whether any contribution is from a non-existant projection ... 669 for (Contribution contribution : mergePlan) { 670 if (!workspace.contains(contribution.getSourceName(), contribution.getWorkspaceName())) { 671 // TODO: Record that the cached contribution is from a source that is no longer in this repository 672 } 673 } 674 return fromCache; 675 } 676 677 // At least one of the contributions is expired, so go through the contributions and place 678 // the valid contributions in the 'contributions' list; any expired contribution 679 // needs to be loaded by adding the name to the 'sourceNames' 680 if (mergePlan.getContributionCount() > 0) { 681 sourceNames = new HashSet<String>(); 682 for (Contribution contribution : mergePlan) { 683 if (contribution.isExpired(now)) { 684 sourceNames.add(contribution.getSourceName()); 685 contributions.add(contribution); 686 } 687 } 688 } 689 } 690 } 691 692 // Get the contributions from the sources given their names ... 693 location = fromCache.getActualLocationOfNode(); 694 if (location == null) { 695 // Not yet in the cache ... 696 location = fromCache.at(); 697 } 698 loadContributionsFromSources(location, workspace, sourceNames, contributions); // sourceNames may be null or empty 699 FederatedNode mergedNode = createFederatedNode(location, workspace, fromCache, contributions, true); 700 if (mergedNode == null) { 701 // No source had a contribution ... 702 if (location.hasPath()) { 703 Path ancestor = location.getPath().getParent(); 704 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 705 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(location, ancestor))); 706 return fromCache; 707 } 708 I18n msg = FederationI18n.nodeDoesNotExistAtLocation; 709 fromCache.setError(new PathNotFoundException(location, null, msg.text(location))); 710 return fromCache; 711 } 712 return mergedNode; 713 } 714 715 protected FederatedNode createFederatedNode( Location location, 716 FederatedWorkspace federatedWorkspace, 717 ReadNodeRequest fromCache, 718 List<Contribution> contributions, 719 boolean updateCache ) throws RepositorySourceException { 720 assert location != null; 721 722 // If there are no contributions from any source ... 723 boolean foundNonEmptyContribution = false; 724 for (Contribution contribution : contributions) { 725 assert contribution != null; 726 if (!contribution.isEmpty()) { 727 foundNonEmptyContribution = true; 728 break; 729 } 730 } 731 if (!foundNonEmptyContribution) return null; 732 if (logger.isTraceEnabled()) { 733 logger.trace("Loaded {0} from sources, resulting in these contributions:", location); 734 int i = 0; 735 for (Contribution contribution : contributions) { 736 logger.trace(" {0} {1}", ++i, contribution); 737 } 738 } 739 740 // Create the node, and use the existing UUID if one is found in the cache ... 741 ExecutionContext context = getExecutionContext(); 742 assert context != null; 743 FederatedNode mergedNode = new FederatedNode(location, federatedWorkspace.getName()); 744 745 // Merge the results into a single set of results ... 746 assert contributions.size() > 0; 747 federatedWorkspace.getMergingStrategy().merge(mergedNode, contributions, context); 748 if (mergedNode.getCachePolicy() == null) { 749 mergedNode.setCachePolicy(federatedWorkspace.getCachePolicy()); 750 } 751 if (updateCache) { 752 // Place the results into the cache ... 753 updateCache(federatedWorkspace, mergedNode, fromCache); 754 } 755 // And return the results ... 756 return mergedNode; 757 } 758 759 /** 760 * Load the node at the supplied location from the sources with the supplied name, returning the information. This method 761 * always obtains the information from the sources and does not use or update the cache. 762 * 763 * @param location the location of the node that is to be loaded 764 * @param federatedWorkspace the federated workspace 765 * @param sourceNames the names of the sources from which contributions are to be loaded; may be empty or null if all 766 * contributions from all sources are to be loaded 767 * @param contributions the list into which the contributions are to be placed 768 * @throws RepositorySourceException 769 */ 770 protected void loadContributionsFromSources( Location location, 771 FederatedWorkspace federatedWorkspace, 772 Set<String> sourceNames, 773 List<Contribution> contributions ) throws RepositorySourceException { 774 // At this point, there is no merge plan, so read information from the sources ... 775 final ExecutionContext context = getExecutionContext(); 776 777 CachePolicy cachePolicy = federatedWorkspace.getCachePolicy(); 778 // If the location has no path, then we have to submit a request to ALL sources ... 779 if (!location.hasPath()) { 780 for (Projection projection : federatedWorkspace.getSourceProjections()) { 781 final String source = projection.getSourceName(); 782 final String workspace = projection.getSourceName(); 783 if (sourceNames != null && !sourceNames.contains(source)) continue; 784 final RepositoryConnection sourceConnection = getConnection(projection); 785 if (sourceConnection == null) continue; // No source exists by this name 786 // Submit the request ... 787 ReadNodeRequest request = new ReadNodeRequest(location, federatedWorkspace.getName()); 788 sourceConnection.execute(context, request); 789 if (request.hasError()) continue; 790 791 // Figure out how long we can cache this contribution ... 792 long minimumTimeToLive = Long.MAX_VALUE; 793 if (cachePolicy != null) { 794 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 795 } 796 CachePolicy requestCachePolicy = request.getCachePolicy(); 797 if (requestCachePolicy != null) { 798 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 799 } else { 800 // See if the source has a default policy ... 801 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 802 if (sourceCachePolicy != null) { 803 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 804 } 805 } 806 // The expiration time should be the smallest of the minimum TTL values ... 807 DateTime expirationTime = null; 808 if (minimumTimeToLive < Long.MAX_VALUE) { 809 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 810 } 811 812 // Convert the locations of the children (relative to the source) to be relative to this node 813 Contribution contribution = Contribution.create(source, 814 workspace, 815 request.getActualLocationOfNode(), 816 expirationTime, 817 request.getProperties(), 818 request.getChildren()); 819 contributions.add(contribution); 820 } 821 } 822 823 // Otherwise, we can do it by path and projections ... 824 Path path = location.getPath(); 825 for (Projection projection : federatedWorkspace.getSourceProjections()) { 826 final String source = projection.getSourceName(); 827 final String workspace = projection.getWorkspaceName(); 828 if (sourceNames != null && !sourceNames.contains(source)) continue; 829 final RepositoryConnection sourceConnection = getConnection(projection); 830 if (sourceConnection == null) continue; // No source exists by this name 831 // Get the cached information ... 832 DateTime expirationTime = null; 833 if (cachePolicy != null) { 834 expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS); 835 } 836 // Get the paths-in-source where we should fetch node contributions ... 837 Set<Path> pathsInSource = projection.getPathsInSource(path, pathFactory); 838 if (pathsInSource.isEmpty()) { 839 // The source has no contributions, but see whether the project exists BELOW this path. 840 // We do this by getting the top-level repository paths of the projection, and then 841 // use those to figure out the children of the nodes. 842 Contribution contribution = null; 843 List<Path> topLevelPaths = projection.getTopLevelPathsInRepository(pathFactory); 844 Location input = Location.create(path); 845 switch (topLevelPaths.size()) { 846 case 0: 847 break; 848 case 1: { 849 Path topLevelPath = topLevelPaths.iterator().next(); 850 if (path.isAncestorOf(topLevelPath)) { 851 Location child = Location.create(topLevelPath); 852 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, child); 853 } 854 break; 855 } 856 default: { 857 // We assume that the top-level paths do not overlap ... 858 List<Location> children = new ArrayList<Location>(topLevelPaths.size()); 859 for (Path topLevelPath : topLevelPaths) { 860 if (path.isAncestorOf(topLevelPath)) { 861 children.add(Location.create(topLevelPath)); 862 } 863 } 864 if (children.size() > 0) { 865 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, children); 866 } 867 } 868 } 869 if (contribution == null) contribution = Contribution.create(source, workspace, expirationTime); 870 contributions.add(contribution); 871 } else { 872 // There is at least one (real) contribution ... 873 874 // Get the contributions ... 875 final int numPaths = pathsInSource.size(); 876 if (numPaths == 1) { 877 Path pathInSource = pathsInSource.iterator().next(); 878 ReadNodeRequest fromSource = new ReadNodeRequest(Location.create(pathInSource), workspace); 879 sourceConnection.execute(getExecutionContext(), fromSource); 880 if (!fromSource.hasError()) { 881 Collection<Property> properties = fromSource.getProperties(); 882 List<Location> children = fromSource.getChildren(); 883 884 // Figure out how long we can cache this contribution ... 885 long minimumTimeToLive = Long.MAX_VALUE; 886 if (cachePolicy != null) { 887 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 888 } 889 CachePolicy requestCachePolicy = fromSource.getCachePolicy(); 890 if (requestCachePolicy != null) { 891 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 892 } else { 893 // See if the source has a default policy ... 894 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 895 if (sourceCachePolicy != null) { 896 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 897 } 898 } 899 // The expiration time should be the smallest of the minimum TTL values ... 900 expirationTime = null; 901 if (minimumTimeToLive < Long.MAX_VALUE) { 902 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 903 } 904 905 Location actualLocation = fromSource.getActualLocationOfNode(); 906 Contribution contribution = Contribution.create(source, 907 workspace, 908 actualLocation, 909 expirationTime, 910 properties, 911 children); 912 contributions.add(contribution); 913 } 914 } else { 915 List<Request> fromSourceCommands = new ArrayList<Request>(numPaths); 916 for (Path pathInSource : pathsInSource) { 917 fromSourceCommands.add(new ReadNodeRequest(Location.create(pathInSource), workspace)); 918 } 919 Request request = CompositeRequest.with(fromSourceCommands); 920 sourceConnection.execute(context, request); 921 for (Request requestObj : fromSourceCommands) { 922 ReadNodeRequest fromSource = (ReadNodeRequest)requestObj; 923 if (fromSource.hasError()) continue; 924 925 // Figure out how long we can cache this contribution ... 926 long minimumTimeToLive = Long.MAX_VALUE; 927 if (cachePolicy != null) { 928 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 929 } 930 CachePolicy requestCachePolicy = fromSource.getCachePolicy(); 931 if (requestCachePolicy != null) { 932 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 933 } else { 934 // See if the source has a default policy ... 935 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 936 if (sourceCachePolicy != null) { 937 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 938 } 939 } 940 // The expiration time should be the smallest of the minimum TTL values ... 941 expirationTime = null; 942 if (minimumTimeToLive < Long.MAX_VALUE) { 943 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 944 } 945 946 List<Location> children = fromSource.getChildren(); 947 Contribution contribution = Contribution.create(source, 948 workspace, 949 fromSource.getActualLocationOfNode(), 950 expirationTime, 951 fromSource.getProperties(), 952 children); 953 contributions.add(contribution); 954 } 955 } 956 } 957 } 958 } 959 960 protected MergePlan getMergePlan( ReadNodeRequest request ) { 961 Property mergePlanProperty = request.getPropertiesByName().get(DnaLexicon.MERGE_PLAN); 962 if (mergePlanProperty == null || mergePlanProperty.isEmpty()) { 963 return null; 964 } 965 Object value = mergePlanProperty.getValues().next(); 966 return value instanceof MergePlan ? (MergePlan)value : null; 967 } 968 969 protected void updateCache( FederatedWorkspace federatedWorkspace, 970 FederatedNode mergedNode, 971 ReadNodeRequest fromCache ) throws RepositorySourceException { 972 final ExecutionContext context = getExecutionContext(); 973 final Location location = mergedNode.at(); 974 final Path path = location.getPath(); 975 final String cacheWorkspace = federatedWorkspace.getCacheProjection().getWorkspaceName(); 976 assert path != null; 977 List<Request> requests = new ArrayList<Request>(); 978 Name childName = null; 979 980 // If the merged node has a merge plan, then add it to the properties if it is not already there ... 981 Map<Name, Property> properties = mergedNode.getPropertiesByName(); 982 MergePlan mergePlan = mergedNode.getMergePlan(); 983 if (mergePlan != null && !properties.containsKey(DnaLexicon.MERGE_PLAN)) { 984 // Record the merge plan on the merged node ... 985 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN, 986 (Object)mergePlan); 987 properties.put(mergePlanProperty.getName(), mergePlanProperty); 988 } 989 990 // Make sure the UUID is being stored ... 991 PropertyFactory propertyFactory = getExecutionContext().getPropertyFactory(); 992 Property uuidProperty = properties.get(DnaLexicon.UUID); 993 if (uuidProperty == null) uuidProperty = properties.get(JcrLexicon.UUID); 994 if (uuidProperty == null) { 995 UUID uuid = mergedNode.at().getUuid(); 996 if (uuid == null) uuid = UUID.randomUUID(); 997 uuidProperty = propertyFactory.create(DnaLexicon.UUID, uuid); 998 properties.put(uuidProperty.getName(), uuidProperty); 999 } 1000 1001 // Have the children changed ... 1002 if (mergedNode.hasError() && !path.isRoot()) { 1003 // This is not the root node, so we need to create the node (or replace it if it exists) ... 1004 final Location parentLocation = Location.create(path.getParent()); 1005 childName = path.getLastSegment().getName(); 1006 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, NodeConflictBehavior.REPLACE, 1007 mergedNode.getProperties())); 1008 // logger.trace("Adding {0} to cache with properties {1}", location, properties); 1009 // Now create all of the children that this federated node knows of ... 1010 for (Location child : mergedNode.getChildren()) { 1011 childName = child.getPath().getLastSegment().getName(); 1012 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, child)); 1013 // logger.trace("Caching child of {0} named {1}", location, childName); 1014 } 1015 } else if (fromCache.getChildren().equals(mergedNode.getChildren())) { 1016 // Just update the properties ... 1017 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1018 // logger.trace("Updating cached properties on the root to {0}", properties); 1019 } else { 1020 // The children have changed, so figure out how ... 1021 if (fromCache.getChildren().isEmpty()) { 1022 // No children in the cache, so just update the properties of the node ... 1023 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1024 // logger.trace("Updating cached properties on {0} to {1}", location, properties); 1025 1026 // And create all of the children that this federated node knows of ... 1027 for (Location child : mergedNode.getChildren()) { 1028 childName = child.getPath().getLastSegment().getName(); 1029 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, child)); 1030 // logger.trace("Caching child of {0} named {1}", location, childName); 1031 } 1032 } else if (mergedNode.getChildren().isEmpty()) { 1033 // There were children in the cache but not in the merged node, so update the cached properties 1034 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1035 1036 // and delete all the children ... 1037 for (Location child : fromCache.getChildren()) { 1038 requests.add(new DeleteBranchRequest(child, cacheWorkspace)); 1039 // logger.trace("Removing {0} from cache", child); 1040 } 1041 } else { 1042 // There were children in the cache and in the merged node. The easy way is to just remove the 1043 // branch from the cache, the create it again ... 1044 if (path.isRoot()) { 1045 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1046 // logger.trace("Updating cached properties on {0} to {1}", location, properties); 1047 1048 // and delete all the children ... 1049 for (Location child : fromCache.getChildren()) { 1050 requests.add(new DeleteBranchRequest(child, cacheWorkspace)); 1051 // logger.trace("Removing child node {0} from cache", child); 1052 } 1053 1054 // Now create all of the children that this federated node knows of ... 1055 for (Location child : mergedNode.getChildren()) { 1056 childName = child.getPath().getLastSegment().getName(); 1057 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, 1058 child)); 1059 // logger.trace("Caching child of {0} named {1}", location, childName); 1060 } 1061 } else { 1062 requests.add(new DeleteBranchRequest(location, cacheWorkspace)); 1063 // logger.trace("Replacing node {0} from cache", location); 1064 1065 // This is not the root node, so we need to create the node (or replace it if it exists) ... 1066 final Location parentLocation = Location.create(path.getParent()); 1067 childName = path.getLastSegment().getName(); 1068 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, NodeConflictBehavior.REPLACE, 1069 mergedNode.getProperties())); 1070 // logger.trace("Adding {0} to cache with properties {1}", location, properties); 1071 // Now create all of the children that this federated node knows of ... 1072 for (Location child : mergedNode.getChildren()) { 1073 childName = child.getPath().getLastSegment().getName(); 1074 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, 1075 child)); 1076 // logger.trace("Caching child of {0} named {1}", location, childName); 1077 } 1078 } 1079 } 1080 } 1081 1082 // Execute all the requests ... 1083 final RepositoryConnection cacheConnection = getConnectionToCacheFor(federatedWorkspace); 1084 cacheConnection.execute(context, CompositeRequest.with(requests)); 1085 1086 // If the children did not have UUIDs, then find the actual locations for each of the cached nodes ... 1087 if (requests.size() > 1) { 1088 Iterator<Request> requestIter = requests.iterator(); 1089 requestIter.next(); // Skip the first request, which creates/updates the node (we want children) 1090 List<Location> children = mergedNode.getChildren(); 1091 for (int i = 0; i != children.size(); ++i) { 1092 Request request = requestIter.next(); 1093 while (!(request instanceof CreateNodeRequest)) { // skip non-create requests 1094 request = requestIter.next(); 1095 } 1096 Location actual = ((CreateNodeRequest)request).getActualLocationOfNode(); 1097 Location child = children.get(i); 1098 if (!child.hasIdProperties()) { 1099 assert child.getPath().equals(actual.getPath()); 1100 children.set(i, actual); 1101 } 1102 } 1103 } 1104 } 1105 }