001 /* 002 * JBoss, Home of Professional Open Source. 003 * Copyright 2008, Red Hat Middleware LLC, and individual contributors 004 * as indicated by the @author tags. See the copyright.txt file in the 005 * distribution for a full listing of individual contributors. 006 * 007 * This is free software; you can redistribute it and/or modify it 008 * under the terms of the GNU Lesser General Public License as 009 * published by the Free Software Foundation; either version 2.1 of 010 * the License, or (at your option) any later version. 011 * 012 * This software is distributed in the hope that it will be useful, 013 * but WITHOUT ANY WARRANTY; without even the implied warranty of 014 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 015 * Lesser General Public License for more details. 016 * 017 * You should have received a copy of the GNU Lesser General Public 018 * License along with this software; if not, write to the Free 019 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 020 * 02110-1301 USA, or see the FSF site: http://www.fsf.org. 021 */ 022 package org.jboss.dna.connector.federation.executor; 023 024 import java.util.ArrayList; 025 import java.util.Collection; 026 import java.util.Collections; 027 import java.util.HashMap; 028 import java.util.HashSet; 029 import java.util.LinkedList; 030 import java.util.List; 031 import java.util.Map; 032 import java.util.Set; 033 import java.util.UUID; 034 import java.util.concurrent.TimeUnit; 035 import net.jcip.annotations.NotThreadSafe; 036 import org.jboss.dna.common.i18n.I18n; 037 import org.jboss.dna.common.util.CheckArg; 038 import org.jboss.dna.common.util.Logger; 039 import org.jboss.dna.connector.federation.FederationI18n; 040 import org.jboss.dna.connector.federation.Projection; 041 import org.jboss.dna.connector.federation.contribution.Contribution; 042 import org.jboss.dna.connector.federation.merge.FederatedNode; 043 import org.jboss.dna.connector.federation.merge.MergePlan; 044 import org.jboss.dna.connector.federation.merge.strategy.MergeStrategy; 045 import org.jboss.dna.connector.federation.merge.strategy.OneContributionMergeStrategy; 046 import org.jboss.dna.connector.federation.merge.strategy.SimpleMergeStrategy; 047 import org.jboss.dna.graph.DnaLexicon; 048 import org.jboss.dna.graph.ExecutionContext; 049 import org.jboss.dna.graph.Location; 050 import org.jboss.dna.graph.cache.CachePolicy; 051 import org.jboss.dna.graph.connectors.RepositoryConnection; 052 import org.jboss.dna.graph.connectors.RepositoryConnectionFactory; 053 import org.jboss.dna.graph.connectors.RepositorySource; 054 import org.jboss.dna.graph.connectors.RepositorySourceException; 055 import org.jboss.dna.graph.properties.DateTime; 056 import org.jboss.dna.graph.properties.Path; 057 import org.jboss.dna.graph.properties.PathFactory; 058 import org.jboss.dna.graph.properties.PathNotFoundException; 059 import org.jboss.dna.graph.properties.Property; 060 import org.jboss.dna.graph.requests.CompositeRequest; 061 import org.jboss.dna.graph.requests.CopyBranchRequest; 062 import org.jboss.dna.graph.requests.CreateNodeRequest; 063 import org.jboss.dna.graph.requests.DeleteBranchRequest; 064 import org.jboss.dna.graph.requests.MoveBranchRequest; 065 import org.jboss.dna.graph.requests.ReadAllChildrenRequest; 066 import org.jboss.dna.graph.requests.ReadAllPropertiesRequest; 067 import org.jboss.dna.graph.requests.ReadNodeRequest; 068 import org.jboss.dna.graph.requests.Request; 069 import org.jboss.dna.graph.requests.UpdatePropertiesRequest; 070 import org.jboss.dna.graph.requests.processor.RequestProcessor; 071 072 /** 073 * @author Randall Hauch 074 */ 075 @NotThreadSafe 076 public class FederatingCommandExecutor extends RequestProcessor { 077 078 private final CachePolicy defaultCachePolicy; 079 private final Projection cacheProjection; 080 private final List<Projection> sourceProjections; 081 private final Set<String> sourceNames; 082 private final RepositoryConnectionFactory connectionFactory; 083 private MergeStrategy mergingStrategy; 084 /** The set of all connections, including the cache connection */ 085 private final Map<String, RepositoryConnection> connectionsBySourceName; 086 /** A direct reference to the cache connection */ 087 private RepositoryConnection cacheConnection; 088 private Logger logger; 089 090 /** 091 * Create a command executor that federates (merges) the information from multiple sources described by the source 092 * projections. The resulting command executor does not first consult a cache for the merged information; if a cache is 093 * desired, see 094 * {@link #FederatingCommandExecutor(ExecutionContext, String, Projection, CachePolicy, List, RepositoryConnectionFactory) 095 * constructor} that takes a {@link Projection cache projection}. 096 * 097 * @param context the execution context in which the executor will be run; may not be null 098 * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty 099 * @param sourceProjections the source projections; may not be null 100 * @param connectionFactory the factory for {@link RepositoryConnection} instances 101 */ 102 public FederatingCommandExecutor( ExecutionContext context, 103 String sourceName, 104 List<Projection> sourceProjections, 105 RepositoryConnectionFactory connectionFactory ) { 106 this(context, sourceName, null, null, sourceProjections, connectionFactory); 107 } 108 109 /** 110 * Create a command executor that federates (merges) the information from multiple sources described by the source 111 * projections. The resulting command executor will use the supplied {@link Projection cache projection} to identify the 112 * {@link Projection#getSourceName() repository source} for the cache as well as the {@link Projection#getRules() rules} for 113 * how the paths are mapped in the cache. This cache will be consulted first for the requested information, and will be kept 114 * up to date as changes are made to the federated information. 115 * 116 * @param context the execution context in which the executor will be run; may not be null 117 * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty 118 * @param cacheProjection the projection used for the cached information; may be null if there is no cache 119 * @param defaultCachePolicy the default caching policy that outlines the length of time that information should be cached, or 120 * null if there is no cache or no specific cache policy 121 * @param sourceProjections the source projections; may not be null 122 * @param connectionFactory the factory for {@link RepositoryConnection} instances 123 */ 124 public FederatingCommandExecutor( ExecutionContext context, 125 String sourceName, 126 Projection cacheProjection, 127 CachePolicy defaultCachePolicy, 128 List<Projection> sourceProjections, 129 RepositoryConnectionFactory connectionFactory ) { 130 super(sourceName, context); 131 CheckArg.isNotNull(sourceProjections, "sourceProjections"); 132 CheckArg.isNotNull(connectionFactory, "connectionFactory"); 133 assert cacheProjection != null ? defaultCachePolicy != null : defaultCachePolicy == null; 134 this.cacheProjection = cacheProjection; 135 this.defaultCachePolicy = defaultCachePolicy; 136 this.sourceProjections = sourceProjections; 137 this.connectionFactory = connectionFactory; 138 this.logger = context.getLogger(getClass()); 139 this.connectionsBySourceName = new HashMap<String, RepositoryConnection>(); 140 this.sourceNames = new HashSet<String>(); 141 for (Projection projection : this.sourceProjections) { 142 this.sourceNames.add(projection.getSourceName()); 143 } 144 setMergingStrategy(null); 145 } 146 147 /** 148 * @param mergingStrategy Sets mergingStrategy to the specified value. 149 */ 150 public void setMergingStrategy( MergeStrategy mergingStrategy ) { 151 if (mergingStrategy != null) { 152 this.mergingStrategy = mergingStrategy; 153 } else { 154 if (this.sourceProjections.size() == 1 && this.sourceProjections.get(0).isSimple()) { 155 this.mergingStrategy = new OneContributionMergeStrategy(); 156 } else { 157 this.mergingStrategy = new SimpleMergeStrategy(); 158 } 159 } 160 assert this.mergingStrategy != null; 161 } 162 163 /** 164 * Get an unmodifiable list of the immutable source projections. 165 * 166 * @return the set of projections used as sources; never null 167 */ 168 public List<Projection> getSourceProjections() { 169 return Collections.unmodifiableList(sourceProjections); 170 } 171 172 /** 173 * Get the projection defining the cache. 174 * 175 * @return the cache projection 176 */ 177 public Projection getCacheProjection() { 178 return cacheProjection; 179 } 180 181 protected DateTime getCurrentTimeInUtc() { 182 return getExecutionContext().getValueFactories().getDateFactory().createUtc(); 183 } 184 185 /** 186 * {@inheritDoc} 187 * 188 * @see RequestProcessor#close() 189 */ 190 @Override 191 public void close() { 192 try { 193 super.close(); 194 } finally { 195 // Make sure to close ALL open connections ... 196 for (RepositoryConnection connection : connectionsBySourceName.values()) { 197 if (connection == null) continue; 198 try { 199 connection.close(); 200 } catch (Throwable t) { 201 logger.debug("Error while closing connection to {0}", connection.getSourceName()); 202 } 203 } 204 connectionsBySourceName.clear(); 205 try { 206 if (this.cacheConnection != null) this.cacheConnection.close(); 207 } finally { 208 this.cacheConnection = null; 209 } 210 } 211 } 212 213 protected RepositoryConnection getConnectionToCache() throws RepositorySourceException { 214 if (this.cacheConnection == null) { 215 this.cacheConnection = getConnection(this.cacheProjection); 216 } 217 assert this.cacheConnection != null; 218 return this.cacheConnection; 219 } 220 221 protected RepositoryConnection getConnection( Projection projection ) throws RepositorySourceException { 222 String sourceName = projection.getSourceName(); 223 RepositoryConnection connection = connectionsBySourceName.get(sourceName); 224 if (connection == null) { 225 connection = connectionFactory.createConnection(sourceName); 226 connectionsBySourceName.put(sourceName, connection); 227 } 228 return connection; 229 } 230 231 protected Set<String> getOpenConnections() { 232 return connectionsBySourceName.keySet(); 233 } 234 235 /** 236 * {@inheritDoc} 237 * 238 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadAllChildrenRequest) 239 */ 240 @Override 241 public void process( ReadAllChildrenRequest request ) { 242 ReadNodeRequest nodeInfo = getNode(request.of()); 243 if (nodeInfo.hasError()) return; 244 for (Location child : nodeInfo.getChildren()) { 245 request.addChild(child); 246 } 247 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 248 } 249 250 /** 251 * {@inheritDoc} 252 * 253 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadAllPropertiesRequest) 254 */ 255 @Override 256 public void process( ReadAllPropertiesRequest request ) { 257 ReadNodeRequest nodeInfo = getNode(request.at()); 258 if (nodeInfo.hasError()) return; 259 for (Property property : nodeInfo.getProperties()) { 260 request.addProperty(property); 261 } 262 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 263 } 264 265 /** 266 * {@inheritDoc} 267 * 268 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadNodeRequest) 269 */ 270 @Override 271 public void process( ReadNodeRequest request ) { 272 ReadNodeRequest nodeInfo = getNode(request.at()); 273 if (nodeInfo.hasError()) return; 274 for (Property property : nodeInfo.getProperties()) { 275 request.addProperty(property); 276 } 277 for (Location child : nodeInfo.getChildren()) { 278 request.addChild(child); 279 } 280 request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode()); 281 } 282 283 /** 284 * {@inheritDoc} 285 * 286 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.CopyBranchRequest) 287 */ 288 @Override 289 public void process( CopyBranchRequest request ) { 290 throw new UnsupportedOperationException(); 291 } 292 293 /** 294 * {@inheritDoc} 295 * 296 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.CreateNodeRequest) 297 */ 298 @Override 299 public void process( CreateNodeRequest request ) { 300 throw new UnsupportedOperationException(); 301 } 302 303 /** 304 * {@inheritDoc} 305 * 306 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.DeleteBranchRequest) 307 */ 308 @Override 309 public void process( DeleteBranchRequest request ) { 310 throw new UnsupportedOperationException(); 311 } 312 313 /** 314 * {@inheritDoc} 315 * 316 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.MoveBranchRequest) 317 */ 318 @Override 319 public void process( MoveBranchRequest request ) { 320 throw new UnsupportedOperationException(); 321 } 322 323 /** 324 * {@inheritDoc} 325 * 326 * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.UpdatePropertiesRequest) 327 */ 328 @Override 329 public void process( UpdatePropertiesRequest request ) { 330 throw new UnsupportedOperationException(); 331 } 332 333 /** 334 * Get the node information from the underlying sources or, if possible, from the cache. 335 * 336 * @param location the location of the node to be returned 337 * @return the node information 338 * @throws RepositorySourceException 339 */ 340 protected ReadNodeRequest getNode( Location location ) throws RepositorySourceException { 341 // Check the cache first ... 342 final ExecutionContext context = getExecutionContext(); 343 RepositoryConnection cacheConnection = getConnectionToCache(); 344 ReadNodeRequest fromCache = new ReadNodeRequest(location); 345 cacheConnection.execute(context, fromCache); 346 347 // Look at the cache results from the cache for problems, or if found a plan in the cache look 348 // at the contributions. We'll be putting together the set of source names for which we need to 349 // get the contributions. 350 Set<String> sourceNames = null; 351 List<Contribution> contributions = new LinkedList<Contribution>(); 352 353 if (fromCache.hasError()) { 354 Throwable error = fromCache.getError(); 355 if (!(error instanceof PathNotFoundException)) return fromCache; 356 357 // The path was not found in the cache, so since we don't know whether the ancestors are federated 358 // from multiple source nodes, we need to populate the cache starting with the lowest ancestor 359 // that already exists in the cache. 360 PathNotFoundException notFound = (PathNotFoundException)fromCache.getError(); 361 Path lowestExistingAncestor = notFound.getLowestAncestorThatDoesExist(); 362 if (location.hasPath()) { 363 Path path = location.getPath(); 364 Path ancestor = path.getParent(); 365 if (!ancestor.equals(lowestExistingAncestor)) { 366 // Load the nodes along the path below the existing ancestor, down to (but excluding) the desired path 367 Path pathToLoad = ancestor; 368 while (!pathToLoad.equals(lowestExistingAncestor)) { 369 Location locationToLoad = new Location(pathToLoad); 370 loadContributionsFromSources(locationToLoad, null, contributions); // sourceNames may be null or empty 371 FederatedNode mergedNode = createFederatedNode(locationToLoad, contributions, true); 372 if (mergedNode == null) { 373 // No source had a contribution ... 374 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 375 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(path, ancestor))); 376 return fromCache; 377 } 378 contributions.clear(); 379 // Move to the next child along the path ... 380 pathToLoad = pathToLoad.getParent(); 381 } 382 } 383 384 } 385 386 // At this point, all ancestors exist ... 387 } else { 388 // There is no error, so look for the merge plan ... 389 MergePlan mergePlan = getMergePlan(fromCache); 390 if (mergePlan != null) { 391 // We found the merge plan, so check whether it's still valid ... 392 final DateTime now = getCurrentTimeInUtc(); 393 if (mergePlan.isExpired(now)) { 394 // It is still valid, so check whether any contribution is from a non-existant projection ... 395 for (Contribution contribution : mergePlan) { 396 if (!this.sourceNames.contains(contribution.getSourceName())) { 397 // TODO: Record that the cached contribution is from a source that is no longer in this repository 398 } 399 } 400 return fromCache; 401 } 402 403 // At least one of the contributions is expired, so go through the contributions and place 404 // the valid contributions in the 'contributions' list; any expired contribution 405 // needs to be loaded by adding the name to the 'sourceNames' 406 if (mergePlan.getContributionCount() > 0) { 407 sourceNames = new HashSet<String>(sourceNames); 408 for (Contribution contribution : mergePlan) { 409 if (!contribution.isExpired(now)) { 410 sourceNames.remove(contribution.getSourceName()); 411 contributions.add(contribution); 412 } 413 } 414 } 415 } 416 } 417 418 // Get the contributions from the sources given their names ... 419 location = fromCache.getActualLocationOfNode(); 420 if (location == null) location = fromCache.at(); // not yet in the cache 421 loadContributionsFromSources(location, sourceNames, contributions); // sourceNames may be null or empty 422 FederatedNode mergedNode = createFederatedNode(location, contributions, true); 423 if (mergedNode == null) { 424 // No source had a contribution ... 425 if (location.hasPath()) { 426 Path ancestor = location.getPath().getParent(); 427 I18n msg = FederationI18n.nodeDoesNotExistAtPath; 428 fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(location, ancestor))); 429 return fromCache; 430 } 431 I18n msg = FederationI18n.nodeDoesNotExistAtLocation; 432 fromCache.setError(new PathNotFoundException(location, null, msg.text(location))); 433 return fromCache; 434 } 435 return mergedNode; 436 } 437 438 protected FederatedNode createFederatedNode( Location location, 439 List<Contribution> contributions, 440 boolean updateCache ) throws RepositorySourceException { 441 assert location != null; 442 443 // If there are no contributions from any source ... 444 boolean foundNonEmptyContribution = false; 445 for (Contribution contribution : contributions) { 446 assert contribution != null; 447 if (!contribution.isEmpty()) { 448 foundNonEmptyContribution = true; 449 break; 450 } 451 } 452 if (!foundNonEmptyContribution) return null; 453 if (logger.isTraceEnabled()) { 454 logger.trace("Loaded {0} from sources, resulting in these contributions:", location); 455 int i = 0; 456 for (Contribution contribution : contributions) { 457 logger.trace(" {0} {1}", ++i, contribution); 458 } 459 } 460 461 // Create the node, and use the existing UUID if one is found in the cache ... 462 ExecutionContext context = getExecutionContext(); 463 assert context != null; 464 UUID uuid = null; 465 Property uuidProperty = location.getIdProperty(DnaLexicon.UUID); 466 // If the actual location has no UUID identification property ... 467 if (uuidProperty == null || uuidProperty.isEmpty()) { 468 uuid = context.getValueFactories().getUuidFactory().create(); 469 uuidProperty = context.getPropertyFactory().create(DnaLexicon.UUID, uuid); 470 // Replace the actual location with one that includes the new UUID property ... 471 location = location.with(uuidProperty); 472 } else { 473 assert uuidProperty.isEmpty() == false; 474 uuid = context.getValueFactories().getUuidFactory().create(uuidProperty.getValues().next()); 475 } 476 assert uuid != null; 477 FederatedNode mergedNode = new FederatedNode(location, uuid); 478 479 // Merge the results into a single set of results ... 480 assert contributions.size() > 0; 481 mergingStrategy.merge(mergedNode, contributions, context); 482 if (mergedNode.getCachePolicy() == null) { 483 mergedNode.setCachePolicy(defaultCachePolicy); 484 } 485 if (updateCache) { 486 // Place the results into the cache ... 487 updateCache(mergedNode); 488 } 489 // And return the results ... 490 return mergedNode; 491 } 492 493 /** 494 * Load the node at the supplied location from the sources with the supplied name, returning the information. This method 495 * always obtains the information from the sources and does not use or update the cache. 496 * 497 * @param location the location of the node that is to be loaded 498 * @param sourceNames the names of the sources from which contributions are to be loaded; may be empty or null if all 499 * contributions from all sources are to be loaded 500 * @param contributions the list into which the contributions are to be placed 501 * @throws RepositorySourceException 502 */ 503 protected void loadContributionsFromSources( Location location, 504 Set<String> sourceNames, 505 List<Contribution> contributions ) throws RepositorySourceException { 506 // At this point, there is no merge plan, so read information from the sources ... 507 final ExecutionContext context = getExecutionContext(); 508 final PathFactory pathFactory = context.getValueFactories().getPathFactory(); 509 510 // If the location has no path, then we have to submit a request to ALL sources ... 511 if (!location.hasPath()) { 512 for (Projection projection : this.sourceProjections) { 513 final String source = projection.getSourceName(); 514 if (sourceNames != null && !sourceNames.contains(source)) continue; 515 final RepositoryConnection sourceConnection = getConnection(projection); 516 if (sourceConnection == null) continue; // No source exists by this name 517 // Get the cached information ... 518 CachePolicy cachePolicy = sourceConnection.getDefaultCachePolicy(); 519 if (cachePolicy == null) cachePolicy = this.defaultCachePolicy; 520 DateTime expirationTime = null; 521 if (cachePolicy != null) { 522 expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS); 523 } 524 // Submit the request ... 525 ReadNodeRequest request = new ReadNodeRequest(location); 526 sourceConnection.execute(context, request); 527 if (request.hasError()) continue; 528 DateTime expTime = request.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(request.getCachePolicy().getTimeToLive(), 529 TimeUnit.MILLISECONDS); 530 // Convert the locations of the children (relative to the source) to be relative to this node 531 Contribution contribution = Contribution.create(source, 532 request.getActualLocationOfNode(), 533 expTime, 534 request.getProperties(), 535 request.getChildren()); 536 contributions.add(contribution); 537 } 538 } 539 540 // Otherwise, we can do it by path and projections ... 541 Path path = location.getPath(); 542 for (Projection projection : this.sourceProjections) { 543 final String source = projection.getSourceName(); 544 if (sourceNames != null && !sourceNames.contains(source)) continue; 545 final RepositoryConnection sourceConnection = getConnection(projection); 546 if (sourceConnection == null) continue; // No source exists by this name 547 // Get the cached information ... 548 CachePolicy cachePolicy = sourceConnection.getDefaultCachePolicy(); 549 if (cachePolicy == null) cachePolicy = this.defaultCachePolicy; 550 DateTime expirationTime = null; 551 if (cachePolicy != null) { 552 expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS); 553 } 554 // Get the paths-in-source where we should fetch node contributions ... 555 Set<Path> pathsInSource = projection.getPathsInSource(path, pathFactory); 556 if (pathsInSource.isEmpty()) { 557 // The source has no contributions, but see whether the project exists BELOW this path. 558 // We do this by getting the top-level repository paths of the projection, and then 559 // use those to figure out the children of the nodes. 560 Contribution contribution = null; 561 List<Path> topLevelPaths = projection.getTopLevelPathsInRepository(pathFactory); 562 Location input = new Location(path); 563 switch (topLevelPaths.size()) { 564 case 0: 565 break; 566 case 1: { 567 Path topLevelPath = topLevelPaths.iterator().next(); 568 if (path.isAncestorOf(topLevelPath)) { 569 Location child = new Location(topLevelPath); 570 contribution = Contribution.createPlaceholder(source, input, expirationTime, child); 571 } 572 break; 573 } 574 default: { 575 // We assume that the top-level paths do not overlap ... 576 List<Location> children = new ArrayList<Location>(topLevelPaths.size()); 577 for (Path topLevelPath : topLevelPaths) { 578 if (path.isAncestorOf(topLevelPath)) { 579 children.add(new Location(topLevelPath)); 580 } 581 } 582 if (children.size() > 0) { 583 contribution = Contribution.createPlaceholder(source, input, expirationTime, children); 584 } 585 } 586 } 587 if (contribution == null) contribution = Contribution.create(source, expirationTime); 588 contributions.add(contribution); 589 } else { 590 // There is at least one (real) contribution ... 591 592 // Get the contributions ... 593 final int numPaths = pathsInSource.size(); 594 if (numPaths == 1) { 595 Path pathInSource = pathsInSource.iterator().next(); 596 ReadNodeRequest fromSource = new ReadNodeRequest(new Location(pathInSource)); 597 sourceConnection.execute(getExecutionContext(), fromSource); 598 if (!fromSource.hasError()) { 599 Collection<Property> properties = fromSource.getProperties(); 600 List<Location> children = fromSource.getChildren(); 601 DateTime expTime = fromSource.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(fromSource.getCachePolicy().getTimeToLive(), 602 TimeUnit.MILLISECONDS); 603 Location actualLocation = fromSource.getActualLocationOfNode(); 604 Contribution contribution = Contribution.create(source, actualLocation, expTime, properties, children); 605 contributions.add(contribution); 606 } 607 } else { 608 List<ReadNodeRequest> fromSourceCommands = new ArrayList<ReadNodeRequest>(numPaths); 609 for (Path pathInSource : pathsInSource) { 610 fromSourceCommands.add(new ReadNodeRequest(new Location(pathInSource))); 611 } 612 Request request = CompositeRequest.with(fromSourceCommands); 613 sourceConnection.execute(context, request); 614 for (ReadNodeRequest fromSource : fromSourceCommands) { 615 if (fromSource.hasError()) continue; 616 DateTime expTime = fromSource.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(fromSource.getCachePolicy().getTimeToLive(), 617 TimeUnit.MILLISECONDS); 618 List<Location> children = fromSource.getChildren(); 619 Contribution contribution = Contribution.create(source, 620 fromSource.getActualLocationOfNode(), 621 expTime, 622 fromSource.getProperties(), 623 children); 624 contributions.add(contribution); 625 } 626 } 627 } 628 } 629 } 630 631 protected MergePlan getMergePlan( ReadNodeRequest request ) { 632 Property mergePlanProperty = request.getPropertiesByName().get(DnaLexicon.MERGE_PLAN); 633 if (mergePlanProperty == null || mergePlanProperty.isEmpty()) { 634 return null; 635 } 636 Object value = mergePlanProperty.getValues().next(); 637 return value instanceof MergePlan ? (MergePlan)value : null; 638 } 639 640 protected void updateCache( FederatedNode mergedNode ) throws RepositorySourceException { 641 final ExecutionContext context = getExecutionContext(); 642 final RepositoryConnection cacheConnection = getConnectionToCache(); 643 final Location path = mergedNode.at(); 644 645 List<Request> requests = new ArrayList<Request>(); 646 requests.add(new CreateNodeRequest(path, mergedNode.getProperties())); 647 for (Location child : mergedNode.getChildren()) { 648 requests.add(new CreateNodeRequest(child)); 649 } 650 cacheConnection.execute(context, CompositeRequest.with(requests)); 651 } 652 }