001 /* 002 * JBoss DNA (http://www.jboss.org/dna) 003 * See the COPYRIGHT.txt file distributed with this work for information 004 * regarding copyright ownership. Some portions may be licensed 005 * to Red Hat, Inc. under one or more contributor license agreements. 006 * See the AUTHORS.txt file in the distribution for a full listing of 007 * individual contributors. 008 * 009 * JBoss DNA is free software. Unless otherwise indicated, all code in JBoss DNA 010 * is licensed to you under the terms of the GNU Lesser General Public License as 011 * published by the Free Software Foundation; either version 2.1 of 012 * the License, or (at your option) any later version. 013 * 014 * JBoss DNA is distributed in the hope that it will be useful, 015 * but WITHOUT ANY WARRANTY; without even the implied warranty of 016 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 017 * Lesser General Public License for more details. 018 * 019 * You should have received a copy of the GNU Lesser General Public 020 * License along with this software; if not, write to the Free 021 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 022 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 023 */ 024 package org.jboss.dna.connector.federation; 025 026 import java.util.ArrayList; 027 import java.util.Collection; 028 import java.util.Collections; 029 import java.util.HashMap; 030 import java.util.HashSet; 031 import java.util.Iterator; 032 import java.util.LinkedList; 033 import java.util.List; 034 import java.util.Map; 035 import java.util.Set; 036 import java.util.UUID; 037 import java.util.concurrent.TimeUnit; 038 import net.jcip.annotations.Immutable; 039 import net.jcip.annotations.NotThreadSafe; 040 import org.jboss.dna.common.i18n.I18n; 041 import org.jboss.dna.common.util.CheckArg; 042 import org.jboss.dna.common.util.Logger; 043 import org.jboss.dna.connector.federation.contribution.Contribution; 044 import org.jboss.dna.connector.federation.merge.FederatedNode; 045 import org.jboss.dna.connector.federation.merge.MergePlan; 046 import org.jboss.dna.graph.DnaLexicon; 047 import org.jboss.dna.graph.ExecutionContext; 048 import org.jboss.dna.graph.JcrLexicon; 049 import org.jboss.dna.graph.Location; 050 import org.jboss.dna.graph.NodeConflictBehavior; 051 import org.jboss.dna.graph.cache.CachePolicy; 052 import org.jboss.dna.graph.connector.RepositoryConnection; 053 import org.jboss.dna.graph.connector.RepositoryConnectionFactory; 054 import org.jboss.dna.graph.connector.RepositorySource; 055 import org.jboss.dna.graph.connector.RepositorySourceException; 056 import org.jboss.dna.graph.property.DateTime; 057 import org.jboss.dna.graph.property.Name; 058 import org.jboss.dna.graph.property.NamespaceRegistry; 059 import org.jboss.dna.graph.property.Path; 060 import org.jboss.dna.graph.property.PathFactory; 061 import org.jboss.dna.graph.property.PathNotFoundException; 062 import org.jboss.dna.graph.property.Property; 063 import org.jboss.dna.graph.property.PropertyFactory; 064 import org.jboss.dna.graph.request.ChangeRequest; 065 import org.jboss.dna.graph.request.CloneWorkspaceRequest; 066 import org.jboss.dna.graph.request.CompositeRequest; 067 import org.jboss.dna.graph.request.CopyBranchRequest; 068 import org.jboss.dna.graph.request.CreateNodeRequest; 069 import org.jboss.dna.graph.request.CreateWorkspaceRequest; 070 import org.jboss.dna.graph.request.DeleteBranchRequest; 071 import org.jboss.dna.graph.request.DestroyWorkspaceRequest; 072 import org.jboss.dna.graph.request.GetWorkspacesRequest; 073 import org.jboss.dna.graph.request.InvalidWorkspaceException; 074 import org.jboss.dna.graph.request.MoveBranchRequest; 075 import org.jboss.dna.graph.request.ReadAllChildrenRequest; 076 import org.jboss.dna.graph.request.ReadAllPropertiesRequest; 077 import org.jboss.dna.graph.request.ReadNodeRequest; 078 import org.jboss.dna.graph.request.Request; 079 import org.jboss.dna.graph.request.UnsupportedRequestException; 080 import org.jboss.dna.graph.request.UpdatePropertiesRequest; 081 import org.jboss.dna.graph.request.VerifyWorkspaceRequest; 082 import org.jboss.dna.graph.request.processor.RequestProcessor; 083 084 /** 085 * @author Randall Hauch 086 */ 087 @NotThreadSafe 088 public class FederatingRequestProcessor extends RequestProcessor { 089 090 private static final Set<Name> HIDDEN_PROPERTIES = Collections.singleton(DnaLexicon.MERGE_PLAN); 091 092 private final Map<String, FederatedWorkspace> workspaces; 093 private final FederatedWorkspace defaultWorkspace; 094 private final RepositoryConnectionFactory connectionFactory; 095 /** The set of all connections, including the cache connection */ 096 private final Map<String, RepositoryConnection> connectionsBySourceName; 097 protected final PathFactory pathFactory; 098 private Logger logger; 099 100 /** 101 * Create a command executor that federates (merges) the information from multiple sources described by the source projections 102 * for the particular workspace specified by the request(s). The request processor will use the {@link Projection cache 103 * projection} of each {@link FederatedWorkspace workspace} to identify the {@link Projection#getSourceName() repository 104 * source} for the cache as well as the {@link Projection#getRules() rules} for how the paths are mapped in the cache. This 105 * cache will be consulted first for the requested information, and will be kept up to date as changes are made to the 106 * federated information. 107 * 108 * @param context the execution context in which the executor will be run; may not be null 109 * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty 110 * @param workspaces the configuration for each workspace, keyed by workspace name; may not be null 111 * @param defaultWorkspace the default workspace; null if there is no default 112 * @param connectionFactory the factory for {@link RepositoryConnection} instances 113 */ 114 public FederatingRequestProcessor( ExecutionContext context, 115 String sourceName, 116 Map<String, FederatedWorkspace> workspaces, 117 FederatedWorkspace defaultWorkspace, 118 RepositoryConnectionFactory connectionFactory ) { 119 super(sourceName, context, null); 120 CheckArg.isNotEmpty(workspaces, "workspaces"); 121 CheckArg.isNotNull(connectionFactory, "connectionFactory"); 122 this.workspaces = workspaces; 123 this.connectionFactory = connectionFactory; 124 this.logger = context.getLogger(getClass()); 125 this.connectionsBySourceName = new HashMap<String, RepositoryConnection>(); 126 this.defaultWorkspace = defaultWorkspace; // may be null 127 this.pathFactory = context.getValueFactories().getPathFactory(); 128 } 129 130 protected DateTime getCurrentTimeInUtc() { 131 return getExecutionContext().getValueFactories().getDateFactory().createUtc(); 132 } 133 134 /** 135 * {@inheritDoc} 136 * 137 * @see RequestProcessor#close() 138 */ 139 @Override 140 public void close() { 141 try { 142 super.close(); 143 } finally { 144 // Make sure to close ALL open connections ... 145 for (RepositoryConnection connection : connectionsBySourceName.values()) { 146 if (connection == null) continue; 147 try { 148 connection.close(); 149 } catch (Throwable t) { 150 logger.debug("Error while closing connection to {0}", connection.getSourceName()); 151 } 152 } 153 connectionsBySourceName.clear(); 154 } 155 } 156 157 protected RepositoryConnection getConnectionToCacheFor( FederatedWorkspace workspace ) throws RepositorySourceException { 158 return getConnection(workspace.getCacheProjection()); 159 } 160 161 protected RepositoryConnection getConnection( Projection projection ) throws RepositorySourceException { 162 String sourceName = projection.getSourceName(); 163 RepositoryConnection connection = connectionsBySourceName.get(sourceName); 164 if (connection == null) { 165 connection = connectionFactory.createConnection(sourceName); 166 connectionsBySourceName.put(sourceName, connection); 167 } 168 return connection; 169 } 170 171 protected Set<String> getOpenConnections() { 172 return connectionsBySourceName.keySet(); 173 } 174 175 /** 176 * Utility to obtain the federated workspace referenced by the request. This method supports using the default workspace if 177 * the workspace name is null. If no such workspace, the request is marked with an appropriate error. 178 * 179 * @param request the request; may not be null 180 * @param workspaceName the name of the workspace; may be null if the default workspace should be used 181 * @return the federated workspace, or null if none was found 182 */ 183 protected FederatedWorkspace getWorkspace( Request request, 184 String workspaceName ) { 185 FederatedWorkspace workspace = null; 186 if (workspaceName == null) { 187 if (defaultWorkspace != null) return defaultWorkspace; 188 // There is no default, so record the error ... 189 String msg = FederationI18n.noDefaultWorkspace.text(getSourceName()); 190 request.setError(new InvalidWorkspaceException(msg)); 191 } 192 workspace = workspaces.get(workspaceName); 193 if (workspace == null) { 194 // There is no workspace with this name, so record an error ... 195 String msg = FederationI18n.workspaceDoesNotExist.text(getSourceName(), workspaceName); 196 request.setError(new InvalidWorkspaceException(msg)); 197 } 198 return workspace; 199 } 200 201 /** 202 * {@inheritDoc} 203 * 204 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllChildrenRequest) 205 */ 206 @Override 207 public void process( ReadAllChildrenRequest request ) { 208 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 209 if (workspace == null) return; 210 ReadNodeRequest nodeInfo = getNode(request.of(), workspace); 211 if (nodeInfo.hasError()) return; 212 for (Location child : nodeInfo.getChildren()) { 213 request.addChild(child); 214 } 215 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 216 } 217 218 /** 219 * {@inheritDoc} 220 * 221 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadAllPropertiesRequest) 222 */ 223 @Override 224 public void process( ReadAllPropertiesRequest request ) { 225 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 226 if (workspace == null) return; 227 ReadNodeRequest nodeInfo = getNode(request.at(), workspace); 228 if (nodeInfo.hasError()) return; 229 for (Property property : nodeInfo.getProperties()) { 230 if (HIDDEN_PROPERTIES.contains(property.getName())) continue; 231 request.addProperty(property); 232 } 233 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 234 } 235 236 /** 237 * {@inheritDoc} 238 * 239 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.ReadNodeRequest) 240 */ 241 @Override 242 public void process( ReadNodeRequest request ) { 243 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 244 if (workspace == null) return; 245 ReadNodeRequest nodeInfo = getNode(request.at(), workspace); 246 if (nodeInfo.hasError()) return; 247 for (Property property : nodeInfo.getProperties()) { 248 if (HIDDEN_PROPERTIES.contains(property.getName())) continue; 249 request.addProperty(property); 250 } 251 for (Location child : nodeInfo.getChildren()) { 252 request.addChild(child); 253 } 254 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 255 } 256 257 /** 258 * {@inheritDoc} 259 * 260 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateNodeRequest) 261 */ 262 @Override 263 public void process( CreateNodeRequest request ) { 264 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 265 if (workspace == null) return; 266 267 // Can push this down if and only if the entire request is within a single federated source ... 268 SingleProjection projection = asSingleProjection(workspace, request.under(), request); 269 if (projection == null) return; 270 271 // Push down the request ... 272 Location parentLocation = Location.create(projection.pathInSource); 273 String workspaceName = projection.projection.getWorkspaceName(); 274 CreateNodeRequest sourceRequest = new CreateNodeRequest(parentLocation, workspaceName, request.named(), 275 request.properties()); 276 execute(sourceRequest, projection.projection); 277 278 // Copy/transform the results ... 279 Location location = projection.convertToRepository(sourceRequest.getActualLocationOfNode()); 280 if (sourceRequest.hasError()) { 281 request.setError(sourceRequest.getError()); 282 } else { 283 request.setActualLocationOfNode(location); 284 } 285 286 // Add the cache ... 287 Map<Name, Property> props = new HashMap<Name, Property>(); 288 for (Property property : request.properties()) { 289 props.put(property.getName(), property); 290 } 291 for (Property idProperty : location) { 292 props.put(idProperty.getName(), idProperty); 293 } 294 CreateNodeRequest cacheRequest = new CreateNodeRequest(parentLocation, workspace.getCacheProjection().getWorkspaceName(), 295 request.named(), props.values()); 296 executeInCache(cacheRequest, workspace); 297 } 298 299 /** 300 * {@inheritDoc} 301 * 302 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DeleteBranchRequest) 303 */ 304 @Override 305 public void process( DeleteBranchRequest request ) { 306 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 307 if (workspace == null) return; 308 309 // Can push this down if and only if the entire request is within a single federated source ... 310 SingleProjection projection = asSingleProjection(workspace, request.at(), request); 311 if (projection == null) return; 312 313 // Push down the request ... 314 Location sourceLocation = Location.create(projection.pathInSource); 315 String workspaceName = projection.projection.getWorkspaceName(); 316 DeleteBranchRequest sourceRequest = new DeleteBranchRequest(sourceLocation, workspaceName); 317 execute(sourceRequest, projection.projection); 318 319 // Copy/transform the results ... 320 if (sourceRequest.hasError()) { 321 request.setError(sourceRequest.getError()); 322 } else { 323 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode())); 324 } 325 326 // Delete in the cache ... 327 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.at(), workspace.getCacheProjection() 328 .getWorkspaceName()); 329 executeInCache(cacheRequest, workspace); 330 } 331 332 /** 333 * {@inheritDoc} 334 * 335 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CopyBranchRequest) 336 */ 337 @Override 338 public void process( CopyBranchRequest request ) { 339 FederatedWorkspace fromWorkspace = getWorkspace(request, request.fromWorkspace()); 340 if (fromWorkspace == null) return; 341 FederatedWorkspace intoWorkspace = getWorkspace(request, request.intoWorkspace()); 342 if (intoWorkspace == null) return; 343 if (!fromWorkspace.equals(intoWorkspace)) { 344 // Otherwise there wasn't a single projection with a single path ... 345 String msg = FederationI18n.unableToPerformOperationSpanningWorkspaces.text(fromWorkspace.getName(), 346 intoWorkspace.getName()); 347 request.setError(new UnsupportedRequestException(msg)); 348 } 349 350 // Can push this down if and only if the entire request is within a single federated source ... 351 SingleProjection fromProjection = asSingleProjection(fromWorkspace, request.from(), request); 352 if (fromProjection == null) return; 353 SingleProjection intoProjection = asSingleProjection(intoWorkspace, request.into(), request); 354 if (intoProjection == null) return; 355 if (!intoProjection.projection.equals(fromProjection.projection)) { 356 // Otherwise there wasn't a single projection with a single path ... 357 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(), 358 request.into(), 359 fromWorkspace.getName(), 360 fromProjection.projection.getRules(), 361 intoProjection.projection.getRules()); 362 request.setError(new UnsupportedRequestException(msg)); 363 } 364 365 // Push down the request ... 366 Location fromLocation = Location.create(fromProjection.pathInSource); 367 Location intoLocation = Location.create(intoProjection.pathInSource); 368 String workspaceName = fromProjection.projection.getWorkspaceName(); 369 CopyBranchRequest sourceRequest = new CopyBranchRequest(fromLocation, workspaceName, intoLocation, workspaceName, 370 request.desiredName(), request.conflictBehavior()); 371 execute(sourceRequest, fromProjection.projection); 372 373 // Copy/transform the results ... 374 if (sourceRequest.hasError()) { 375 request.setError(sourceRequest.getError()); 376 } else { 377 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()), 378 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter())); 379 } 380 381 // Delete from the cache the parent of the new location ... 382 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.into(), fromWorkspace.getCacheProjection() 383 .getWorkspaceName()); 384 executeInCache(cacheRequest, fromWorkspace); 385 } 386 387 /** 388 * {@inheritDoc} 389 * 390 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.MoveBranchRequest) 391 */ 392 @Override 393 public void process( MoveBranchRequest request ) { 394 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 395 if (workspace == null) return; 396 397 // Can push this down if and only if the entire request is within a single federated source ... 398 SingleProjection fromProjection = asSingleProjection(workspace, request.from(), request); 399 if (fromProjection == null) return; 400 SingleProjection intoProjection = asSingleProjection(workspace, request.into(), request); 401 if (intoProjection == null) return; 402 if (!intoProjection.projection.equals(fromProjection.projection)) { 403 // Otherwise there wasn't a single projection with a single path ... 404 String msg = FederationI18n.unableToPerformOperationUnlessLocationsAreFromSingleProjection.text(request.from(), 405 request.into(), 406 workspace.getName(), 407 fromProjection.projection.getRules(), 408 intoProjection.projection.getRules()); 409 request.setError(new UnsupportedRequestException(msg)); 410 } 411 SingleProjection beforeProjection = request.before() != null ? asSingleProjection(workspace, request.before(), request) : null; 412 413 414 // Push down the request ... 415 Location fromLocation = Location.create(fromProjection.pathInSource); 416 Location intoLocation = Location.create(intoProjection.pathInSource); 417 Location beforeLocation = beforeProjection != null ? Location.create(beforeProjection.pathInSource) : null; 418 String workspaceName = fromProjection.projection.getWorkspaceName(); 419 MoveBranchRequest sourceRequest = new MoveBranchRequest(fromLocation, intoLocation, beforeLocation, workspaceName, request.desiredName(), 420 request.conflictBehavior()); 421 execute(sourceRequest, fromProjection.projection); 422 423 // Copy/transform the results ... 424 if (sourceRequest.hasError()) { 425 request.setError(sourceRequest.getError()); 426 } else { 427 request.setActualLocations(fromProjection.convertToRepository(sourceRequest.getActualLocationBefore()), 428 intoProjection.convertToRepository(sourceRequest.getActualLocationAfter())); 429 } 430 // Delete from the cache ... 431 DeleteBranchRequest cacheRequest = new DeleteBranchRequest(request.from(), workspace.getCacheProjection() 432 .getWorkspaceName()); 433 executeInCache(cacheRequest, workspace); 434 // Mark the new parent node as being expired ... 435 cacheRequest = new DeleteBranchRequest(request.into(), workspace.getCacheProjection().getWorkspaceName()); 436 executeInCache(cacheRequest, workspace); 437 } 438 439 /** 440 * {@inheritDoc} 441 * 442 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.UpdatePropertiesRequest) 443 */ 444 @Override 445 public void process( UpdatePropertiesRequest request ) { 446 FederatedWorkspace workspace = getWorkspace(request, request.inWorkspace()); 447 if (workspace == null) return; 448 449 // Can push this down if and only if the entire request is within a single federated source ... 450 SingleProjection projection = asSingleProjection(workspace, request.on(), request); 451 if (projection == null) return; 452 453 // Push down the request ... 454 Location sourceLocation = Location.create(projection.pathInSource); 455 String workspaceName = projection.projection.getWorkspaceName(); 456 UpdatePropertiesRequest sourceRequest = new UpdatePropertiesRequest(sourceLocation, workspaceName, request.properties()); 457 execute(sourceRequest, projection.projection); 458 459 // Copy/transform the results ... 460 if (sourceRequest.hasError()) { 461 request.setError(sourceRequest.getError()); 462 } else { 463 request.setActualLocationOfNode(projection.convertToRepository(sourceRequest.getActualLocationOfNode())); 464 } 465 466 // Update the cache ... 467 UpdatePropertiesRequest cacheRequest = new UpdatePropertiesRequest(request.on(), workspace.getCacheProjection() 468 .getWorkspaceName(), 469 request.properties()); 470 executeInCache(cacheRequest, workspace); 471 } 472 473 /** 474 * {@inheritDoc} 475 * 476 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.VerifyWorkspaceRequest) 477 */ 478 @Override 479 public void process( VerifyWorkspaceRequest request ) { 480 FederatedWorkspace workspace = getWorkspace(request, request.workspaceName()); 481 if (workspace != null) { 482 request.setActualWorkspaceName(workspace.getName()); 483 Location root = Location.create(pathFactory.createRootPath()); 484 ReadNodeRequest nodeInfo = getNode(root, workspace); 485 if (nodeInfo.hasError()) return; 486 request.setActualRootLocation(nodeInfo.getActualLocationOfNode()); 487 } 488 } 489 490 /** 491 * {@inheritDoc} 492 * 493 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.GetWorkspacesRequest) 494 */ 495 @Override 496 public void process( GetWorkspacesRequest request ) { 497 request.setAvailableWorkspaceNames(workspaces.keySet()); 498 super.setCacheableInfo(request); 499 } 500 501 /** 502 * {@inheritDoc} 503 * 504 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CreateWorkspaceRequest) 505 */ 506 @Override 507 public void process( CreateWorkspaceRequest request ) { 508 throw new UnsupportedOperationException(); 509 } 510 511 /** 512 * {@inheritDoc} 513 * 514 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.CloneWorkspaceRequest) 515 */ 516 @Override 517 public void process( CloneWorkspaceRequest request ) { 518 throw new UnsupportedOperationException(); 519 } 520 521 /** 522 * {@inheritDoc} 523 * 524 * @see org.jboss.dna.graph.request.processor.RequestProcessor#process(org.jboss.dna.graph.request.DestroyWorkspaceRequest) 525 */ 526 @Override 527 public void process( DestroyWorkspaceRequest request ) { 528 throw new UnsupportedOperationException(); 529 } 530 531 @Immutable 532 protected class SingleProjection { 533 protected final Projection projection; 534 protected final Path pathInSource; 535 protected final Location federatedLocation; 536 537 protected SingleProjection( Projection projection, 538 Path pathInSource, 539 Location federatedLocation ) { 540 this.projection = projection; 541 this.federatedLocation = federatedLocation; 542 this.pathInSource = pathInSource; 543 } 544 545 protected Location convertToRepository( Location sourceLocation ) { 546 assert sourceLocation != null; 547 if (sourceLocation.hasPath()) { 548 Set<Path> paths = projection.getPathsInRepository(sourceLocation.getPath(), pathFactory); 549 assert paths.size() == 1; 550 return sourceLocation.with(paths.iterator().next()); 551 } 552 return sourceLocation; 553 } 554 } 555 556 protected SingleProjection asSingleProjection( FederatedWorkspace federatedWorkspace, 557 Location location, 558 Request request ) { 559 // Check the cache for this location ... 560 ReadNodeRequest nodeInfo = getNode(location, federatedWorkspace); 561 if (nodeInfo.hasError()) { 562 request.setError(nodeInfo.getError()); 563 return null; 564 } 565 Location actualLocation = nodeInfo.getActualLocationOfNode(); 566 Path pathInRepository = actualLocation.getPath(); 567 assert pathInRepository != null; 568 569 // Get the merge plan for the node ... 570 MergePlan plan = getMergePlan(nodeInfo); 571 assert plan != null; 572 if (plan.getRealContributionCount() == 1) { 573 for (Contribution contribution : plan) { 574 if (contribution.isEmpty() || contribution.isPlaceholder()) continue; 575 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) { 576 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory); 577 if (paths.size() == 1) { 578 return new SingleProjection(projection, paths.iterator().next(), actualLocation); 579 } 580 } 581 } 582 } 583 584 // Otherwise there wasn't a single projection with a single path ... 585 StringBuilder projections = new StringBuilder(); 586 boolean first = true; 587 for (Contribution contribution : plan) { 588 if (contribution.isPlaceholder() || contribution.isEmpty()) continue; 589 if (first) first = false; 590 else projections.append(", "); 591 for (Projection projection : federatedWorkspace.getProjectionsFor(contribution.getSourceName())) { 592 Set<Path> paths = projection.getPathsInSource(pathInRepository, pathFactory); 593 if (paths.size() == 1) { 594 projections.append(FederationI18n.pathInProjection.text(paths.iterator().next(), projection.getRules())); 595 } else { 596 projections.append(FederationI18n.pathInProjection.text(paths, projection.getRules())); 597 } 598 } 599 } 600 String msg = FederationI18n.unableToPerformOperationUnlessLocationIsFromSingleProjection.text(location, 601 federatedWorkspace.getName(), 602 projections); 603 request.setError(new UnsupportedRequestException(msg)); 604 return null; 605 } 606 607 protected void execute( Request request, 608 Projection projection ) { 609 RepositoryConnection connection = getConnection(projection); 610 connection.execute(getExecutionContext(), request); 611 // Don't need to close, as we'll close all connections when this processor is closed 612 } 613 614 protected void executeInCache( Request request, 615 FederatedWorkspace workspace ) { 616 RepositoryConnection connection = getConnectionToCacheFor(workspace); 617 connection.execute(getExecutionContext(), request); 618 // Don't need to close, as we'll close all connections when this processor is closed 619 if (logger.isTraceEnabled()) { 620 traceCacheUpdate(request); 621 } 622 } 623 624 /** 625 * Get the node information from the underlying sources or, if possible, from the cache. 626 * 627 * @param location the location of the node to be returned 628 * @param workspace the federated workspace configuration; may be null 629 * @return the node information 630 * @throws RepositorySourceException 631 */ 632 protected ReadNodeRequest getNode( Location location, 633 FederatedWorkspace workspace ) throws RepositorySourceException { 634 // Check the cache first ... 635 ReadNodeRequest fromCache = new ReadNodeRequest(location, workspace.getCacheProjection().getWorkspaceName()); 636 executeInCache(fromCache, workspace); 637 638 // Look at the cache results from the cache for problems, or if found a plan in the cache look 639 // at the contributions. We'll be putting together the set of source names for which we need to 640 // get the contributions. 641 Set<String> sourceNames = null; 642 List<Contribution> contributions = new LinkedList<Contribution>(); 643 644 if (fromCache.hasError()) { 645 Throwable error = fromCache.getError(); 646 if (!(error instanceof PathNotFoundException)) return fromCache; 647 648 // The path was not found in the cache, so since we don't know whether the ancestors are federated 649 // from multiple source nodes, we need to populate the cache starting with the lowest ancestor 650 // that already exists in the cache. 651 PathNotFoundException notFound = (PathNotFoundException)fromCache.getError(); 652 Path lowestExistingAncestor = notFound.getLowestAncestorThatDoesExist(); 653 654 if (location.hasPath()) { 655 // Create a new instance so that we can update it ... 656 fromCache = new ReadNodeRequest(location, workspace.getCacheProjection().getWorkspaceName()); 657 Path path = location.getPath(); 658 Path ancestor = path.getParent(); 659 if (!ancestor.equals(lowestExistingAncestor)) { 660 // Load the nodes along the path below the existing ancestor, down to (but excluding) the desired path 661 Path pathToLoad = ancestor; 662 while (!pathToLoad.equals(lowestExistingAncestor)) { 663 Location locationToLoad = Location.create(pathToLoad); 664 loadContributionsFromSources(locationToLoad, workspace, null, contributions); // sourceNames may be 665 // null or empty 666 FederatedNode mergedNode = createFederatedNode(locationToLoad, workspace, fromCache, contributions, true); 667 if (mergedNode == null) { 668 // No source had a contribution ... 669 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 670 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(path, ancestor))); 671 return fromCache; 672 } 673 MergePlan mergePlan = mergedNode.getMergePlan(); 674 if (mergePlan != null) { 675 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN, 676 (Object)mergePlan); 677 fromCache.addProperty(mergePlanProperty); 678 } 679 contributions.clear(); 680 // Move to the next child along the path ... 681 pathToLoad = pathToLoad.getParent(); 682 } 683 } 684 685 } 686 687 // At this point, all ancestors exist ... 688 } else { 689 // There is no error, so look for the merge plan ... 690 MergePlan mergePlan = getMergePlan(fromCache); 691 if (mergePlan != null) { 692 // We found the merge plan, so check whether it's still valid ... 693 final DateTime now = getCurrentTimeInUtc(); 694 if (!mergePlan.isExpired(now)) { 695 // It is still valid, so check whether any contribution is from a non-existant projection ... 696 for (Contribution contribution : mergePlan) { 697 if (!workspace.contains(contribution.getSourceName(), contribution.getWorkspaceName())) { 698 // TODO: Record that the cached contribution is from a source that is no longer in this repository 699 } 700 } 701 return fromCache; 702 } 703 704 // At least one of the contributions is expired, so go through the contributions and place 705 // the valid contributions in the 'contributions' list; any expired contribution 706 // needs to be loaded by adding the name to the 'sourceNames' 707 if (mergePlan.getContributionCount() > 0) { 708 sourceNames = new HashSet<String>(); 709 for (Contribution contribution : mergePlan) { 710 if (contribution.isExpired(now)) { 711 sourceNames.add(contribution.getSourceName()); 712 contributions.add(contribution); 713 } 714 } 715 } 716 } 717 } 718 719 // Get the contributions from the sources given their names ... 720 location = fromCache.getActualLocationOfNode(); 721 if (location == null) { 722 // Not yet in the cache ... 723 location = fromCache.at(); 724 } 725 loadContributionsFromSources(location, workspace, sourceNames, contributions); // sourceNames may be null or empty 726 FederatedNode mergedNode = createFederatedNode(location, workspace, fromCache, contributions, true); 727 if (mergedNode == null) { 728 // No source had a contribution ... 729 if (location.hasPath()) { 730 Path ancestor = location.getPath().getParent(); 731 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 732 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(location, ancestor))); 733 return fromCache; 734 } 735 I18n msg = FederationI18n.nodeDoesNotExistAtLocation; 736 fromCache.setError(new PathNotFoundException(location, null, msg.text(location))); 737 return fromCache; 738 } 739 return mergedNode; 740 } 741 742 protected FederatedNode createFederatedNode( Location location, 743 FederatedWorkspace federatedWorkspace, 744 ReadNodeRequest fromCache, 745 List<Contribution> contributions, 746 boolean updateCache ) throws RepositorySourceException { 747 assert location != null; 748 749 // If there are no contributions from any source ... 750 boolean foundNonEmptyContribution = false; 751 for (Contribution contribution : contributions) { 752 assert contribution != null; 753 if (!contribution.isEmpty()) { 754 foundNonEmptyContribution = true; 755 break; 756 } 757 } 758 if (!foundNonEmptyContribution) return null; 759 if (logger.isTraceEnabled()) { 760 NamespaceRegistry registry = getExecutionContext().getNamespaceRegistry(); 761 logger.trace("Loaded {0} from sources, resulting in these contributions:", location.getString(registry)); 762 int i = 0; 763 for (Contribution contribution : contributions) { 764 logger.trace(" {0} {1}", ++i, contribution.getString(registry)); 765 } 766 } 767 768 // Create the node, and use the existing UUID if one is found in the cache ... 769 ExecutionContext context = getExecutionContext(); 770 assert context != null; 771 FederatedNode mergedNode = new FederatedNode(location, federatedWorkspace.getName()); 772 773 // Merge the results into a single set of results ... 774 assert contributions.size() > 0; 775 federatedWorkspace.getMergingStrategy().merge(mergedNode, contributions, context); 776 if (mergedNode.getCachePolicy() == null) { 777 mergedNode.setCachePolicy(federatedWorkspace.getCachePolicy()); 778 } 779 if (updateCache) { 780 // Place the results into the cache ... 781 updateCache(federatedWorkspace, mergedNode, fromCache); 782 } 783 // And return the results ... 784 return mergedNode; 785 } 786 787 /** 788 * Load the node at the supplied location from the sources with the supplied name, returning the information. This method 789 * always obtains the information from the sources and does not use or update the cache. 790 * 791 * @param location the location of the node that is to be loaded 792 * @param federatedWorkspace the federated workspace 793 * @param sourceNames the names of the sources from which contributions are to be loaded; may be empty or null if all 794 * contributions from all sources are to be loaded 795 * @param contributions the list into which the contributions are to be placed 796 * @throws RepositorySourceException 797 */ 798 protected void loadContributionsFromSources( Location location, 799 FederatedWorkspace federatedWorkspace, 800 Set<String> sourceNames, 801 List<Contribution> contributions ) throws RepositorySourceException { 802 // At this point, there is no merge plan, so read information from the sources ... 803 final ExecutionContext context = getExecutionContext(); 804 805 CachePolicy cachePolicy = federatedWorkspace.getCachePolicy(); 806 // If the location has no path, then we have to submit a request to ALL sources ... 807 if (!location.hasPath()) { 808 for (Projection projection : federatedWorkspace.getSourceProjections()) { 809 final String source = projection.getSourceName(); 810 final String workspace = projection.getSourceName(); 811 if (sourceNames != null && !sourceNames.contains(source)) continue; 812 final RepositoryConnection sourceConnection = getConnection(projection); 813 if (sourceConnection == null) continue; // No source exists by this name 814 // Submit the request ... 815 ReadNodeRequest request = new ReadNodeRequest(location, federatedWorkspace.getName()); 816 sourceConnection.execute(context, request); 817 if (request.hasError()) continue; 818 819 // Figure out how long we can cache this contribution ... 820 long minimumTimeToLive = Long.MAX_VALUE; 821 if (cachePolicy != null) { 822 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 823 } 824 CachePolicy requestCachePolicy = request.getCachePolicy(); 825 if (requestCachePolicy != null) { 826 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 827 } else { 828 // See if the source has a default policy ... 829 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 830 if (sourceCachePolicy != null) { 831 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 832 } 833 } 834 // The expiration time should be the smallest of the minimum TTL values ... 835 DateTime expirationTime = null; 836 if (minimumTimeToLive < Long.MAX_VALUE) { 837 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 838 } 839 840 // Convert the locations of the children (relative to the source) to be relative to this node 841 Contribution contribution = Contribution.create(source, 842 workspace, 843 request.getActualLocationOfNode(), 844 expirationTime, 845 request.getProperties(), 846 request.getChildren()); 847 contributions.add(contribution); 848 } 849 if (contributions.isEmpty() && logger.isTraceEnabled()) { 850 NamespaceRegistry registry = getExecutionContext().getNamespaceRegistry(); 851 logger.trace("Failed to load {0} from any source", location.getString(registry)); 852 } 853 return; 854 } 855 856 // Otherwise, we can do it by path and projections ... 857 Path path = location.getPath(); 858 for (Projection projection : federatedWorkspace.getSourceProjections()) { 859 final String source = projection.getSourceName(); 860 final String workspace = projection.getWorkspaceName(); 861 if (sourceNames != null && !sourceNames.contains(source)) continue; 862 final RepositoryConnection sourceConnection = getConnection(projection); 863 if (sourceConnection == null) continue; // No source exists by this name 864 // Get the cached information ... 865 DateTime expirationTime = null; 866 if (cachePolicy != null) { 867 expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS); 868 } 869 // Get the paths-in-source where we should fetch node contributions ... 870 Set<Path> pathsInSource = projection.getPathsInSource(path, pathFactory); 871 if (pathsInSource.isEmpty()) { 872 // The source has no contributions, but see whether the project exists BELOW this path. 873 // We do this by getting the top-level repository paths of the projection, and then 874 // use those to figure out the children of the nodes. 875 Contribution contribution = null; 876 List<Path> topLevelPaths = projection.getTopLevelPathsInRepository(pathFactory); 877 Location input = Location.create(path); 878 switch (topLevelPaths.size()) { 879 case 0: 880 break; 881 case 1: { 882 Path topLevelPath = topLevelPaths.iterator().next(); 883 if (path.isAncestorOf(topLevelPath)) { 884 Location child = Location.create(topLevelPath); 885 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, child); 886 } 887 break; 888 } 889 default: { 890 // We assume that the top-level paths do not overlap ... 891 List<Location> children = new ArrayList<Location>(topLevelPaths.size()); 892 for (Path topLevelPath : topLevelPaths) { 893 if (path.isAncestorOf(topLevelPath)) { 894 children.add(Location.create(topLevelPath)); 895 } 896 } 897 if (children.size() > 0) { 898 contribution = Contribution.createPlaceholder(source, workspace, input, expirationTime, children); 899 } 900 } 901 } 902 if (contribution == null) contribution = Contribution.create(source, workspace, expirationTime); 903 contributions.add(contribution); 904 } else { 905 // There is at least one (real) contribution ... 906 907 // Get the contributions ... 908 final int numPaths = pathsInSource.size(); 909 if (numPaths == 1) { 910 Path pathInSource = pathsInSource.iterator().next(); 911 ReadNodeRequest fromSource = new ReadNodeRequest(Location.create(pathInSource), workspace); 912 sourceConnection.execute(getExecutionContext(), fromSource); 913 if (!fromSource.hasError()) { 914 Collection<Property> properties = fromSource.getProperties(); 915 List<Location> children = fromSource.getChildren(); 916 917 // Figure out how long we can cache this contribution ... 918 long minimumTimeToLive = Long.MAX_VALUE; 919 if (cachePolicy != null) { 920 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 921 } 922 CachePolicy requestCachePolicy = fromSource.getCachePolicy(); 923 if (requestCachePolicy != null) { 924 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 925 } else { 926 // See if the source has a default policy ... 927 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 928 if (sourceCachePolicy != null) { 929 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 930 } 931 } 932 // The expiration time should be the smallest of the minimum TTL values ... 933 expirationTime = null; 934 if (minimumTimeToLive < Long.MAX_VALUE) { 935 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 936 } 937 938 Location actualLocation = fromSource.getActualLocationOfNode(); 939 Contribution contribution = Contribution.create(source, 940 workspace, 941 actualLocation, 942 expirationTime, 943 properties, 944 children); 945 contributions.add(contribution); 946 } 947 } else { 948 List<Request> fromSourceCommands = new ArrayList<Request>(numPaths); 949 for (Path pathInSource : pathsInSource) { 950 fromSourceCommands.add(new ReadNodeRequest(Location.create(pathInSource), workspace)); 951 } 952 Request request = CompositeRequest.with(fromSourceCommands); 953 sourceConnection.execute(context, request); 954 for (Request requestObj : fromSourceCommands) { 955 ReadNodeRequest fromSource = (ReadNodeRequest)requestObj; 956 if (fromSource.hasError()) continue; 957 958 // Figure out how long we can cache this contribution ... 959 long minimumTimeToLive = Long.MAX_VALUE; 960 if (cachePolicy != null) { 961 minimumTimeToLive = Math.min(minimumTimeToLive, cachePolicy.getTimeToLive()); 962 } 963 CachePolicy requestCachePolicy = fromSource.getCachePolicy(); 964 if (requestCachePolicy != null) { 965 minimumTimeToLive = Math.min(minimumTimeToLive, requestCachePolicy.getTimeToLive()); 966 } else { 967 // See if the source has a default policy ... 968 CachePolicy sourceCachePolicy = sourceConnection.getDefaultCachePolicy(); 969 if (sourceCachePolicy != null) { 970 minimumTimeToLive = Math.min(minimumTimeToLive, sourceCachePolicy.getTimeToLive()); 971 } 972 } 973 // The expiration time should be the smallest of the minimum TTL values ... 974 expirationTime = null; 975 if (minimumTimeToLive < Long.MAX_VALUE) { 976 expirationTime = getCurrentTimeInUtc().plus(minimumTimeToLive, TimeUnit.MILLISECONDS); 977 } 978 979 List<Location> children = fromSource.getChildren(); 980 Contribution contribution = Contribution.create(source, 981 workspace, 982 fromSource.getActualLocationOfNode(), 983 expirationTime, 984 fromSource.getProperties(), 985 children); 986 contributions.add(contribution); 987 } 988 } 989 } 990 } 991 } 992 993 protected MergePlan getMergePlan( ReadNodeRequest request ) { 994 Property mergePlanProperty = request.getPropertiesByName().get(DnaLexicon.MERGE_PLAN); 995 if (mergePlanProperty == null || mergePlanProperty.isEmpty()) { 996 return null; 997 } 998 Object value = mergePlanProperty.getValues().next(); 999 return value instanceof MergePlan ? (MergePlan)value : null; 1000 } 1001 1002 protected void updateCache( FederatedWorkspace federatedWorkspace, 1003 FederatedNode mergedNode, 1004 ReadNodeRequest fromCache ) throws RepositorySourceException { 1005 final ExecutionContext context = getExecutionContext(); 1006 final Location location = mergedNode.at(); 1007 final Path path = location.getPath(); 1008 final String cacheWorkspace = federatedWorkspace.getCacheProjection().getWorkspaceName(); 1009 assert path != null; 1010 List<Request> requests = new ArrayList<Request>(); 1011 Name childName = null; 1012 1013 // If the merged node has a merge plan, then add it to the properties if it is not already there ... 1014 Map<Name, Property> properties = mergedNode.getPropertiesByName(); 1015 MergePlan mergePlan = mergedNode.getMergePlan(); 1016 if (mergePlan != null && !properties.containsKey(DnaLexicon.MERGE_PLAN)) { 1017 // Record the merge plan on the merged node ... 1018 Property mergePlanProperty = getExecutionContext().getPropertyFactory().create(DnaLexicon.MERGE_PLAN, 1019 (Object)mergePlan); 1020 properties.put(mergePlanProperty.getName(), mergePlanProperty); 1021 } 1022 1023 // Make sure the UUID is being stored ... 1024 PropertyFactory propertyFactory = getExecutionContext().getPropertyFactory(); 1025 Property uuidProperty = properties.get(DnaLexicon.UUID); 1026 if (uuidProperty == null) uuidProperty = properties.get(JcrLexicon.UUID); 1027 if (uuidProperty == null) { 1028 UUID uuid = mergedNode.at().getUuid(); 1029 if (uuid == null) uuid = UUID.randomUUID(); 1030 uuidProperty = propertyFactory.create(DnaLexicon.UUID, uuid); 1031 properties.put(uuidProperty.getName(), uuidProperty); 1032 } 1033 1034 // If the node didn't exist in the first place ... 1035 if (mergedNode.hasError()) { 1036 // We need to create the node... 1037 if (path.isRoot()) { 1038 // We don't need to re-create the root, just update the properties and/or children ... 1039 } else { 1040 // This is not the root node, so we need to create the node (or replace it if it exists) ... 1041 final Location parentLocation = Location.create(path.getParent()); 1042 childName = path.getLastSegment().getName(); 1043 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, NodeConflictBehavior.REPLACE, 1044 mergedNode.getProperties())); 1045 // Now create all of the children that this federated node knows of ... 1046 for (Location child : mergedNode.getChildren()) { 1047 childName = child.getPath().getLastSegment().getName(); 1048 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, child)); 1049 } 1050 } 1051 } else { 1052 // The node existed, so figure out what to update ... 1053 if (fromCache.getChildren().equals(mergedNode.getChildren())) { 1054 // Just update the properties ... 1055 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1056 } else { 1057 // The children have changed, so figure out how ... 1058 if (fromCache.getChildren().isEmpty()) { 1059 // No children in the cache, so just update the properties of the node ... 1060 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1061 1062 // And create all of the children that this federated node knows of ... 1063 for (Location child : mergedNode.getChildren()) { 1064 childName = child.getPath().getLastSegment().getName(); 1065 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, 1066 child)); 1067 } 1068 } else if (mergedNode.getChildren().isEmpty()) { 1069 // There were children in the cache but not in the merged node, so update the cached properties 1070 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1071 1072 // and delete all the children ... 1073 for (Location child : fromCache.getChildren()) { 1074 requests.add(new DeleteBranchRequest(child, cacheWorkspace)); 1075 } 1076 } else { 1077 // There were children in the cache and in the merged node. The easy way is to just remove the 1078 // branch from the cache, the create it again ... 1079 if (path.isRoot()) { 1080 requests.add(new UpdatePropertiesRequest(location, cacheWorkspace, properties)); 1081 1082 // and delete all the children ... 1083 for (Location child : fromCache.getChildren()) { 1084 requests.add(new DeleteBranchRequest(child, cacheWorkspace)); 1085 } 1086 1087 // Now create all of the children that this federated node knows of ... 1088 for (Location child : mergedNode.getChildren()) { 1089 childName = child.getPath().getLastSegment().getName(); 1090 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, 1091 child)); 1092 } 1093 } else { 1094 requests.add(new DeleteBranchRequest(location, cacheWorkspace)); 1095 1096 // This is not the root node, so we need to create the node (or replace it if it exists) ... 1097 final Location parentLocation = Location.create(path.getParent()); 1098 childName = path.getLastSegment().getName(); 1099 requests.add(new CreateNodeRequest(parentLocation, cacheWorkspace, childName, 1100 NodeConflictBehavior.REPLACE, mergedNode.getProperties())); 1101 // Now create all of the children that this federated node knows of ... 1102 for (Location child : mergedNode.getChildren()) { 1103 childName = child.getPath().getLastSegment().getName(); 1104 requests.add(new CreateNodeRequest(location, cacheWorkspace, childName, NodeConflictBehavior.APPEND, 1105 child)); 1106 } 1107 } 1108 } 1109 } 1110 } 1111 1112 if (logger.isTraceEnabled()) { 1113 traceCacheUpdates(requests); 1114 } 1115 1116 // Execute all the requests ... 1117 final RepositoryConnection cacheConnection = getConnectionToCacheFor(federatedWorkspace); 1118 cacheConnection.execute(context, CompositeRequest.with(requests)); 1119 1120 // If the children did not have UUIDs, then find the actual locations for each of the cached nodes ... 1121 if (requests.size() > 1) { 1122 Iterator<Request> requestIter = requests.iterator(); 1123 requestIter.next(); // Skip the first request, which creates/updates the node (we want children) 1124 List<Location> children = mergedNode.getChildren(); 1125 for (int i = 0; i != children.size(); ++i) { 1126 Request request = requestIter.next(); 1127 while (!(request instanceof CreateNodeRequest)) { // skip non-create requests 1128 request = requestIter.next(); 1129 } 1130 Location actual = ((CreateNodeRequest)request).getActualLocationOfNode(); 1131 Location child = children.get(i); 1132 if (!child.hasIdProperties()) { 1133 assert child.getPath().equals(actual.getPath()); 1134 children.set(i, actual); 1135 } 1136 } 1137 } 1138 } 1139 1140 private void traceCacheUpdates( Iterable<Request> requests ) { 1141 NamespaceRegistry registry = getExecutionContext().getNamespaceRegistry(); 1142 logger.trace("Updating cache:"); 1143 for (Request request : requests) { 1144 if (!(request instanceof ChangeRequest)) continue; 1145 if (request instanceof CreateNodeRequest) { 1146 CreateNodeRequest create = (CreateNodeRequest)request; 1147 logger.trace(" creating {0} under {1} with properties {2}", 1148 create.named().getString(registry), 1149 create.under().getString(registry), 1150 readable(registry, create.properties())); 1151 } else if (request instanceof UpdatePropertiesRequest) { 1152 UpdatePropertiesRequest update = (UpdatePropertiesRequest)request; 1153 logger.trace(" updating {0} with properties {1}", update.on().getString(registry), readable(registry, 1154 update.properties() 1155 .values())); 1156 } else { 1157 logger.trace(" " + request.toString()); 1158 } 1159 } 1160 } 1161 1162 private void traceCacheUpdate( Request request ) { 1163 NamespaceRegistry registry = getExecutionContext().getNamespaceRegistry(); 1164 if (!(request instanceof ChangeRequest)) return; 1165 logger.trace("Updating cache:"); 1166 if (request instanceof CreateNodeRequest) { 1167 CreateNodeRequest create = (CreateNodeRequest)request; 1168 logger.trace(" creating {0} under {1} with properties {2}", 1169 create.named().getString(registry), 1170 create.under().getString(registry), 1171 readable(registry, create.properties())); 1172 } else if (request instanceof UpdatePropertiesRequest) { 1173 UpdatePropertiesRequest update = (UpdatePropertiesRequest)request; 1174 logger.trace(" updating {0} with properties {1}", update.on().getString(registry), readable(registry, 1175 update.properties() 1176 .values())); 1177 } else { 1178 logger.trace(" " + request.toString()); 1179 } 1180 } 1181 1182 private String readable( NamespaceRegistry registry, 1183 Collection<Property> properties ) { 1184 if (properties.isEmpty()) return ""; 1185 StringBuilder sb = new StringBuilder(); 1186 boolean first = true; 1187 for (Property prop : properties) { 1188 if (first) first = false; 1189 else sb.append(","); 1190 sb.append(prop.getString(registry)); 1191 } 1192 return sb.toString(); 1193 } 1194 }