package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.class */
public class SingleInputGate extends InputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);

    /** Name of the owning task; used in log messages and handed to the producer state checker. */
    private final String owningTaskName;

    /** ID of the consumed intermediate result; passed along when querying the producer state. */
    private final IntermediateDataSetID consumedResultId;

    /** Type of the consumed partition, exposed via {@link #getConsumedPartitionType()}. */
    private final ResultPartitionType consumedPartitionType;

    /** Subpartition index requested from every input channel; validated as non-negative. */
    private final int consumedSubpartitionIndex;

    /** Expected number of input channels; validated as strictly positive. */
    private final int numberOfInputChannels;

    /** Input channels keyed by partition ID; written while holding {@link #requestLock}. */
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

    /**
     * One bit per channel index, set while that channel sits in {@link #inputChannelsWithData}.
     * Accessed while holding the {@code inputChannelsWithData} monitor.
     */
    private final BitSet enqueuedInputChannelsWithData;

    /** One bit per channel index, set once an end-of-partition event was seen on that channel. */
    private final BitSet channelsWithEndOfPartitionEvents;

    /** Used to ask for the producer state in {@link #triggerPartitionStateCheck}. */
    private final PartitionProducerStateProvider partitionProducerStateProvider;

    /** Buffer pool of this gate; {@code null} until {@link #setBufferPool} is called. */
    private BufferPool bufferPool;

    /** Set once every channel has delivered its end-of-partition event. */
    private boolean hasReceivedAllEndOfPartitionEvents;

    /** Set after the subpartitions were requested; accessed under {@link #requestLock}. */
    private boolean requestedPartitionsFlag;

    /** Number of registered {@code UnknownInputChannel}s that have not been updated yet. */
    private int numberOfUninitializedChannels;

    /** Timer for retriggering local partition requests; created lazily, cancelled on close. */
    private Timer retriggerLocalRequestTimer;

    /** Supplies the buffer pool during {@link #setup()}. */
    private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;

    /** Completed when the gate is closed; {@code isDone()} doubles as the "released" check. */
    private final CompletableFuture<Void> closeFuture;

    /** Decompressor for compressed buffers; may be {@code null} if none was configured. */
    @Nullable
    private final BufferDecompressor bufferDecompressor;

    /** Lock guarding channel registration/updates, partition requests and release. */
    private final Object requestLock = new Object();

    /**
     * Queue of channels that reported data. The deque instance is also used as the monitor
     * for waiting on and signalling data availability.
     */
    private final ArrayDeque<InputChannel> inputChannelsWithData = new ArrayDeque<>();

    /** Task events retained for channels that are still uninitialized. */
    private final List<TaskEvent> pendingEvents = new ArrayList<>();

    /**
     * Creates an input gate without any channels; channels are registered afterwards via
     * {@link #setInputChannel}.
     *
     * @param str name of the owning task, must not be {@code null}
     * @param intermediateDataSetID ID of the consumed result, must not be {@code null}
     * @param resultPartitionType type of the consumed partition, must not be {@code null}
     * @param i index of the consumed subpartition, must be {@code >= 0}
     * @param i2 number of input channels, must be {@code > 0}
     * @param partitionProducerStateProvider provider for producer state checks, must not be {@code null}
     * @param supplierWithException factory for the gate's buffer pool, must not be {@code null}
     * @param bufferDecompressor decompressor for compressed buffers, may be {@code null}
     */
    public SingleInputGate(String str, IntermediateDataSetID intermediateDataSetID, ResultPartitionType resultPartitionType, int i, int i2, PartitionProducerStateProvider partitionProducerStateProvider, SupplierWithException<BufferPool, IOException> supplierWithException, @Nullable BufferDecompressor bufferDecompressor) {
        // The validation order is kept as-is so the first failing check stays the same.
        this.owningTaskName = Preconditions.checkNotNull(str);
        this.consumedResultId = Preconditions.checkNotNull(intermediateDataSetID);
        this.consumedPartitionType = Preconditions.checkNotNull(resultPartitionType);
        this.bufferPoolFactory = Preconditions.checkNotNull(supplierWithException);

        Preconditions.checkArgument(i >= 0);
        this.consumedSubpartitionIndex = i;

        Preconditions.checkArgument(i2 > 0);
        this.numberOfInputChannels = i2;

        // Typed collections instead of raw ones; sized by the expected channel count.
        this.inputChannels = new HashMap<>(i2);
        this.channelsWithEndOfPartitionEvents = new BitSet(i2);
        this.enqueuedInputChannelsWithData = new BitSet(i2);

        this.partitionProducerStateProvider = Preconditions.checkNotNull(partitionProducerStateProvider);
        this.bufferDecompressor = bufferDecompressor;
        this.closeFuture = new CompletableFuture<>();
    }

    /**
     * Prepares the gate for consumption: assigns exclusive segments to the remote channels,
     * creates and registers the buffer pool, then requests the subpartitions.
     *
     * @throws IllegalStateException if a buffer pool has already been registered
     */
    @Override
    public void setup() throws IOException, InterruptedException {
        Preconditions.checkState(bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        assignExclusiveSegments();

        BufferPool createdPool = bufferPoolFactory.get();
        setBufferPool(createdPool);

        requestPartitions();
    }

    /**
     * Requests the consumed subpartition from every registered channel. Only the first call
     * issues requests; later calls are no-ops.
     *
     * @throws IllegalStateException if the gate is already released, or if the number of
     *     registered channels differs from the expected number of channels
     */
    @VisibleForTesting
    void requestPartitions() throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (requestedPartitionsFlag) {
                return;
            }
            if (closeFuture.isDone()) {
                throw new IllegalStateException("Already released.");
            }
            if (numberOfInputChannels != inputChannels.size()) {
                // Argument order follows the placeholders: expected total first, then the
                // number of channels that are currently registered.
                throw new IllegalStateException(String.format("Bug in input gate setup logic: mismatch between number of total input channels [%s] and the currently set number of input channels [%s].", numberOfInputChannels, inputChannels.size()));
            }
            for (InputChannel channel : inputChannels.values()) {
                channel.requestSubpartition(consumedSubpartitionIndex);
            }
            // Only reached when every request above succeeded.
            requestedPartitionsFlag = true;
        }
    }

    /** Returns the number of input channels this gate was configured with. */
    @Override
    public int getNumberOfInputChannels() {
        return numberOfInputChannels;
    }

    /** Returns the ID of the intermediate result consumed by this gate. */
    public IntermediateDataSetID getConsumedResultId() {
        return consumedResultId;
    }

    /** Returns the type of the partition consumed by this gate. */
    public ResultPartitionType getConsumedPartitionType() {
        return consumedPartitionType;
    }

    /**
     * Returns the gate's buffer pool as a {@link BufferProvider}; {@code null} before the pool
     * has been set.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private.
     */
    public BufferProvider getBufferProvider() {
        return bufferPool;
    }

    /** Returns the gate's buffer pool, or {@code null} if none has been set yet. */
    public BufferPool getBufferPool() {
        return bufferPool;
    }

    /**
     * Sums the (unsynchronized) queued-buffer counts of all channels.
     *
     * <p>The computation is attempted up to three times; if every attempt fails with an
     * exception, {@code 0} is returned.
     *
     * @return total number of queued buffers, or {@code 0} if it could not be determined
     */
    public int getNumberOfQueuedBuffers() {
        int attemptsLeft = 3;
        while (attemptsLeft-- > 0) {
            try {
                int total = 0;
                for (InputChannel channel : inputChannels.values()) {
                    total += channel.unsynchronizedGetNumberOfQueuedBuffers();
                }
                return total;
            } catch (Exception ignored) {
                // Best effort: the channel map is read here without holding requestLock, so
                // an attempt may fail (presumably due to concurrent modification). Retry.
            }
        }
        return 0;
    }

    /** Returns the future that is completed when this gate is closed. */
    public CompletableFuture<Void> getCloseFuture() {
        return closeFuture;
    }

    /**
     * Registers the buffer pool of this gate. May be called only once.
     *
     * @param bufferPool the pool to register, must not be {@code null}
     * @throws IllegalStateException if a buffer pool has already been set
     */
    public void setBufferPool(BufferPool bufferPool) {
        // Message fixed: the original read "hasalready" (missing space).
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has already been set for this input gate.");
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
    }

    /**
     * Asks every {@code RemoteInputChannel} registered with this gate to assign its exclusive
     * segments. Channels of other types are skipped. Runs under {@code requestLock}.
     */
    @VisibleForTesting
    public void assignExclusiveSegments() throws IOException {
        synchronized (requestLock) {
            for (InputChannel channel : inputChannels.values()) {
                if (!(channel instanceof RemoteInputChannel)) {
                    continue;
                }
                RemoteInputChannel remoteChannel = (RemoteInputChannel) channel;
                remoteChannel.assignExclusiveSegments();
            }
        }
    }

    /**
     * Registers (or replaces) the channel for the given partition. If no channel was
     * registered for that partition before and the new one is an {@code UnknownInputChannel},
     * the count of uninitialized channels is incremented.
     *
     * @param intermediateResultPartitionID partition the channel reads from, must not be {@code null}
     * @param inputChannel the channel to register, must not be {@code null}
     */
    public void setInputChannel(IntermediateResultPartitionID intermediateResultPartitionID, InputChannel inputChannel) {
        synchronized (requestLock) {
            IntermediateResultPartitionID key = Preconditions.checkNotNull(intermediateResultPartitionID);
            InputChannel value = Preconditions.checkNotNull(inputChannel);

            InputChannel previous = inputChannels.put(key, value);
            boolean firstRegistration = previous == null;
            if (firstRegistration && inputChannel instanceof UnknownInputChannel) {
                numberOfUninitializedChannels++;
            }
        }
    }

    /**
     * Replaces an {@code UnknownInputChannel} with a concrete local or remote channel once the
     * producer location is known.
     *
     * <p>Does nothing if the gate is already closed or if the channel registered for the
     * partition is not an {@code UnknownInputChannel}. Otherwise the new channel is stored,
     * its subpartition is requested if partitions were already requested, and all pending
     * task events are forwarded to it.
     *
     * @param resourceID resource used to decide whether the producer is local
     * @param nettyShuffleDescriptor descriptor of the produced partition
     */
    public void updateInputChannel(ResourceID resourceID, NettyShuffleDescriptor nettyShuffleDescriptor) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                // Gate already released; nothing left to update.
                return;
            }

            IntermediateResultPartitionID partitionId = nettyShuffleDescriptor.getResultPartitionID().getPartitionId();
            InputChannel current = inputChannels.get(partitionId);
            if (!(current instanceof UnknownInputChannel)) {
                return;
            }
            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;

            // Declared as the common supertype: the replacement is either a local or a
            // remote channel.
            final InputChannel newChannel;
            if (nettyShuffleDescriptor.isLocalTo(resourceID)) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else {
                RemoteInputChannel remoteChannel = unknownChannel.toRemoteInputChannel(nettyShuffleDescriptor.getConnectionId());
                remoteChannel.assignExclusiveSegments();
                newChannel = remoteChannel;
            }

            LOG.debug("{}: Updated unknown input channel to {}.", owningTaskName, newChannel);
            inputChannels.put(partitionId, newChannel);

            if (requestedPartitionsFlag) {
                newChannel.requestSubpartition(consumedSubpartitionIndex);
            }

            for (TaskEvent pendingEvent : pendingEvents) {
                newChannel.sendTaskEvent(pendingEvent);
            }

            // Once the last unknown channel is resolved, the buffered events are dropped.
            numberOfUninitializedChannels--;
            if (numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }

    /**
     * Retriggers the subpartition request of the channel registered for the given partition.
     * A no-op when the gate is already closed.
     *
     * <p>Remote channels are retriggered directly; local channels are retriggered via a
     * lazily created daemon {@link Timer}.
     *
     * @param intermediateResultPartitionID partition whose request should be retriggered
     * @throws NullPointerException if no channel is registered for the partition
     * @throws IllegalStateException if the channel is neither exactly a remote nor a local channel
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID intermediateResultPartitionID) throws IOException {
        synchronized (requestLock) {
            if (closeFuture.isDone()) {
                return;
            }

            InputChannel channel = inputChannels.get(intermediateResultPartitionID);
            Preconditions.checkNotNull(channel, "Unknown input channel with ID " + intermediateResultPartitionID);
            LOG.debug("{}: Retriggering partition request {}:{}.", owningTaskName, channel.partitionId, consumedSubpartitionIndex);

            // Exact class comparison (not instanceof): subclasses are rejected below.
            Class<?> channelType = channel.getClass();
            if (channelType == RemoteInputChannel.class) {
                ((RemoteInputChannel) channel).retriggerSubpartitionRequest(consumedSubpartitionIndex);
                return;
            }
            if (channelType != LocalInputChannel.class) {
                throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + channelType);
            }

            if (retriggerLocalRequestTimer == null) {
                retriggerLocalRequestTimer = new Timer(true);
            }
            ((LocalInputChannel) channel).retriggerSubpartitionRequest(retriggerLocalRequestTimer, consumedSubpartitionIndex);
        }
    }

    /** Returns the timer used for local retriggers, or {@code null} if none was created yet. */
    @VisibleForTesting
    Timer getRetriggerLocalRequestTimer() {
        return retriggerLocalRequestTimer;
    }

    /**
     * Releases the gate: cancels the retrigger timer, releases all channel resources, lazily
     * destroys the buffer pool and completes the close future. Subsequent calls are no-ops.
     *
     * <p>I/O failures while releasing an individual channel are logged and do not abort the
     * release. After a successful release, threads waiting on the data queue are woken up.
     */
    @Override
    public void close() throws IOException {
        boolean released = false;
        synchronized (requestLock) {
            if (!closeFuture.isDone()) {
                try {
                    LOG.debug("{}: Releasing {}.", owningTaskName, this);

                    if (retriggerLocalRequestTimer != null) {
                        retriggerLocalRequestTimer.cancel();
                    }

                    for (InputChannel channel : inputChannels.values()) {
                        try {
                            channel.releaseAllResources();
                        } catch (IOException e) {
                            LOG.warn("{}: Error during release of channel resources: {}.", owningTaskName, e.getMessage(), e);
                        }
                    }

                    if (bufferPool != null) {
                        bufferPool.lazyDestroy();
                    }
                } finally {
                    // The future is completed even when the release itself failed.
                    released = true;
                    closeFuture.complete(null);
                }
            }
        }

        if (released) {
            // Wake up any thread blocked waiting for a channel with data.
            synchronized (inputChannelsWithData) {
                inputChannelsWithData.notifyAll();
            }
        }
    }

    /** Returns whether end-of-partition events have been received from all channels. */
    @Override
    public boolean isFinished() {
        return hasReceivedAllEndOfPartitionEvents;
    }

    /** Returns the next buffer or event, waiting for data if none is available yet. */
    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        final boolean blocking = true;
        return getNextBufferOrEvent(blocking);
    }

    /** Returns the next buffer or event if one is available, without waiting for data. */
    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        final boolean blocking = false;
        return getNextBufferOrEvent(blocking);
    }

    /**
     * Fetches the next element from any channel and converts it into a {@link BufferOrEvent}.
     *
     * @param z whether to wait for data ({@code true}) or return immediately ({@code false})
     * @return the next element; empty if all end-of-partition events were already received or
     *     no data could be obtained
     * @throws CancelTaskException if the gate has been closed
     */
    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean z) throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }
        if (closeFuture.isDone()) {
            throw new CancelTaskException("Input gate is already closed.");
        }

        Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> next = waitAndGetNextData(z);
        if (next.isPresent()) {
            InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability> item = next.get();
            Buffer buffer = item.data.buffer();
            BufferOrEvent converted = transformToBufferOrEvent(buffer, item.moreAvailable, item.input);
            return Optional.of(converted);
        }
        return Optional.empty();
    }

    /**
     * Takes channels off the data queue until one of them yields a buffer.
     *
     * <p>Reconstructed from the bytecode dump that the decompiler left in place of the body;
     * the previous placeholder unconditionally threw {@code UnsupportedOperationException}.
     *
     * @param blocking whether to wait for a channel with data ({@code true}) or give up
     *     immediately when none is queued ({@code false})
     * @return the channel, its buffer and a flag telling whether more channels are queued;
     *     empty if no channel could be obtained
     */
    private Optional<InputGate.InputWithData<InputChannel, InputChannel.BufferAndAvailability>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        while (true) {
            Optional<InputChannel> channel = getChannel(blocking);
            if (!channel.isPresent()) {
                return Optional.empty();
            }

            // The channel is polled before taking the queue monitor (presumably to avoid
            // calling into channel code while holding it).
            Optional<InputChannel.BufferAndAvailability> result = channel.get().getNextBuffer();

            synchronized (inputChannelsWithData) {
                if (result.isPresent() && result.get().moreAvailable()) {
                    // Re-enqueue at the tail so the other queued channels are served first.
                    inputChannelsWithData.add(channel.get());
                    enqueuedInputChannelsWithData.set(channel.get().getChannelIndex());
                }

                if (inputChannelsWithData.isEmpty()) {
                    availabilityHelper.resetUnavailable();
                }

                if (result.isPresent()) {
                    boolean moreChannelsQueued = !inputChannelsWithData.isEmpty();
                    return Optional.of(new InputGate.InputWithData<>(channel.get(), result.get(), moreChannelsQueued));
                }
            }
            // The channel had no buffer after all; try the next queued channel.
        }
    }

    /**
     * Dispatches to {@link #transformBuffer} for data buffers and to {@link #transformEvent}
     * for serialized events.
     */
    private BufferOrEvent transformToBufferOrEvent(Buffer buffer, boolean z, InputChannel inputChannel) throws IOException, InterruptedException {
        if (buffer.isBuffer()) {
            return transformBuffer(buffer, z, inputChannel);
        }
        return transformEvent(buffer, z, inputChannel);
    }

    /** Wraps a data buffer (decompressed if necessary) together with its channel index. */
    private BufferOrEvent transformBuffer(Buffer buffer, boolean z, InputChannel inputChannel) {
        Buffer payload = decompressBufferIfNeeded(buffer);
        int channelIndex = inputChannel.getChannelIndex();
        return new BufferOrEvent(payload, channelIndex, z);
    }

    /**
     * Deserializes an event from the given buffer, recycles the buffer, and performs the
     * end-of-partition bookkeeping.
     *
     * <p>The buffer is recycled exactly once, whether or not deserialization succeeds. When
     * the event is an {@code EndOfPartitionEvent}, the channel is marked as finished and its
     * resources are released; when it was the last outstanding channel, the gate is marked
     * finished and available, and the returned element reports no further data.
     *
     * @param buffer buffer holding the serialized event; recycled by this method
     * @param z whether more data was reported as available when the buffer was fetched
     * @param inputChannel channel the buffer came from
     * @return the deserialized event wrapped with channel index, availability and size
     */
    private BufferOrEvent transformEvent(Buffer buffer, boolean z, InputChannel inputChannel) throws IOException, InterruptedException {
        final AbstractEvent event;
        final int bufferSize;
        try {
            event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
            // Read the size while this method still owns the buffer.
            bufferSize = buffer.getSize();
        } finally {
            // Only deserialization is guarded, so a later failure cannot recycle the
            // buffer a second time.
            buffer.recycleBuffer();
        }

        boolean moreAvailable = z;
        if (event.getClass() == EndOfPartitionEvent.class) {
            channelsWithEndOfPartitionEvents.set(inputChannel.getChannelIndex());

            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                // All channels are finished: if more data was announced, a poll must not
                // produce anything.
                Preconditions.checkState(!moreAvailable || !pollNext().isPresent());
                moreAvailable = false;
                hasReceivedAllEndOfPartitionEvents = true;
                markAvailable();
            }

            inputChannel.releaseAllResources();
        }

        return new BufferOrEvent(event, inputChannel.getChannelIndex(), moreAvailable, bufferSize);
    }

    /**
     * Returns the buffer unchanged if it is not compressed. Otherwise decompresses it into an
     * intermediate buffer and recycles the compressed input, also when decompression fails.
     *
     * @throws NullPointerException if the buffer is compressed but no decompressor is set
     */
    private Buffer decompressBufferIfNeeded(Buffer buffer) {
        if (buffer.isCompressed()) {
            try {
                Preconditions.checkNotNull(bufferDecompressor, "Buffer decompressor not set.");
                return bufferDecompressor.decompressToIntermediateBuffer(buffer);
            } finally {
                buffer.recycleBuffer();
            }
        }
        return buffer;
    }

    /**
     * Switches the availability helper to "available". The future is fetched while holding
     * the data-queue monitor and completed after the monitor has been released.
     */
    private void markAvailable() {
        final CompletableFuture<?> toComplete;
        synchronized (inputChannelsWithData) {
            toComplete = availabilityHelper.getUnavailableToResetAvailable();
        }
        toComplete.complete(null);
    }

    /**
     * Sends the event to every registered channel. While uninitialized channels remain, the
     * event is additionally retained so it can be forwarded once they are updated.
     */
    @Override
    public void sendTaskEvent(TaskEvent taskEvent) throws IOException {
        synchronized (requestLock) {
            for (InputChannel channel : inputChannels.values()) {
                channel.sendTaskEvent(taskEvent);
            }

            boolean hasUninitializedChannels = numberOfUninitializedChannels > 0;
            if (hasUninitializedChannels) {
                pendingEvents.add(taskEvent);
            }
        }
    }

    /**
     * Callback for a channel that has data: enqueues it in the data queue.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private.
     *
     * @param inputChannel the channel that has data, must not be {@code null}
     */
    public void notifyChannelNonEmpty(InputChannel inputChannel) {
        InputChannel nonNullChannel = Preconditions.checkNotNull(inputChannel);
        queueChannel(nonNullChannel);
    }

    /**
     * Requests the producer state of the given partition. When the checker reports the
     * producer as ready, the partition request is retriggered; an I/O failure while
     * retriggering fails the consumption through the response handle.
     *
     * <p>NOTE(review): the decompiler reported this method as originally package-private.
     *
     * @param resultPartitionID partition whose producer state should be checked
     */
    public void triggerPartitionStateCheck(ResultPartitionID resultPartitionID) {
        partitionProducerStateProvider.requestPartitionProducerState(consumedResultId, resultPartitionID, responseHandle -> {
            RemoteChannelStateChecker stateChecker = new RemoteChannelStateChecker(resultPartitionID, owningTaskName);
            if (!stateChecker.isProducerReadyOrAbortConsumption(responseHandle)) {
                return;
            }
            try {
                retriggerPartitionRequest(resultPartitionID.getPartitionId());
            } catch (IOException e) {
                responseHandle.failConsumption(e);
            }
        });
    }

    /**
     * Appends the channel to the data queue unless it is already enqueued. When the queue
     * was empty beforehand, waiting threads are notified and the availability future is
     * completed.
     *
     * <p>The future is completed only after the queue monitor has been released, so that
     * dependent callbacks do not run while the monitor is held; this mirrors
     * {@link #markAvailable()}.
     *
     * @param inputChannel the channel that has data
     */
    private void queueChannel(InputChannel inputChannel) {
        CompletableFuture<?> toNotify = null;

        synchronized (inputChannelsWithData) {
            int channelIndex = inputChannel.getChannelIndex();
            if (enqueuedInputChannelsWithData.get(channelIndex)) {
                // Already queued; nothing to do.
                return;
            }

            boolean wasEmpty = inputChannelsWithData.isEmpty();
            inputChannelsWithData.add(inputChannel);
            enqueuedInputChannelsWithData.set(channelIndex);

            if (wasEmpty) {
                // Transition from "no data" to "data available".
                inputChannelsWithData.notifyAll();
                toNotify = availabilityHelper.getUnavailableToResetAvailable();
            }
        }

        if (toNotify != null) {
            toNotify.complete(null);
        }
    }

    /**
     * Removes and returns the channel at the head of the data queue.
     *
     * <p>While the queue is empty: fails if the gate is closed; in non-blocking mode resets
     * the availability helper to unavailable and returns empty; otherwise waits on the queue
     * monitor until notified.
     *
     * @param z whether to wait for a channel ({@code true}) or return immediately ({@code false})
     * @return the next channel with data, or empty in non-blocking mode when none is queued
     * @throws IllegalStateException if the gate is closed while the queue is empty
     */
    private Optional<InputChannel> getChannel(boolean z) throws InterruptedException {
        synchronized (inputChannelsWithData) {
            while (inputChannelsWithData.isEmpty()) {
                if (closeFuture.isDone()) {
                    throw new IllegalStateException("Released");
                }
                if (z) {
                    inputChannelsWithData.wait();
                    continue;
                }
                availabilityHelper.resetUnavailable();
                return Optional.empty();
            }

            InputChannel head = inputChannelsWithData.remove();
            enqueuedInputChannelsWithData.clear(head.getChannelIndex());
            return Optional.of(head);
        }
    }

    /** Returns the gate's channel map (the internal instance, not a copy). */
    public Map<IntermediateResultPartitionID, InputChannel> getInputChannels() {
        return inputChannels;
    }
}
