package org.apache.crail.core;

import java.io.IOException;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.crail.CrailBuffer;
import org.apache.crail.conf.CrailConstants;
import org.apache.crail.metadata.BlockInfo;
import org.apache.crail.metadata.FileInfo;
import org.apache.crail.rpc.RpcConnection;
import org.apache.crail.rpc.RpcErrors;
import org.apache.crail.rpc.RpcFuture;
import org.apache.crail.rpc.RpcGetBlock;
import org.apache.crail.storage.StorageEndpoint;
import org.apache.crail.storage.StorageFuture;
import org.apache.crail.utils.BlockCache;
import org.apache.crail.utils.BufferCheckpoint;
import org.apache.crail.utils.CrailUtils;
import org.apache.crail.utils.EndpointCache;
import org.apache.crail.utils.NextBlockCache;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/crail/core/CoreStream.class */
/**
 * Shared machinery for streams that transfer data between a {@link CrailBuffer} and the
 * blocks backing a {@link CoreNode}.
 *
 * <p>A buffer-level request is cut into {@link CoreSubOperation}s, each sized by
 * {@code CrailUtils.minFileBuf} from the bytes left in the current block and the bytes left in
 * the request. For every sub-operation the block metadata is taken from the block cache, from a
 * previously prefetched lookup, or from a fresh {@code getBlock} RPC; once the metadata is known
 * the sub-operation is handed to {@link #trigger}, which subclasses implement.
 *
 * <p>This source was recovered by a decompiler, which reported that several members declared
 * {@code public} here were originally package-private.
 */
public abstract class CoreStream {
    private static final Logger LOG = CrailUtils.getLogger();

    protected CoreDataStore fs;
    protected CoreNode node;

    private final EndpointCache endpointCache;
    private final RpcConnection namenodeClientRpc;
    private final BlockCache.FileBlockCache blockCache;
    private final NextBlockCache.FileNextBlockCache nextBlockCache;
    private final BufferCheckpoint bufferCheckpoint;
    private final FileInfo fileInfo;
    private final long streamId;
    private final CoreIOStatistics ioStats = new CoreIOStatistics("core");

    // Scratch state of dataOperation(); both containers are reset at the start of every call.
    private final HashMap<Integer, CoreSubOperation> blockMap = new HashMap<>();
    private final LinkedList<RpcFuture<RpcGetBlock>> pendingBlocks = new LinkedList<>();

    private long position;
    // File capacity as of the last getBlock request or sync() that was issued.
    private long syncedCapacity;

    /**
     * Starts the storage transfer for one sub-operation.
     *
     * @param storageEndpoint endpoint obtained from the endpoint cache for the block's location
     * @param coreSubOperation the fragment to transfer
     * @param crailBuffer buffer whose position and limit have been set to the fragment's range
     * @param blockInfo metadata of the block the fragment falls into
     * @return the future returned by the storage layer for this transfer
     * @throws Exception if the transfer cannot be started
     */
    abstract StorageFuture trigger(StorageEndpoint storageEndpoint, CoreSubOperation coreSubOperation, CrailBuffer crailBuffer, BlockInfo blockInfo) throws Exception;

    /**
     * Subclass hook; this class never calls it.
     *
     * @param j value supplied by the caller (NOTE(review): the parameter name was lost in
     *     decompilation; its meaning is not visible here - confirm against the subclasses)
     */
    public abstract void update(long j);

    /**
     * Creates a stream over {@code coreNode}, wiring up the caches and the RPC connection of
     * the node's file system.
     *
     * @param coreNode node the stream operates on
     * @param streamId identifier later reported by {@link #getStreamId()}
     * @param position initial stream position
     * @throws Exception if any of the file-system lookups fails
     */
    public CoreStream(CoreNode coreNode, long streamId, long position) throws Exception {
        this.node = coreNode;
        this.fs = coreNode.getFileSystem();
        this.fileInfo = coreNode.getFileInfo();
        this.endpointCache = this.fs.getDatanodeEndpointCache();
        this.namenodeClientRpc = this.fs.getNamenodeClientRpc();
        this.blockCache = this.fs.getBlockCache(this.fileInfo.getFd());
        this.nextBlockCache = this.fs.getNextBlockCache(this.fileInfo.getFd());
        this.bufferCheckpoint = this.fs.getBufferCheckpoint();
        this.position = position;
        this.syncedCapacity = this.fileInfo.getCapacity();
        this.streamId = streamId;
    }

