001    /*
002     * JBoss, Home of Professional Open Source.
003     * Copyright 2008, Red Hat Middleware LLC, and individual contributors
004     * as indicated by the @author tags. See the copyright.txt file in the
005     * distribution for a full listing of individual contributors. 
006     *
007     * This is free software; you can redistribute it and/or modify it
008     * under the terms of the GNU Lesser General Public License as
009     * published by the Free Software Foundation; either version 2.1 of
010     * the License, or (at your option) any later version.
011     *
012     * This software is distributed in the hope that it will be useful,
013     * but WITHOUT ANY WARRANTY; without even the implied warranty of
014     * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
015     * Lesser General Public License for more details.
016     *
017     * You should have received a copy of the GNU Lesser General Public
018     * License along with this software; if not, write to the Free
019     * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
020     * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
021     */
022    package org.jboss.dna.connector.federation.executor;
023    
024    import java.util.ArrayList;
025    import java.util.Collection;
026    import java.util.Collections;
027    import java.util.HashMap;
028    import java.util.HashSet;
029    import java.util.LinkedList;
030    import java.util.List;
031    import java.util.Map;
032    import java.util.Set;
033    import java.util.UUID;
034    import java.util.concurrent.TimeUnit;
035    import net.jcip.annotations.NotThreadSafe;
036    import org.jboss.dna.common.i18n.I18n;
037    import org.jboss.dna.common.util.CheckArg;
038    import org.jboss.dna.common.util.Logger;
039    import org.jboss.dna.connector.federation.FederationI18n;
040    import org.jboss.dna.connector.federation.Projection;
041    import org.jboss.dna.connector.federation.contribution.Contribution;
042    import org.jboss.dna.connector.federation.merge.FederatedNode;
043    import org.jboss.dna.connector.federation.merge.MergePlan;
044    import org.jboss.dna.connector.federation.merge.strategy.MergeStrategy;
045    import org.jboss.dna.connector.federation.merge.strategy.OneContributionMergeStrategy;
046    import org.jboss.dna.connector.federation.merge.strategy.SimpleMergeStrategy;
047    import org.jboss.dna.graph.DnaLexicon;
048    import org.jboss.dna.graph.ExecutionContext;
049    import org.jboss.dna.graph.Location;
050    import org.jboss.dna.graph.cache.CachePolicy;
051    import org.jboss.dna.graph.connectors.RepositoryConnection;
052    import org.jboss.dna.graph.connectors.RepositoryConnectionFactory;
053    import org.jboss.dna.graph.connectors.RepositorySource;
054    import org.jboss.dna.graph.connectors.RepositorySourceException;
055    import org.jboss.dna.graph.properties.DateTime;
056    import org.jboss.dna.graph.properties.Path;
057    import org.jboss.dna.graph.properties.PathFactory;
058    import org.jboss.dna.graph.properties.PathNotFoundException;
059    import org.jboss.dna.graph.properties.Property;
060    import org.jboss.dna.graph.requests.CompositeRequest;
061    import org.jboss.dna.graph.requests.CopyBranchRequest;
062    import org.jboss.dna.graph.requests.CreateNodeRequest;
063    import org.jboss.dna.graph.requests.DeleteBranchRequest;
064    import org.jboss.dna.graph.requests.MoveBranchRequest;
065    import org.jboss.dna.graph.requests.ReadAllChildrenRequest;
066    import org.jboss.dna.graph.requests.ReadAllPropertiesRequest;
067    import org.jboss.dna.graph.requests.ReadNodeRequest;
068    import org.jboss.dna.graph.requests.Request;
069    import org.jboss.dna.graph.requests.UpdatePropertiesRequest;
070    import org.jboss.dna.graph.requests.processor.RequestProcessor;
071    
072    /**
073     * @author Randall Hauch
074     */
075    @NotThreadSafe
076    public class FederatingCommandExecutor extends RequestProcessor {
077    
    /** Cache policy used when neither a merged node nor a source connection supplies one; may be null. */
    private final CachePolicy defaultCachePolicy;
    /** Projection identifying the source used as the cache; null when no cache projection was supplied. */
    private final Projection cacheProjection;
    /** The projections of the sources being federated; this is the list supplied to the constructor (not a copy). */
    private final List<Projection> sourceProjections;
    /** The source names of all {@link #sourceProjections}, collected once in the constructor. */
    private final Set<String> sourceNames;
    /** Factory used to create a connection the first time a source is accessed. */
    private final RepositoryConnectionFactory connectionFactory;
    /** Strategy that merges contributions into a federated node; never null once the constructor has completed. */
    private MergeStrategy mergingStrategy;
    /** The connections obtained so far, keyed by source name; includes the cache connection once it has been obtained */
    private final Map<String, RepositoryConnection> connectionsBySourceName;
    /** A direct reference to the cache connection; null until first requested and again after {@link #close()} */
    private RepositoryConnection cacheConnection;
    /** Logger obtained from the execution context for this class. */
    private Logger logger;
089    
090        /**
091         * Create a command executor that federates (merges) the information from multiple sources described by the source
092         * projections. The resulting command executor does not first consult a cache for the merged information; if a cache is
093         * desired, see
094         * {@link #FederatingCommandExecutor(ExecutionContext, String, Projection, CachePolicy, List, RepositoryConnectionFactory)
095         * constructor} that takes a {@link Projection cache projection}.
096         * 
097         * @param context the execution context in which the executor will be run; may not be null
098         * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty
099         * @param sourceProjections the source projections; may not be null
100         * @param connectionFactory the factory for {@link RepositoryConnection} instances
101         */
102        public FederatingCommandExecutor( ExecutionContext context,
103                                          String sourceName,
104                                          List<Projection> sourceProjections,
105                                          RepositoryConnectionFactory connectionFactory ) {
106            this(context, sourceName, null, null, sourceProjections, connectionFactory);
107        }
108    
109        /**
110         * Create a command executor that federates (merges) the information from multiple sources described by the source
111         * projections. The resulting command executor will use the supplied {@link Projection cache projection} to identify the
112         * {@link Projection#getSourceName() repository source} for the cache as well as the {@link Projection#getRules() rules} for
113         * how the paths are mapped in the cache. This cache will be consulted first for the requested information, and will be kept
114         * up to date as changes are made to the federated information.
115         * 
116         * @param context the execution context in which the executor will be run; may not be null
117         * @param sourceName the name of the {@link RepositorySource} that is making use of this executor; may not be null or empty
118         * @param cacheProjection the projection used for the cached information; may be null if there is no cache
119         * @param defaultCachePolicy the default caching policy that outlines the length of time that information should be cached, or
120         *        null if there is no cache or no specific cache policy
121         * @param sourceProjections the source projections; may not be null
122         * @param connectionFactory the factory for {@link RepositoryConnection} instances
123         */
124        public FederatingCommandExecutor( ExecutionContext context,
125                                          String sourceName,
126                                          Projection cacheProjection,
127                                          CachePolicy defaultCachePolicy,
128                                          List<Projection> sourceProjections,
129                                          RepositoryConnectionFactory connectionFactory ) {
130            super(sourceName, context);
131            CheckArg.isNotNull(sourceProjections, "sourceProjections");
132            CheckArg.isNotNull(connectionFactory, "connectionFactory");
133            assert cacheProjection != null ? defaultCachePolicy != null : defaultCachePolicy == null;
134            this.cacheProjection = cacheProjection;
135            this.defaultCachePolicy = defaultCachePolicy;
136            this.sourceProjections = sourceProjections;
137            this.connectionFactory = connectionFactory;
138            this.logger = context.getLogger(getClass());
139            this.connectionsBySourceName = new HashMap<String, RepositoryConnection>();
140            this.sourceNames = new HashSet<String>();
141            for (Projection projection : this.sourceProjections) {
142                this.sourceNames.add(projection.getSourceName());
143            }
144            setMergingStrategy(null);
145        }
146    
147        /**
148         * @param mergingStrategy Sets mergingStrategy to the specified value.
149         */
150        public void setMergingStrategy( MergeStrategy mergingStrategy ) {
151            if (mergingStrategy != null) {
152                this.mergingStrategy = mergingStrategy;
153            } else {
154                if (this.sourceProjections.size() == 1 && this.sourceProjections.get(0).isSimple()) {
155                    this.mergingStrategy = new OneContributionMergeStrategy();
156                } else {
157                    this.mergingStrategy = new SimpleMergeStrategy();
158                }
159            }
160            assert this.mergingStrategy != null;
161        }
162    
163        /**
164         * Get an unmodifiable list of the immutable source projections.
165         * 
166         * @return the set of projections used as sources; never null
167         */
168        public List<Projection> getSourceProjections() {
169            return Collections.unmodifiableList(sourceProjections);
170        }
171    
172        /**
173         * Get the projection defining the cache.
174         * 
175         * @return the cache projection
176         */
177        public Projection getCacheProjection() {
178            return cacheProjection;
179        }
180    
181        protected DateTime getCurrentTimeInUtc() {
182            return getExecutionContext().getValueFactories().getDateFactory().createUtc();
183        }
184    
185        /**
186         * {@inheritDoc}
187         * 
188         * @see RequestProcessor#close()
189         */
190        @Override
191        public void close() {
192            try {
193                super.close();
194            } finally {
195                // Make sure to close ALL open connections ...
196                for (RepositoryConnection connection : connectionsBySourceName.values()) {
197                    if (connection == null) continue;
198                    try {
199                        connection.close();
200                    } catch (Throwable t) {
201                        logger.debug("Error while closing connection to {0}", connection.getSourceName());
202                    }
203                }
204                connectionsBySourceName.clear();
205                try {
206                    if (this.cacheConnection != null) this.cacheConnection.close();
207                } finally {
208                    this.cacheConnection = null;
209                }
210            }
211        }
212    
213        protected RepositoryConnection getConnectionToCache() throws RepositorySourceException {
214            if (this.cacheConnection == null) {
215                this.cacheConnection = getConnection(this.cacheProjection);
216            }
217            assert this.cacheConnection != null;
218            return this.cacheConnection;
219        }
220    
221        protected RepositoryConnection getConnection( Projection projection ) throws RepositorySourceException {
222            String sourceName = projection.getSourceName();
223            RepositoryConnection connection = connectionsBySourceName.get(sourceName);
224            if (connection == null) {
225                connection = connectionFactory.createConnection(sourceName);
226                connectionsBySourceName.put(sourceName, connection);
227            }
228            return connection;
229        }
230    
231        protected Set<String> getOpenConnections() {
232            return connectionsBySourceName.keySet();
233        }
234    
235        /**
236         * {@inheritDoc}
237         * 
238         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadAllChildrenRequest)
239         */
240        @Override
241        public void process( ReadAllChildrenRequest request ) {
242            ReadNodeRequest nodeInfo = getNode(request.of());
243            if (nodeInfo.hasError()) return;
244            for (Location child : nodeInfo.getChildren()) {
245                request.addChild(child);
246            }
247            request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
248        }
249    
250        /**
251         * {@inheritDoc}
252         * 
253         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadAllPropertiesRequest)
254         */
255        @Override
256        public void process( ReadAllPropertiesRequest request ) {
257            ReadNodeRequest nodeInfo = getNode(request.at());
258            if (nodeInfo.hasError()) return;
259            for (Property property : nodeInfo.getProperties()) {
260                request.addProperty(property);
261            }
262            request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
263        }
264    
265        /**
266         * {@inheritDoc}
267         * 
268         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.ReadNodeRequest)
269         */
270        @Override
271        public void process( ReadNodeRequest request ) {
272            ReadNodeRequest nodeInfo = getNode(request.at());
273            if (nodeInfo.hasError()) return;
274            for (Property property : nodeInfo.getProperties()) {
275                request.addProperty(property);
276            }
277            for (Location child : nodeInfo.getChildren()) {
278                request.addChild(child);
279            }
280            request.setActualLocationOfNode(nodeInfo.getActualLocationOfNode());
281        }
282    
283        /**
284         * {@inheritDoc}
285         * 
286         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.CopyBranchRequest)
287         */
288        @Override
289        public void process( CopyBranchRequest request ) {
290            throw new UnsupportedOperationException();
291        }
292    
293        /**
294         * {@inheritDoc}
295         * 
296         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.CreateNodeRequest)
297         */
298        @Override
299        public void process( CreateNodeRequest request ) {
300            throw new UnsupportedOperationException();
301        }
302    
303        /**
304         * {@inheritDoc}
305         * 
306         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.DeleteBranchRequest)
307         */
308        @Override
309        public void process( DeleteBranchRequest request ) {
310            throw new UnsupportedOperationException();
311        }
312    
313        /**
314         * {@inheritDoc}
315         * 
316         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.MoveBranchRequest)
317         */
318        @Override
319        public void process( MoveBranchRequest request ) {
320            throw new UnsupportedOperationException();
321        }
322    
323        /**
324         * {@inheritDoc}
325         * 
326         * @see org.jboss.dna.graph.requests.processor.RequestProcessor#process(org.jboss.dna.graph.requests.UpdatePropertiesRequest)
327         */
328        @Override
329        public void process( UpdatePropertiesRequest request ) {
330            throw new UnsupportedOperationException();
331        }
332    
333        /**
334         * Get the node information from the underlying sources or, if possible, from the cache.
335         * 
336         * @param location the location of the node to be returned
337         * @return the node information
338         * @throws RepositorySourceException
339         */
340        protected ReadNodeRequest getNode( Location location ) throws RepositorySourceException {
341            // Check the cache first ...
342            final ExecutionContext context = getExecutionContext();
343            RepositoryConnection cacheConnection = getConnectionToCache();
344            ReadNodeRequest fromCache = new ReadNodeRequest(location);
345            cacheConnection.execute(context, fromCache);
346    
347            // Look at the cache results from the cache for problems, or if found a plan in the cache look
348            // at the contributions. We'll be putting together the set of source names for which we need to
349            // get the contributions.
350            Set<String> sourceNames = null;
351            List<Contribution> contributions = new LinkedList<Contribution>();
352    
353            if (fromCache.hasError()) {
354                Throwable error = fromCache.getError();
355                if (!(error instanceof PathNotFoundException)) return fromCache;
356    
357                // The path was not found in the cache, so since we don't know whether the ancestors are federated
358                // from multiple source nodes, we need to populate the cache starting with the lowest ancestor
359                // that already exists in the cache.
360                PathNotFoundException notFound = (PathNotFoundException)fromCache.getError();
361                Path lowestExistingAncestor = notFound.getLowestAncestorThatDoesExist();
362                if (location.hasPath()) {
363                    Path path = location.getPath();
364                    Path ancestor = path.getParent();
365                    if (!ancestor.equals(lowestExistingAncestor)) {
366                        // Load the nodes along the path below the existing ancestor, down to (but excluding) the desired path
367                        Path pathToLoad = ancestor;
368                        while (!pathToLoad.equals(lowestExistingAncestor)) {
369                            Location locationToLoad = new Location(pathToLoad);
370                            loadContributionsFromSources(locationToLoad, null, contributions); // sourceNames may be null or empty
371                            FederatedNode mergedNode = createFederatedNode(locationToLoad, contributions, true);
372                            if (mergedNode == null) {
373                                // No source had a contribution ...
374                                I18n msg = FederationI18n.nodeDoesNotExistAtPath;
375                                fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(path, ancestor)));
376                                return fromCache;
377                            }
378                            contributions.clear();
379                            // Move to the next child along the path ...
380                            pathToLoad = pathToLoad.getParent();
381                        }
382                    }
383    
384                }
385    
386                // At this point, all ancestors exist ...
387            } else {
388                // There is no error, so look for the merge plan ...
389                MergePlan mergePlan = getMergePlan(fromCache);
390                if (mergePlan != null) {
391                    // We found the merge plan, so check whether it's still valid ...
392                    final DateTime now = getCurrentTimeInUtc();
393                    if (mergePlan.isExpired(now)) {
394                        // It is still valid, so check whether any contribution is from a non-existant projection ...
395                        for (Contribution contribution : mergePlan) {
396                            if (!this.sourceNames.contains(contribution.getSourceName())) {
397                                // TODO: Record that the cached contribution is from a source that is no longer in this repository
398                            }
399                        }
400                        return fromCache;
401                    }
402    
403                    // At least one of the contributions is expired, so go through the contributions and place
404                    // the valid contributions in the 'contributions' list; any expired contribution
405                    // needs to be loaded by adding the name to the 'sourceNames'
406                    if (mergePlan.getContributionCount() > 0) {
407                        sourceNames = new HashSet<String>(sourceNames);
408                        for (Contribution contribution : mergePlan) {
409                            if (!contribution.isExpired(now)) {
410                                sourceNames.remove(contribution.getSourceName());
411                                contributions.add(contribution);
412                            }
413                        }
414                    }
415                }
416            }
417    
418            // Get the contributions from the sources given their names ...
419            location = fromCache.getActualLocationOfNode();
420            if (location == null) location = fromCache.at(); // not yet in the cache
421            loadContributionsFromSources(location, sourceNames, contributions); // sourceNames may be null or empty
422            FederatedNode mergedNode = createFederatedNode(location, contributions, true);
423            if (mergedNode == null) {
424                // No source had a contribution ...
425                if (location.hasPath()) {
426                    Path ancestor = location.getPath().getParent();
427                    I18n msg = FederationI18n.nodeDoesNotExistAtPath;
428                    fromCache.setError(new PathNotFoundException(location, ancestor, msg.text(location, ancestor)));
429                    return fromCache;
430                }
431                I18n msg = FederationI18n.nodeDoesNotExistAtLocation;
432                fromCache.setError(new PathNotFoundException(location, null, msg.text(location)));
433                return fromCache;
434            }
435            return mergedNode;
436        }
437    
438        protected FederatedNode createFederatedNode( Location location,
439                                                     List<Contribution> contributions,
440                                                     boolean updateCache ) throws RepositorySourceException {
441            assert location != null;
442    
443            // If there are no contributions from any source ...
444            boolean foundNonEmptyContribution = false;
445            for (Contribution contribution : contributions) {
446                assert contribution != null;
447                if (!contribution.isEmpty()) {
448                    foundNonEmptyContribution = true;
449                    break;
450                }
451            }
452            if (!foundNonEmptyContribution) return null;
453            if (logger.isTraceEnabled()) {
454                logger.trace("Loaded {0} from sources, resulting in these contributions:", location);
455                int i = 0;
456                for (Contribution contribution : contributions) {
457                    logger.trace("  {0} {1}", ++i, contribution);
458                }
459            }
460    
461            // Create the node, and use the existing UUID if one is found in the cache ...
462            ExecutionContext context = getExecutionContext();
463            assert context != null;
464            UUID uuid = null;
465            Property uuidProperty = location.getIdProperty(DnaLexicon.UUID);
466            // If the actual location has no UUID identification property ...
467            if (uuidProperty == null || uuidProperty.isEmpty()) {
468                uuid = context.getValueFactories().getUuidFactory().create();
469                uuidProperty = context.getPropertyFactory().create(DnaLexicon.UUID, uuid);
470                // Replace the actual location with one that includes the new UUID property ...
471                location = location.with(uuidProperty);
472            } else {
473                assert uuidProperty.isEmpty() == false;
474                uuid = context.getValueFactories().getUuidFactory().create(uuidProperty.getValues().next());
475            }
476            assert uuid != null;
477            FederatedNode mergedNode = new FederatedNode(location, uuid);
478    
479            // Merge the results into a single set of results ...
480            assert contributions.size() > 0;
481            mergingStrategy.merge(mergedNode, contributions, context);
482            if (mergedNode.getCachePolicy() == null) {
483                mergedNode.setCachePolicy(defaultCachePolicy);
484            }
485            if (updateCache) {
486                // Place the results into the cache ...
487                updateCache(mergedNode);
488            }
489            // And return the results ...
490            return mergedNode;
491        }
492    
493        /**
494         * Load the node at the supplied location from the sources with the supplied name, returning the information. This method
495         * always obtains the information from the sources and does not use or update the cache.
496         * 
497         * @param location the location of the node that is to be loaded
498         * @param sourceNames the names of the sources from which contributions are to be loaded; may be empty or null if all
499         *        contributions from all sources are to be loaded
500         * @param contributions the list into which the contributions are to be placed
501         * @throws RepositorySourceException
502         */
503        protected void loadContributionsFromSources( Location location,
504                                                     Set<String> sourceNames,
505                                                     List<Contribution> contributions ) throws RepositorySourceException {
506            // At this point, there is no merge plan, so read information from the sources ...
507            final ExecutionContext context = getExecutionContext();
508            final PathFactory pathFactory = context.getValueFactories().getPathFactory();
509    
510            // If the location has no path, then we have to submit a request to ALL sources ...
511            if (!location.hasPath()) {
512                for (Projection projection : this.sourceProjections) {
513                    final String source = projection.getSourceName();
514                    if (sourceNames != null && !sourceNames.contains(source)) continue;
515                    final RepositoryConnection sourceConnection = getConnection(projection);
516                    if (sourceConnection == null) continue; // No source exists by this name
517                    // Get the cached information ...
518                    CachePolicy cachePolicy = sourceConnection.getDefaultCachePolicy();
519                    if (cachePolicy == null) cachePolicy = this.defaultCachePolicy;
520                    DateTime expirationTime = null;
521                    if (cachePolicy != null) {
522                        expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS);
523                    }
524                    // Submit the request ...
525                    ReadNodeRequest request = new ReadNodeRequest(location);
526                    sourceConnection.execute(context, request);
527                    if (request.hasError()) continue;
528                    DateTime expTime = request.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(request.getCachePolicy().getTimeToLive(),
529                                                                                                                      TimeUnit.MILLISECONDS);
530                    // Convert the locations of the children (relative to the source) to be relative to this node
531                    Contribution contribution = Contribution.create(source,
532                                                                    request.getActualLocationOfNode(),
533                                                                    expTime,
534                                                                    request.getProperties(),
535                                                                    request.getChildren());
536                    contributions.add(contribution);
537                }
538            }
539    
540            // Otherwise, we can do it by path and projections ...
541            Path path = location.getPath();
542            for (Projection projection : this.sourceProjections) {
543                final String source = projection.getSourceName();
544                if (sourceNames != null && !sourceNames.contains(source)) continue;
545                final RepositoryConnection sourceConnection = getConnection(projection);
546                if (sourceConnection == null) continue; // No source exists by this name
547                // Get the cached information ...
548                CachePolicy cachePolicy = sourceConnection.getDefaultCachePolicy();
549                if (cachePolicy == null) cachePolicy = this.defaultCachePolicy;
550                DateTime expirationTime = null;
551                if (cachePolicy != null) {
552                    expirationTime = getCurrentTimeInUtc().plus(cachePolicy.getTimeToLive(), TimeUnit.MILLISECONDS);
553                }
554                // Get the paths-in-source where we should fetch node contributions ...
555                Set<Path> pathsInSource = projection.getPathsInSource(path, pathFactory);
556                if (pathsInSource.isEmpty()) {
557                    // The source has no contributions, but see whether the project exists BELOW this path.
558                    // We do this by getting the top-level repository paths of the projection, and then
559                    // use those to figure out the children of the nodes.
560                    Contribution contribution = null;
561                    List<Path> topLevelPaths = projection.getTopLevelPathsInRepository(pathFactory);
562                    Location input = new Location(path);
563                    switch (topLevelPaths.size()) {
564                        case 0:
565                            break;
566                        case 1: {
567                            Path topLevelPath = topLevelPaths.iterator().next();
568                            if (path.isAncestorOf(topLevelPath)) {
569                                Location child = new Location(topLevelPath);
570                                contribution = Contribution.createPlaceholder(source, input, expirationTime, child);
571                            }
572                            break;
573                        }
574                        default: {
575                            // We assume that the top-level paths do not overlap ...
576                            List<Location> children = new ArrayList<Location>(topLevelPaths.size());
577                            for (Path topLevelPath : topLevelPaths) {
578                                if (path.isAncestorOf(topLevelPath)) {
579                                    children.add(new Location(topLevelPath));
580                                }
581                            }
582                            if (children.size() > 0) {
583                                contribution = Contribution.createPlaceholder(source, input, expirationTime, children);
584                            }
585                        }
586                    }
587                    if (contribution == null) contribution = Contribution.create(source, expirationTime);
588                    contributions.add(contribution);
589                } else {
590                    // There is at least one (real) contribution ...
591    
592                    // Get the contributions ...
593                    final int numPaths = pathsInSource.size();
594                    if (numPaths == 1) {
595                        Path pathInSource = pathsInSource.iterator().next();
596                        ReadNodeRequest fromSource = new ReadNodeRequest(new Location(pathInSource));
597                        sourceConnection.execute(getExecutionContext(), fromSource);
598                        if (!fromSource.hasError()) {
599                            Collection<Property> properties = fromSource.getProperties();
600                            List<Location> children = fromSource.getChildren();
601                            DateTime expTime = fromSource.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(fromSource.getCachePolicy().getTimeToLive(),
602                                                                                                                                 TimeUnit.MILLISECONDS);
603                            Location actualLocation = fromSource.getActualLocationOfNode();
604                            Contribution contribution = Contribution.create(source, actualLocation, expTime, properties, children);
605                            contributions.add(contribution);
606                        }
607                    } else {
608                        List<ReadNodeRequest> fromSourceCommands = new ArrayList<ReadNodeRequest>(numPaths);
609                        for (Path pathInSource : pathsInSource) {
610                            fromSourceCommands.add(new ReadNodeRequest(new Location(pathInSource)));
611                        }
612                        Request request = CompositeRequest.with(fromSourceCommands);
613                        sourceConnection.execute(context, request);
614                        for (ReadNodeRequest fromSource : fromSourceCommands) {
615                            if (fromSource.hasError()) continue;
616                            DateTime expTime = fromSource.getCachePolicy() == null ? expirationTime : getCurrentTimeInUtc().plus(fromSource.getCachePolicy().getTimeToLive(),
617                                                                                                                                 TimeUnit.MILLISECONDS);
618                            List<Location> children = fromSource.getChildren();
619                            Contribution contribution = Contribution.create(source,
620                                                                            fromSource.getActualLocationOfNode(),
621                                                                            expTime,
622                                                                            fromSource.getProperties(),
623                                                                            children);
624                            contributions.add(contribution);
625                        }
626                    }
627                }
628            }
629        }
630    
631        protected MergePlan getMergePlan( ReadNodeRequest request ) {
632            Property mergePlanProperty = request.getPropertiesByName().get(DnaLexicon.MERGE_PLAN);
633            if (mergePlanProperty == null || mergePlanProperty.isEmpty()) {
634                return null;
635            }
636            Object value = mergePlanProperty.getValues().next();
637            return value instanceof MergePlan ? (MergePlan)value : null;
638        }
639    
640        protected void updateCache( FederatedNode mergedNode ) throws RepositorySourceException {
641            final ExecutionContext context = getExecutionContext();
642            final RepositoryConnection cacheConnection = getConnectionToCache();
643            final Location path = mergedNode.at();
644    
645            List<Request> requests = new ArrayList<Request>();
646            requests.add(new CreateNodeRequest(path, mergedNode.getProperties()));
647            for (Location child : mergedNode.getChildren()) {
648                requests.add(new CreateNodeRequest(child));
649            }
650            cacheConnection.execute(context, CompositeRequest.with(requests));
651        }
652    }