package org.jrdf.query.server.distributed;

import java.net.URI;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.jrdf.query.answer.Answer;
import org.jrdf.query.answer.StreamingAnswerSparqlParser;
import org.jrdf.query.answer.StreamingAnswerSparqlParserImpl;
import org.jrdf.query.answer.StreamingAnswerSparqlParserSelectAnswer;
import org.jrdf.query.client.CallableQueryClient;
import org.jrdf.query.client.CallableQueryClientImpl;
import org.jrdf.query.client.QueryClient;
import org.jrdf.query.client.SparqlAnswerHandler;
import org.jrdf.util.param.ParameterUtil;

/* loaded from: input_file:lib/jrdf-0.5.6.3.jar:org/jrdf/query/server/distributed/DistributedQueryClientImpl.class */
public class DistributedQueryClientImpl implements QueryClient {
    private ExecutorService executor;
    private Collection<CallableQueryClient> queryClients = new LinkedList();
    private Set<Future<Answer>> answers;
    private StreamingAnswerSparqlParser multiAnswerParser;

    public DistributedQueryClientImpl(Collection<URI> collection, SparqlAnswerHandler sparqlAnswerHandler) {
        Iterator<URI> it = collection.iterator();
        while (it.hasNext()) {
            this.queryClients.add(new CallableQueryClientImpl(it.next(), sparqlAnswerHandler));
        }
        this.executor = new ScheduledThreadPoolExecutor(collection.size());
        this.multiAnswerParser = new StreamingAnswerSparqlParserImpl(new Answer[0]);
    }

    @Override // org.jrdf.query.client.QueryClient
    public void setQuery(String str, Map<String, String> map) {
        Iterator<CallableQueryClient> it = this.queryClients.iterator();
        while (it.hasNext()) {
            it.next().setQuery(str, map);
        }
    }

    @Override // org.jrdf.query.client.QueryClient
    public Answer executeQuery() {
        ParameterUtil.checkNotNull(this.queryClients);
        this.answers = new HashSet();
        executeQuries();
        aggregateResults();
        return new StreamingAnswerSparqlParserSelectAnswer(this.multiAnswerParser);
    }

    @Override // org.jrdf.query.client.QueryClient
    public Answer executeQuery(String str, Map<String, String> map) {
        setQuery(str, map);
        return executeQuery();
    }

    public void cancelExecution() {
        Iterator<Future<Answer>> it = this.answers.iterator();
        while (it.hasNext()) {
            cancelExecution(it.next());
        }
    }

    private void aggregateResults() {
        long currentTimeMillis = System.currentTimeMillis();
        for (Future<Answer> future : this.answers) {
            try {
                this.multiAnswerParser.addAnswer(future.get());
            } catch (Exception e) {
                cancelExecution(future);
            }
        }
        System.out.println("Distributed querying time: " + (System.currentTimeMillis() - currentTimeMillis));
    }

    private void executeQuries() {
        Iterator<CallableQueryClient> it = this.queryClients.iterator();
        while (it.hasNext()) {
            this.answers.add(this.executor.submit(it.next()));
        }
    }

    private void cancelExecution(Future<Answer> future) {
        future.cancel(true);
    }
}