    /**
     * Issues all storage transfers needed to process {@code crailBuffer} starting at the current
     * stream position, and advances the position by the amount scheduled.
     *
     * <p>Fragments whose block metadata is cached are triggered immediately; for the others the
     * method waits (up to {@code CrailConstants.RPC_TIMEOUT} milliseconds each) for the block
     * lookup to finish, triggers the transfer and caches the returned block metadata.
     *
     * @param crailBuffer buffer to process; on return its limit and position are set from the
     *     returned operation
     * @return the operation aggregating the futures of all triggered transfers
     * @throws IOException on RPC timeout, on an RPC error code other than
     *     {@code RpcErrors.ERR_OK}, or if the scheduled length does not match the operation
     * @throws Exception if triggering a transfer or waiting for an RPC fails otherwise
     */
    public final CoreDataOperation dataOperation(CrailBuffer crailBuffer) throws Exception {
        this.blockMap.clear();
        this.pendingBlocks.clear();
        CoreDataOperation operation = new CoreDataOperation(this, crailBuffer);

        scheduleSubOperations(operation, crailBuffer);
        completePendingBlocks(operation, crailBuffer);

        if (!operation.isProcessed()) {
            throw new IOException("Internal error, processed data != operation length");
        }
        crailBuffer.limit(operation.getBufferLimit());
        crailBuffer.position(operation.getCurrentBufferPosition());
        return operation;
    }

    /**
     * Splits the remaining part of {@code operation} into sub-operations. Cached blocks are
     * triggered right away; all others are queued in {@code pendingBlocks} together with the
     * block lookup (prefetched or newly issued) that will resolve them.
     */
    private void scheduleSubOperations(CoreDataOperation operation, CrailBuffer crailBuffer) throws Exception {
        while (operation.remaining() > 0) {
            int opLen = CrailUtils.minFileBuf(blockRemaining(), operation.remaining());
            CoreSubOperation subOperation = new CoreSubOperation(this.fileInfo.getFd(), this.position, operation.getCurrentBufferPosition(), opLen);
            this.ioStats.incTotalOps(opLen);

            if (this.blockCache.containsKey(subOperation.key())) {
                BlockInfo cachedBlock = this.blockCache.get(subOperation.key());
                operation.add(prepareAndTrigger(subOperation, crailBuffer, cachedBlock));
                this.ioStats.incCachedOps();
            } else if (this.nextBlockCache.containsKey(subOperation.key())) {
                // A lookup for this block was already started by prefetchMetadata().
                RpcFuture<RpcGetBlock> prefetched = this.nextBlockCache.get(subOperation.key());
                this.blockMap.put(Integer.valueOf(prefetched.getTicket()), subOperation);
                this.pendingBlocks.add(prefetched);
            } else {
                this.syncedCapacity = this.fileInfo.getCapacity();
                RpcFuture<RpcGetBlock> lookup = this.namenodeClientRpc.getBlock(this.fileInfo.getFd(), this.fileInfo.getToken(), this.position, this.syncedCapacity);
                this.blockMap.put(Integer.valueOf(lookup.getTicket()), subOperation);
                this.pendingBlocks.add(lookup);
            }

            this.position += opLen;
            operation.incProcessedLen(opLen);
        }
    }

    /**
     * Drains {@code pendingBlocks}: waits for each block lookup, triggers the transfer of the
     * sub-operation registered under the lookup's ticket and stores the block in the cache.
     */
    private void completePendingBlocks(CoreDataOperation operation, CrailBuffer crailBuffer) throws Exception {
        for (RpcFuture<RpcGetBlock> rpcFuture = this.pendingBlocks.poll(); rpcFuture != null; rpcFuture = this.pendingBlocks.poll()) {
            // Must be sampled before the blocking get() below so the statistics tell whether
            // the lookup had already completed by the time it was needed.
            if (rpcFuture.isDone()) {
                this.ioStats.incNonblockingOps();
                if (rpcFuture.isPrefetched()) {
                    this.ioStats.incPrefetchedNonblockingOps();
                }
            } else {
                this.ioStats.incBlockingOps();
                if (rpcFuture.isPrefetched()) {
                    this.ioStats.incPrefetchedBlockingOps();
                }
            }

            RpcGetBlock response = rpcFuture.get(CrailConstants.RPC_TIMEOUT, TimeUnit.MILLISECONDS);
            if (!rpcFuture.isDone()) {
                throw new IOException("rpc timeout ");
            }
            if (response.getError() != RpcErrors.ERR_OK) {
                String message = RpcErrors.messages[response.getError()];
                LOG.info("inputStream: {}", message);
                throw new IOException(message);
            }

            BlockInfo blockInfo = response.getBlockInfo();
            CoreSubOperation subOperation = this.blockMap.get(Integer.valueOf(rpcFuture.getTicket()));
            operation.add(prepareAndTrigger(subOperation, crailBuffer, blockInfo));
            this.blockCache.put(subOperation.key(), blockInfo);
        }
    }

