package org.apache.accumulo.master.replication;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.replication.ReplicationTableOfflineException;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.fate.util.UtilWaitThread;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooUtil;
import org.apache.accumulo.server.replication.DistributedWorkQueueWorkAssignerHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.WorkAssigner;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.server.zookeeper.DistributedWorkQueue;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/master/replication/DistributedWorkQueueWorkAssigner.class */
public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
    private static final Logger log = LoggerFactory.getLogger(DistributedWorkQueueWorkAssigner.class);
    protected AccumuloClient client;
    protected AccumuloConfiguration conf;
    protected DistributedWorkQueue workQueue;
    protected int maxQueueSize;
    protected ZooCache zooCache;

    protected boolean isWorkRequired(Replication.Status status) {
        return StatusUtil.isWorkRequired(status);
    }

    protected void setClient(AccumuloClient accumuloClient) {
        this.client = accumuloClient;
    }

    protected void setWorkQueue(DistributedWorkQueue distributedWorkQueue) {
        this.workQueue = distributedWorkQueue;
    }

    protected void setMaxQueueSize(int i) {
        this.maxQueueSize = i;
    }

    protected void setZooCache(ZooCache zooCache) {
        this.zooCache = zooCache;
    }

    protected void initializeWorkQueue(AccumuloConfiguration accumuloConfiguration) {
        this.workQueue = new DistributedWorkQueue(ZooUtil.getRoot(this.client.instanceOperations().getInstanceID()) + "/replication/workqueue", accumuloConfiguration);
    }

    public void configure(AccumuloConfiguration accumuloConfiguration, AccumuloClient accumuloClient) {
        this.conf = accumuloConfiguration;
        this.client = accumuloClient;
    }

    public void assignWork() {
        if (this.workQueue == null) {
            initializeWorkQueue(this.conf);
        }
        initializeQueuedWork();
        if (this.zooCache == null) {
            this.zooCache = new ZooCache(this.workQueue.getZooReaderWriter());
        }
        this.maxQueueSize = this.conf.getCount(Property.REPLICATION_MAX_WORK_QUEUE);
        createWork();
        cleanupFinishedWork();
    }

    protected void createWork() {
        try {
            Scanner<Map.Entry> scanner = ReplicationTable.getScanner(this.client);
            ReplicationSchema.OrderSection.limit(scanner);
            Text text = new Text();
            for (Map.Entry entry : scanner) {
                if (getQueueSize() > this.maxQueueSize) {
                    log.warn("Queued replication work exceeds configured maximum ({}), sleeping to allow work to occur", Integer.valueOf(this.maxQueueSize));
                    return;
                }
                String file = ReplicationSchema.OrderSection.getFile((Key) entry.getKey(), text);
                ReplicationSchema.OrderSection.getTableId((Key) entry.getKey(), text);
                log.info("Determining if {} from {} needs to be replicated", file, text.toString());
                try {
                    Scanner<Map.Entry> scanner2 = ReplicationTable.getScanner(this.client);
                    ReplicationSchema.WorkSection.limit(scanner2);
                    scanner2.setRange(Range.exact(file));
                    int i = 0;
                    int i2 = 0;
                    for (Map.Entry entry2 : scanner2) {
                        i2++;
                        try {
                            Replication.Status fromValue = StatusUtil.fromValue((Value) entry2.getValue());
                            ReplicationTarget target = ReplicationSchema.WorkSection.getTarget((Key) entry2.getKey(), text);
                            Set<String> queuedWork = getQueuedWork(target);
                            Path path = new Path(file);
                            String queueKey = DistributedWorkQueueWorkAssignerHelper.getQueueKey(path.getName(), target);
                            if (shouldQueueWork(target)) {
                                if (!isWorkRequired(fromValue)) {
                                    log.debug("Not queueing work for {} to {} because {} doesn't need replication", new Object[]{file, target, ProtobufUtil.toString(fromValue)});
                                    if (queuedWork.contains(queueKey)) {
                                        log.debug("Removing {} from replication state to {} because replication is complete", queueKey, target.getPeerName());
                                        removeQueuedWork(target, queueKey);
                                    }
                                } else if (queueWork(path, target)) {
                                    i++;
                                }
                            } else if (!isWorkRequired(fromValue) && queuedWork.contains(queueKey)) {
                                log.debug("Removing {} from replication state to {} because replication is complete", queueKey, target.getPeerName());
                                removeQueuedWork(target, queueKey);
                            }
                        } catch (InvalidProtocolBufferException e) {
                            log.warn("Could not deserialize protobuf from work entry for {} to {}, will retry", new Object[]{file, ReplicationTarget.from(((Key) entry2.getKey()).getColumnQualifier()), e});
                        }
                    }
                    log.debug("Read {} replication entries from the WorkSection of the replication table", Integer.valueOf(i2));
                    log.info("Assigned {} replication work entries for {}", Integer.valueOf(i), file);
                } catch (ReplicationTableOfflineException e2) {
                    log.warn("Replication table is offline. Will retry...");
                    UtilWaitThread.sleepUninterruptibly(5L, TimeUnit.SECONDS);
                    return;
                }
            }
        } catch (ReplicationTableOfflineException e3) {
        }
    }

    protected abstract boolean shouldQueueWork(ReplicationTarget replicationTarget);

    protected abstract int getQueueSize();

    protected abstract void initializeQueuedWork();

    protected abstract boolean queueWork(Path path, ReplicationTarget replicationTarget);

    protected abstract Set<String> getQueuedWork(ReplicationTarget replicationTarget);

    protected abstract void removeQueuedWork(ReplicationTarget replicationTarget, String str);

    protected abstract void cleanupFinishedWork();
}
