package org.apache.flink.runtime.minicluster;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.client.ClientUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherId;
import org.apache.flink.runtime.dispatcher.MemoryArchivedExecutionGraphStore;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent;
import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.ReporterSetup;
import org.apache.flink.runtime.metrics.groups.ProcessMetricGroup;
import org.apache.flink.runtime.metrics.util.MetricUtils;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.StandaloneResourceManagerFactory;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster.class */
public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
    private static final Logger LOG = LoggerFactory.getLogger(MiniCluster.class);
    private final MiniClusterConfiguration miniClusterConfiguration;
    private final Time rpcTimeout;

    @GuardedBy("lock")
    private final List<TaskExecutor> taskManagers;

    @GuardedBy("lock")
    private MetricRegistryImpl metricRegistry;

    @GuardedBy("lock")
    private ProcessMetricGroup processMetricGroup;

    @GuardedBy("lock")
    private RpcService commonRpcService;

    @GuardedBy("lock")
    private ExecutorService ioExecutor;

    @GuardedBy("lock")
    private final Collection<RpcService> rpcServices;

    @GuardedBy("lock")
    private HighAvailabilityServices haServices;

    @GuardedBy("lock")
    private BlobServer blobServer;

    @GuardedBy("lock")
    private HeartbeatServices heartbeatServices;

    @GuardedBy("lock")
    private BlobCacheService blobCacheService;

    @GuardedBy("lock")
    private LeaderRetrievalService resourceManagerLeaderRetriever;

    @GuardedBy("lock")
    private LeaderRetrievalService dispatcherLeaderRetriever;

    @GuardedBy("lock")
    private LeaderRetrievalService clusterRestEndpointLeaderRetrievalService;

    @GuardedBy("lock")
    private RpcGatewayRetriever<DispatcherId, DispatcherGateway> dispatcherGatewayRetriever;

    @GuardedBy("lock")
    private RpcGatewayRetriever<ResourceManagerId, ResourceManagerGateway> resourceManagerGatewayRetriever;

    @GuardedBy("lock")
    private LeaderRetriever webMonitorLeaderRetriever;

    @GuardedBy("lock")
    private RpcServiceFactory taskManagerRpcServiceFactory;
    private final Object lock = new Object();
    private final TerminatingFatalErrorHandlerFactory taskManagerTerminatingFatalErrorHandlerFactory = new TerminatingFatalErrorHandlerFactory();

    @GuardedBy("lock")
    private Collection<DispatcherResourceManagerComponent> dispatcherResourceManagerComponents = new ArrayList(1);
    private CompletableFuture<Void> terminationFuture = CompletableFuture.completedFuture(null);
    private volatile boolean running = false;

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$CommonRpcServiceFactory.class */
    protected static class CommonRpcServiceFactory implements RpcServiceFactory {
        private final RpcService commonRpcService;

        CommonRpcServiceFactory(RpcService rpcService) {
            this.commonRpcService = rpcService;
        }

        @Override // org.apache.flink.runtime.minicluster.MiniCluster.RpcServiceFactory
        public RpcService createRpcService() {
            return this.commonRpcService;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$DedicatedRpcServiceFactory.class */
    protected class DedicatedRpcServiceFactory implements RpcServiceFactory {
        private final AkkaRpcServiceConfiguration akkaRpcServiceConfig;
        private final String jobManagerBindAddress;

        DedicatedRpcServiceFactory(AkkaRpcServiceConfiguration akkaRpcServiceConfiguration, String str) {
            this.akkaRpcServiceConfig = akkaRpcServiceConfiguration;
            this.jobManagerBindAddress = str;
        }

        @Override // org.apache.flink.runtime.minicluster.MiniCluster.RpcServiceFactory
        public RpcService createRpcService() {
            RpcService createRpcService = MiniCluster.this.createRpcService(this.akkaRpcServiceConfig, true, this.jobManagerBindAddress);
            synchronized (MiniCluster.this.lock) {
                MiniCluster.this.rpcServices.add(createRpcService);
            }
            return createRpcService;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$RpcServiceFactory.class */
    public interface RpcServiceFactory {
        RpcService createRpcService();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$ShutDownFatalErrorHandler.class */
    public class ShutDownFatalErrorHandler implements FatalErrorHandler {
        private ShutDownFatalErrorHandler() {
        }

        @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
        public void onFatalError(Throwable th) {
            MiniCluster.LOG.warn("Error in MiniCluster. Shutting the MiniCluster down.", th);
            MiniCluster.this.closeAsync();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandler.class */
    public class TerminatingFatalErrorHandler implements FatalErrorHandler {
        private final int index;

        private TerminatingFatalErrorHandler(int i) {
            this.index = i;
        }

        @Override // org.apache.flink.runtime.rpc.FatalErrorHandler
        public void onFatalError(Throwable th) {
            if (MiniCluster.this.running) {
                MiniCluster.LOG.error("TaskManager #{} failed.", Integer.valueOf(this.index), th);
                synchronized (MiniCluster.this.lock) {
                    ((TaskExecutor) MiniCluster.this.taskManagers.get(this.index)).closeAsync();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/minicluster/MiniCluster$TerminatingFatalErrorHandlerFactory.class */
    public class TerminatingFatalErrorHandlerFactory {
        private TerminatingFatalErrorHandlerFactory() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        @GuardedBy("lock")
        public TerminatingFatalErrorHandler create(int i) {
            return new TerminatingFatalErrorHandler(i);
        }
    }

    public MiniCluster(MiniClusterConfiguration miniClusterConfiguration) {
        this.miniClusterConfiguration = (MiniClusterConfiguration) Preconditions.checkNotNull(miniClusterConfiguration, "config may not be null");
        this.rpcServices = new ArrayList(3 + miniClusterConfiguration.getNumTaskManagers());
        this.rpcTimeout = miniClusterConfiguration.getRpcTimeout();
        this.taskManagers = new ArrayList(miniClusterConfiguration.getNumTaskManagers());
    }

    public CompletableFuture<URI> getRestAddress() {
        CompletableFuture thenApply;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            thenApply = this.webMonitorLeaderRetriever.getLeaderFuture().thenApply(FunctionUtils.uncheckedFunction(tuple2 -> {
                return new URI((String) tuple2.f0);
            }));
        }
        return thenApply;
    }

    public ClusterInformation getClusterInformation() {
        ClusterInformation clusterInformation;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            clusterInformation = new ClusterInformation("localhost", this.blobServer.getPort());
        }
        return clusterInformation;
    }

    protected Executor getIOExecutor() {
        return this.ioExecutor;
    }

    public boolean isRunning() {
        return this.running;
    }

    public void start() throws Exception {
        RpcServiceFactory dedicatedRpcServiceFactory;
        synchronized (this.lock) {
            Preconditions.checkState(!this.running, "MiniCluster is already running");
            LOG.info("Starting Flink Mini Cluster");
            LOG.debug("Using configuration {}", this.miniClusterConfiguration);
            UnmodifiableConfiguration configuration = this.miniClusterConfiguration.getConfiguration();
            boolean z = this.miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;
            try {
                initializeIOFormatClasses(configuration);
                LOG.info("Starting Metrics Registry");
                this.metricRegistry = createMetricRegistry(configuration);
                LOG.info("Starting RPC Service(s)");
                AkkaRpcServiceConfiguration fromConfiguration = AkkaRpcServiceConfiguration.fromConfiguration(configuration);
                if (z) {
                    this.commonRpcService = createRpcService(fromConfiguration, false, null);
                    CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(this.commonRpcService);
                    this.taskManagerRpcServiceFactory = commonRpcServiceFactory;
                    dedicatedRpcServiceFactory = commonRpcServiceFactory;
                } else {
                    this.commonRpcService = createRpcService(fromConfiguration, true, null);
                    String jobManagerBindAddress = this.miniClusterConfiguration.getJobManagerBindAddress();
                    String taskManagerBindAddress = this.miniClusterConfiguration.getTaskManagerBindAddress();
                    dedicatedRpcServiceFactory = new DedicatedRpcServiceFactory(fromConfiguration, jobManagerBindAddress);
                    this.taskManagerRpcServiceFactory = new DedicatedRpcServiceFactory(fromConfiguration, taskManagerBindAddress);
                }
                this.metricRegistry.startQueryService(MetricUtils.startMetricsRpcService(configuration, this.commonRpcService.getAddress()), null);
                this.processMetricGroup = MetricUtils.instantiateProcessMetricGroup(this.metricRegistry, RpcUtils.getHostname(this.commonRpcService), ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
                this.ioExecutor = Executors.newFixedThreadPool(Hardware.getNumberCPUCores(), new ExecutorThreadFactory("mini-cluster-io"));
                this.haServices = createHighAvailabilityServices(configuration, this.ioExecutor);
                this.blobServer = new BlobServer(configuration, this.haServices.createBlobStore());
                this.blobServer.start();
                this.heartbeatServices = HeartbeatServices.fromConfiguration(configuration);
                this.blobCacheService = new BlobCacheService(configuration, this.haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), this.blobServer.getPort()));
                startTaskManagers();
                setupDispatcherResourceManagerComponents(configuration, dedicatedRpcServiceFactory, new RpcMetricQueryServiceRetriever(this.metricRegistry.getMetricQueryServiceRpcService()));
                this.resourceManagerLeaderRetriever = this.haServices.getResourceManagerLeaderRetriever();
                this.dispatcherLeaderRetriever = this.haServices.getDispatcherLeaderRetriever();
                this.clusterRestEndpointLeaderRetrievalService = this.haServices.getClusterRestEndpointLeaderRetriever();
                this.dispatcherGatewayRetriever = new RpcGatewayRetriever<>(this.commonRpcService, DispatcherGateway.class, DispatcherId::fromUuid, 20, Time.milliseconds(20L));
                this.resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(this.commonRpcService, ResourceManagerGateway.class, ResourceManagerId::fromUuid, 20, Time.milliseconds(20L));
                this.webMonitorLeaderRetriever = new LeaderRetriever();
                this.resourceManagerLeaderRetriever.start(this.resourceManagerGatewayRetriever);
                this.dispatcherLeaderRetriever.start(this.dispatcherGatewayRetriever);
                this.clusterRestEndpointLeaderRetrievalService.start(this.webMonitorLeaderRetriever);
                this.terminationFuture = new CompletableFuture<>();
                this.running = true;
                LOG.info("Flink Mini Cluster started successfully");
            } catch (Exception e) {
                try {
                    close();
                } catch (Exception e2) {
                    e.addSuppressed(e2);
                }
                throw e;
            }
        }
    }

    @GuardedBy("lock")
    private void setupDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory rpcServiceFactory, MetricQueryServiceRetriever metricQueryServiceRetriever) throws Exception {
        this.dispatcherResourceManagerComponents.addAll(createDispatcherResourceManagerComponents(configuration, rpcServiceFactory, this.haServices, this.blobServer, this.heartbeatServices, this.metricRegistry, metricQueryServiceRetriever, new ShutDownFatalErrorHandler()));
        ArrayList arrayList = new ArrayList(this.dispatcherResourceManagerComponents.size());
        for (DispatcherResourceManagerComponent dispatcherResourceManagerComponent : this.dispatcherResourceManagerComponents) {
            CompletableFuture<ApplicationStatus> shutDownFuture = dispatcherResourceManagerComponent.getShutDownFuture();
            dispatcherResourceManagerComponent.getClass();
            FutureUtils.assertNoException(shutDownFuture.thenRun(dispatcherResourceManagerComponent::closeAsync));
            arrayList.add(shutDownFuture);
        }
        FutureUtils.assertNoException(FutureUtils.completeAll(arrayList).thenRun(this::closeAsync));
    }

    @VisibleForTesting
    protected Collection<? extends DispatcherResourceManagerComponent> createDispatcherResourceManagerComponents(Configuration configuration, RpcServiceFactory rpcServiceFactory, HighAvailabilityServices highAvailabilityServices, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, MetricQueryServiceRetriever metricQueryServiceRetriever, FatalErrorHandler fatalErrorHandler) throws Exception {
        return Collections.singleton(createDispatcherResourceManagerComponentFactory().create(configuration, this.ioExecutor, rpcServiceFactory.createRpcService(), highAvailabilityServices, blobServer, heartbeatServices, metricRegistry, new MemoryArchivedExecutionGraphStore(), metricQueryServiceRetriever, fatalErrorHandler));
    }

    @Nonnull
    private DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory() {
        return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(StandaloneResourceManagerFactory.INSTANCE);
    }

    @VisibleForTesting
    protected HighAvailabilityServices createHighAvailabilityServices(Configuration configuration, Executor executor) throws Exception {
        LOG.info("Starting high-availability services");
        return HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(configuration, executor);
    }

    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> completableFuture;
        synchronized (this.lock) {
            if (this.running) {
                LOG.info("Shutting down Flink Mini Cluster");
                try {
                    long j = this.miniClusterConfiguration.getConfiguration().getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);
                    ArrayList arrayList = new ArrayList(2 + this.miniClusterConfiguration.getNumTaskManagers());
                    arrayList.addAll(terminateTaskExecutors());
                    arrayList.add(shutDownResourceManagerComponents());
                    FutureUtils.composeAfterwards(FutureUtils.runAfterwards(FutureUtils.composeAfterwards(FutureUtils.composeAfterwards(FutureUtils.completeAll(arrayList), this::closeMetricSystem), this::terminateRpcServices), this::terminateMiniClusterServices), () -> {
                        return terminateExecutors(j);
                    }).whenComplete((r4, th) -> {
                        if (th != null) {
                            this.terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(th));
                        } else {
                            this.terminationFuture.complete(null);
                        }
                    });
                    this.running = false;
                } catch (Throwable th2) {
                    this.running = false;
                    throw th2;
                }
            }
            completableFuture = this.terminationFuture;
        }
        return completableFuture;
    }

    private CompletableFuture<Void> closeMetricSystem() {
        FutureUtils.ConjunctFuture<Void> completeAll;
        synchronized (this.lock) {
            ArrayList arrayList = new ArrayList(2);
            if (this.processMetricGroup != null) {
                this.processMetricGroup.close();
                this.processMetricGroup = null;
            }
            if (this.metricRegistry != null) {
                arrayList.add(this.metricRegistry.shutdown());
                this.metricRegistry = null;
            }
            completeAll = FutureUtils.completeAll(arrayList);
        }
        return completeAll;
    }

    @GuardedBy("lock")
    private void startTaskManagers() throws Exception {
        int numTaskManagers = this.miniClusterConfiguration.getNumTaskManagers();
        LOG.info("Starting {} TaskManger(s)", Integer.valueOf(numTaskManagers));
        for (int i = 0; i < numTaskManagers; i++) {
            startTaskExecutor();
        }
    }

    @VisibleForTesting
    void startTaskExecutor() throws Exception {
        synchronized (this.lock) {
            TaskExecutor startTaskManager = TaskManagerRunner.startTaskManager(this.miniClusterConfiguration.getConfiguration(), new ResourceID(UUID.randomUUID().toString()), this.taskManagerRpcServiceFactory.createRpcService(), this.haServices, this.heartbeatServices, this.metricRegistry, this.blobCacheService, useLocalCommunication(), this.taskManagerTerminatingFatalErrorHandlerFactory.create(this.taskManagers.size()));
            startTaskManager.start();
            this.taskManagers.add(startTaskManager);
        }
    }

    @VisibleForTesting
    protected boolean useLocalCommunication() {
        return this.miniClusterConfiguration.getNumTaskManagers() == 1;
    }

    @GuardedBy("lock")
    private Collection<? extends CompletableFuture<Void>> terminateTaskExecutors() {
        ArrayList arrayList = new ArrayList(this.taskManagers.size());
        for (int i = 0; i < this.taskManagers.size(); i++) {
            arrayList.add(terminateTaskExecutor(i));
        }
        return arrayList;
    }

    @VisibleForTesting
    @Nonnull
    protected CompletableFuture<Void> terminateTaskExecutor(int i) {
        CompletableFuture<Void> closeAsync;
        synchronized (this.lock) {
            closeAsync = this.taskManagers.get(i).closeAsync();
        }
        return closeAsync;
    }

    public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.requestMultipleJobDetails(this.rpcTimeout).thenApply(multipleJobsDetails -> {
                return (List) multipleJobsDetails.getJobs().stream().map(jobDetails -> {
                    return new JobStatusMessage(jobDetails.getJobId(), jobDetails.getJobName(), jobDetails.getStatus(), jobDetails.getStartTime());
                }).collect(Collectors.toList());
            });
        });
    }

    public CompletableFuture<JobStatus> getJobStatus(JobID jobID) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.requestJobStatus(jobID, this.rpcTimeout);
        });
    }

    public CompletableFuture<Acknowledge> cancelJob(JobID jobID) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.cancelJob(jobID, this.rpcTimeout);
        });
    }

    public CompletableFuture<String> triggerSavepoint(JobID jobID, String str, boolean z) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.triggerSavepoint(jobID, str, z, this.rpcTimeout);
        });
    }

    public CompletableFuture<String> stopWithSavepoint(JobID jobID, String str, boolean z) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.stopWithSavepoint(jobID, str, z, this.rpcTimeout);
        });
    }

    public CompletableFuture<Acknowledge> disposeSavepoint(String str) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.disposeSavepoint(str, this.rpcTimeout);
        });
    }

    public CompletableFuture<? extends AccessExecutionGraph> getExecutionGraph(JobID jobID) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.requestJob(jobID, this.rpcTimeout);
        });
    }

    private <T> CompletableFuture<T> runDispatcherCommand(Function<DispatcherGateway, CompletableFuture<T>> function) {
        return getDispatcherGatewayFuture().thenApply((Function<? super DispatcherGateway, ? extends U>) function).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    public void runDetached(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");
        try {
            submitJob(jobGraph).get();
        } catch (ExecutionException e) {
            throw new JobExecutionException(jobGraph.getJobID(), ExceptionUtils.stripExecutionException(e));
        }
    }

    @Override // org.apache.flink.runtime.minicluster.JobExecutor
    public JobExecutionResult executeJobBlocking(JobGraph jobGraph) throws JobExecutionException, InterruptedException {
        Preconditions.checkNotNull(jobGraph, "job is null");
        try {
            try {
                return ((JobResult) submitJob(jobGraph).thenCompose(jobSubmissionResult -> {
                    return requestJobResult(jobGraph.getJobID());
                }).get()).toJobExecutionResult(Thread.currentThread().getContextClassLoader());
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(jobGraph.getJobID(), e);
            }
        } catch (ExecutionException e2) {
            throw new JobExecutionException(jobGraph.getJobID(), "Could not retrieve JobResult.", ExceptionUtils.stripExecutionException(e2));
        }
    }

    public CompletableFuture<JobSubmissionResult> submitJob(JobGraph jobGraph) {
        CompletableFuture<DispatcherGateway> dispatcherGatewayFuture = getDispatcherGatewayFuture();
        return uploadAndSetJobFiles(createBlobServerAddress(dispatcherGatewayFuture), jobGraph).thenCombine((CompletionStage) dispatcherGatewayFuture, (r6, dispatcherGateway) -> {
            return dispatcherGateway.submitJob(jobGraph, this.rpcTimeout);
        }).thenCompose((Function<? super V, ? extends CompletionStage<U>>) Function.identity()).thenApply(acknowledge -> {
            return new JobSubmissionResult(jobGraph.getJobID());
        });
    }

    public CompletableFuture<JobResult> requestJobResult(JobID jobID) {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.requestJobResult(jobID, RpcUtils.INF_TIMEOUT);
        });
    }

    public CompletableFuture<ClusterOverview> requestClusterOverview() {
        return runDispatcherCommand(dispatcherGateway -> {
            return dispatcherGateway.requestClusterOverview(RpcUtils.INF_TIMEOUT);
        });
    }

    @VisibleForTesting
    protected CompletableFuture<DispatcherGateway> getDispatcherGatewayFuture() {
        CompletableFuture future;
        synchronized (this.lock) {
            Preconditions.checkState(this.running, "MiniCluster is not yet running.");
            future = this.dispatcherGatewayRetriever.getFuture();
        }
        return future;
    }

    private CompletableFuture<Void> uploadAndSetJobFiles(CompletableFuture<InetSocketAddress> completableFuture, JobGraph jobGraph) {
        return completableFuture.thenAccept(inetSocketAddress -> {
            try {
                ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> {
                    return new BlobClient(inetSocketAddress, this.miniClusterConfiguration.getConfiguration());
                });
            } catch (FlinkException e) {
                throw new CompletionException((Throwable) e);
            }
        });
    }

    private CompletableFuture<InetSocketAddress> createBlobServerAddress(CompletableFuture<DispatcherGateway> completableFuture) {
        return completableFuture.thenApply(dispatcherGateway -> {
            return dispatcherGateway.getBlobServerPort(this.rpcTimeout).thenApply(num -> {
                return new InetSocketAddress(dispatcherGateway.getHostname(), num.intValue());
            });
        }).thenCompose((Function<? super U, ? extends CompletionStage<U>>) Function.identity());
    }

    protected MetricRegistryImpl createMetricRegistry(Configuration configuration) {
        return new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(configuration), ReporterSetup.fromConfiguration(configuration));
    }

    protected RpcService createRpcService(AkkaRpcServiceConfiguration akkaRpcServiceConfiguration, boolean z, String str) {
        return new AkkaRpcService(AkkaUtils.createActorSystem(AkkaUtils.testDispatcherConfig().withFallback(z ? AkkaUtils.getAkkaConfig(akkaRpcServiceConfiguration.getConfiguration(), str, 0) : AkkaUtils.getAkkaConfig(akkaRpcServiceConfiguration.getConfiguration()))), akkaRpcServiceConfiguration);
    }

    @GuardedBy("lock")
    private CompletableFuture<Void> shutDownResourceManagerComponents() {
        ArrayList arrayList = new ArrayList(this.dispatcherResourceManagerComponents.size());
        Iterator<DispatcherResourceManagerComponent> it = this.dispatcherResourceManagerComponents.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().closeAsync());
        }
        return FutureUtils.runAfterwards(FutureUtils.completeAll(arrayList), () -> {
            Exception exc = null;
            synchronized (this.lock) {
                if (this.resourceManagerLeaderRetriever != null) {
                    try {
                        this.resourceManagerLeaderRetriever.stop();
                    } catch (Exception e) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
                    }
                    this.resourceManagerLeaderRetriever = null;
                }
                if (this.dispatcherLeaderRetriever != null) {
                    try {
                        this.dispatcherLeaderRetriever.stop();
                    } catch (Exception e2) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                    }
                    this.dispatcherLeaderRetriever = null;
                }
                if (this.clusterRestEndpointLeaderRetrievalService != null) {
                    try {
                        this.clusterRestEndpointLeaderRetrievalService.stop();
                    } catch (Exception e3) {
                        exc = (Exception) ExceptionUtils.firstOrSuppressed(e3, exc);
                    }
                    this.clusterRestEndpointLeaderRetrievalService = null;
                }
            }
            if (exc != null) {
                throw exc;
            }
        });
    }

    private void terminateMiniClusterServices() throws Exception {
        Exception exc = null;
        synchronized (this.lock) {
            if (this.blobCacheService != null) {
                try {
                    this.blobCacheService.close();
                } catch (Exception e) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e, (Throwable) null);
                }
                this.blobCacheService = null;
            }
            if (this.blobServer != null) {
                try {
                    this.blobServer.close();
                } catch (Exception e2) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e2, exc);
                }
                this.blobServer = null;
            }
            if (this.haServices != null) {
                try {
                    this.haServices.closeAndCleanupAllData();
                } catch (Exception e3) {
                    exc = (Exception) ExceptionUtils.firstOrSuppressed(e3, exc);
                }
                this.haServices = null;
            }
            if (exc != null) {
                throw exc;
            }
        }
    }

    @Nonnull
    private CompletableFuture<Void> terminateRpcServices() {
        FutureUtils.ConjunctFuture<Void> completeAll;
        synchronized (this.lock) {
            ArrayList arrayList = new ArrayList(1 + this.rpcServices.size());
            arrayList.add(this.commonRpcService.stopService());
            Iterator<RpcService> it = this.rpcServices.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().stopService());
            }
            this.commonRpcService = null;
            this.rpcServices.clear();
            completeAll = FutureUtils.completeAll(arrayList);
        }
        return completeAll;
    }

    private CompletableFuture<Void> terminateExecutors(long j) {
        synchronized (this.lock) {
            if (this.ioExecutor != null) {
                return ExecutorUtils.nonBlockingShutdown(j, TimeUnit.MILLISECONDS, new ExecutorService[]{this.ioExecutor});
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    private void initializeIOFormatClasses(Configuration configuration) {
        FileOutputFormat.initDefaultsFromConfiguration(configuration);
    }
}