    /**
     * Starts a block lookup for the current position unless the block is already cached or a
     * lookup for it is already registered. The resulting future is marked as prefetched and
     * stored in the next-block cache, where {@link #dataOperation} picks it up.
     *
     * @throws Exception if issuing the RPC fails
     */
    public final void prefetchMetadata() throws Exception {
        long key = CoreSubOperation.createKey(this.fileInfo.getFd(), this.position);
        if (this.blockCache.containsKey(key) || this.nextBlockCache.containsKey(key)) {
            return;
        }
        this.syncedCapacity = this.fileInfo.getCapacity();
        RpcFuture<RpcGetBlock> lookup = this.namenodeClientRpc.getBlock(this.fileInfo.getFd(), this.fileInfo.getToken(), this.position, this.syncedCapacity);
        lookup.setPrefetched(true);
        this.nextBlockCache.put(key, lookup);
        this.ioStats.incPrefetchedOps();
    }

    /**
     * Moves the stream position.
     *
     * @param pos new position; must lie within {@code [0, capacity]} of the file
     * @throws IOException if {@code pos} is outside that range
     */
    public void seek(long pos) throws IOException {
        long clamped = Math.min(this.fileInfo.getCapacity(), Math.max(0L, pos));
        if (clamped != pos) {
            throw new IOException("seek position out of range, pos " + pos + ", fileCapacity " + this.fileInfo.getCapacity());
        }
        this.position = clamped;
    }

    /**
     * Sends the file metadata via {@code setFile} if the file has a positive token and its
     * capacity has grown beyond the capacity last recorded by this stream; otherwise returns a
     * {@code NoOperation}.
     *
     * @return a future for the {@code setFile} RPC, or a {@code NoOperation} when nothing is sent
     * @throws IOException if issuing the RPC fails
     */
    public Future<Void> sync() throws IOException {
        if (this.fileInfo.getToken() <= 0 || this.syncedCapacity >= this.fileInfo.getCapacity()) {
            return new NoOperation();
        }
        this.syncedCapacity = this.fileInfo.getCapacity();
        return new SyncNodeFuture(this.namenodeClientRpc.setFile(this.fileInfo, false));
    }

    /** Copies the file's current capacity into the I/O statistics. */
    public void updateIOStats() {
        this.ioStats.setCapacity(this.fileInfo.getCapacity());
    }

    /** Returns the stream id passed to the constructor. */
    public long getStreamId() {
        return this.streamId;
    }

    /** Returns the current stream position. */
    public long position() {
        return this.position;
    }

    /** Returns the I/O statistics collected by this stream. */
    public CoreIOStatistics getCoreStatistics() {
        return this.ioStats;
    }

    /** Returns the node this stream was created for. */
    public CoreNode getFile() {
        return this.node;
    }

    /** Returns the buffer checkpoint obtained from the file system at construction time. */
    public BufferCheckpoint getBufferCheckpoint() {
        return this.bufferCheckpoint;
    }

    /**
     * Sets the capacity stored in the file's metadata object.
     *
     * @param capacity new capacity value
     */
    public void setCapacity(long capacity) {
        this.fileInfo.setCapacity(capacity);
    }

    /** Returns the number of bytes between the current position and the end of its block. */
    private long blockRemaining() {
        return CrailConstants.BLOCK_SIZE - (this.position % CrailConstants.BLOCK_SIZE);
    }

    /**
     * Positions {@code crailBuffer} on the fragment described by {@code coreSubOperation},
     * resolves the endpoint for the block and starts the transfer through {@link #trigger}.
     *
     * @return the future returned by {@link #trigger}
     * @throws Exception whatever the endpoint lookup or {@link #trigger} throws; an
     *     {@link IOException} is logged before being rethrown
     */
    private StorageFuture prepareAndTrigger(CoreSubOperation coreSubOperation, CrailBuffer crailBuffer, BlockInfo blockInfo) throws Exception {
        try {
            StorageEndpoint endpoint = this.endpointCache.getDataEndpoint(blockInfo.getDnInfo());
            crailBuffer.clear();
            crailBuffer.position(coreSubOperation.getBufferPosition());
            crailBuffer.limit(crailBuffer.position() + coreSubOperation.getLen());
            StorageFuture future = trigger(endpoint, coreSubOperation, crailBuffer, blockInfo);
            incStats(endpoint.isLocal());
            return future;
        } catch (IOException e) {
            // Route the stack trace through the logger instead of printing it to stderr.
            LOG.error("ERROR: failed data operation", e);
            throw e;
        }
    }

    /**
     * Counts one local or remote operation (and the matching directory counter when the file is
     * a directory). Does nothing unless {@code CrailConstants.STATISTICS} is enabled.
     */
    private void incStats(boolean isLocal) {
        if (!CrailConstants.STATISTICS) {
            return;
        }
        if (isLocal) {
            this.ioStats.incLocalOps();
            if (this.fileInfo.getType().isDirectory()) {
                this.ioStats.incLocalDirOps();
            }
        } else {
            this.ioStats.incRemoteOps();
            if (this.fileInfo.getType().isDirectory()) {
                this.ioStats.incRemoteDirOps();
            }
        }
    }
}
