package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.CommitLog;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.PutMessageSpinLock;
import org.apache.rocketmq.store.PutMessageStatus;

/* loaded from: input_file:org/apache/rocketmq/store/ha/HAService.class */
public class HAService {
    private static final InternalLogger log = InternalLoggerFactory.getLogger("RocketmqStore");
    private final AcceptSocketService acceptSocketService;
    private final DefaultMessageStore defaultMessageStore;
    private final AtomicInteger connectionCount = new AtomicInteger(0);
    private final List<HAConnection> connectionList = new LinkedList();
    private final WaitNotifyObject waitNotifyObject = new WaitNotifyObject();
    private final AtomicLong push2SlaveMaxOffset = new AtomicLong(0);
    private final GroupTransferService groupTransferService = new GroupTransferService();
    private final HAClient haClient = new HAClient();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/ha/HAService$AcceptSocketService.class */
    /**
     * Service thread that listens on the HA port, accepts incoming socket
     * connections and wraps each one in an {@link HAConnection} registered with
     * the enclosing {@link HAService}.
     */
    public class AcceptSocketService extends ServiceThread {
        /** Address the listening socket binds to: the wildcard address on the port given at construction. */
        private final SocketAddress socketAddressListen;
        /** Listening channel; stays {@code null} until {@link #beginAccept()} has been called. */
        private ServerSocketChannel serverSocketChannel;
        /** Selector the listening channel is registered with; {@code null} until {@link #beginAccept()}. */
        private Selector selector;

        /**
         * @param i port number the listening socket will be bound to
         */
        public AcceptSocketService(int i) {
            this.socketAddressListen = new InetSocketAddress(i);
        }

        /**
         * Opens the listening channel and selector, binds to the configured
         * address with address reuse enabled, switches the channel to
         * non-blocking mode and registers it for accept events.
         *
         * @throws Exception if opening, binding or registering fails
         */
        public void beginAccept() throws Exception {
            this.serverSocketChannel = ServerSocketChannel.open();
            this.selector = RemotingUtil.openSelector();
            this.serverSocketChannel.socket().setReuseAddress(true);
            this.serverSocketChannel.socket().bind(this.socketAddressListen);
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
        }

        /**
         * Stops the service thread and releases the listening channel and the
         * selector.
         *
         * <p>Each resource is closed independently so that a failure closing the
         * channel does not leak the selector, and resources that were never
         * opened (no prior {@link #beginAccept()}) are skipped.
         *
         * @param z forwarded unchanged to {@code ServiceThread.shutdown(boolean)}
         */
        public void shutdown(boolean z) {
            super.shutdown(z);
            if (this.serverSocketChannel != null) {
                try {
                    this.serverSocketChannel.close();
                } catch (IOException e) {
                    HAService.log.error("AcceptSocketService shutdown exception", e);
                }
            }
            if (this.selector != null) {
                try {
                    this.selector.close();
                } catch (IOException e) {
                    HAService.log.error("AcceptSocketService shutdown exception", e);
                }
            }
        }

        /**
         * Accept loop: until the service is stopped, waits up to one second for
         * selector events, accepts every pending connection and starts an
         * {@link HAConnection} for it. Any exception inside one pass is logged
         * and the loop carries on.
         */
        public void run() {
            HAService.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    this.selector.select(1000L);
                    Set<SelectionKey> selectedKeys = this.selector.selectedKeys();
                    if (selectedKeys != null) {
                        for (SelectionKey selectionKey : selectedKeys) {
                            if ((selectionKey.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                                SocketChannel accept = ((ServerSocketChannel) selectionKey.channel()).accept();
                                if (accept != null) {
                                    HAService.log.info("HAService receive new connection, " + accept.socket().getRemoteSocketAddress());
                                    try {
                                        HAConnection hAConnection = new HAConnection(HAService.this, accept);
                                        hAConnection.start();
                                        HAService.this.addConnection(hAConnection);
                                    } catch (Exception e) {
                                        // The connection could not be set up; do not leak the accepted socket.
                                        HAService.log.error("new HAConnection exception", e);
                                        accept.close();
                                    }
                                }
                            } else {
                                HAService.log.warn("Unexpected ops in select " + selectionKey.readyOps());
                            }
                        }
                        // Selected keys are not removed automatically; clear them for the next select().
                        selectedKeys.clear();
                    }
                } catch (Exception e2) {
                    HAService.log.error(getServiceName() + " service has exception.", e2);
                }
            }
            HAService.log.info(getServiceName() + " service end");
        }

        /** @return the simple class name, used in log messages */
        public String getServiceName() {
            return AcceptSocketService.class.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/ha/HAService$GroupTransferService.class */
    /**
     * Service thread that completes {@link CommitLog.GroupCommitRequest}s once
     * {@code push2SlaveMaxOffset} has reached the offset each request is waiting
     * for, or the request's deadline has passed.
     *
     * <p>Requests are queued on {@code requestsWrite}; the two lists are swapped
     * under {@code lock} in {@link #onWaitEnd()} and the service thread then
     * drains {@code requestsRead}.
     */
    public class GroupTransferService extends ServiceThread {
        /** Signalled by {@link #notifyTransferSome()} to cut short the wait in {@link #doWaitTransfer()}. */
        private final WaitNotifyObject notifyTransferObject = new WaitNotifyObject();
        /** Guards additions to {@code requestsWrite} and the swap of the two lists. */
        private final PutMessageSpinLock lock = new PutMessageSpinLock();
        /** List that {@link #putRequest} appends to. */
        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsWrite = new LinkedList<>();
        /** List that {@link #doWaitTransfer()} drains. */
        private volatile LinkedList<CommitLog.GroupCommitRequest> requestsRead = new LinkedList<>();

        GroupTransferService() {
        }

        /**
         * Queues a request and wakes the service thread.
         *
         * <p>{@code wakeup()} is called after the lock has been released so the
         * locked section covers only the list mutation; the request is already
         * queued by the time the service thread is woken.
         *
         * @param groupCommitRequest request to complete once its offset has been transferred
         */
        public void putRequest(CommitLog.GroupCommitRequest groupCommitRequest) {
            this.lock.lock();
            try {
                this.requestsWrite.add(groupCommitRequest);
            } finally {
                this.lock.unlock();
            }
            wakeup();
        }

        /** Wakes a thread waiting inside {@link #doWaitTransfer()} so it re-checks the transferred offset. */
        public void notifyTransferSome() {
            this.notifyTransferObject.wakeup();
        }

        /** Exchanges the write and read lists while holding the lock. */
        private void swapRequests() {
            this.lock.lock();
            try {
                LinkedList<CommitLog.GroupCommitRequest> pending = this.requestsWrite;
                this.requestsWrite = this.requestsRead;
                this.requestsRead = pending;
            } finally {
                this.lock.unlock();
            }
        }

        /**
         * Processes every request currently in {@code requestsRead}: waits, in
         * slices of {@code waitForRunning(1000)}, until
         * {@code push2SlaveMaxOffset} is at least the request's next offset or
         * the request's deadline (compared against {@link System#nanoTime()})
         * has passed, then completes the request with {@code PUT_OK} or
         * {@code FLUSH_SLAVE_TIMEOUT}. Finally replaces {@code requestsRead}
         * with a fresh empty list.
         */
        private void doWaitTransfer() {
            if (this.requestsRead.isEmpty()) {
                return;
            }
            for (CommitLog.GroupCommitRequest request : this.requestsRead) {
                boolean transferred = HAService.this.push2SlaveMaxOffset.get() >= request.getNextOffset();
                long deadLine = request.getDeadLine();
                while (!transferred && deadLine - System.nanoTime() > 0) {
                    this.notifyTransferObject.waitForRunning(1000L);
                    transferred = HAService.this.push2SlaveMaxOffset.get() >= request.getNextOffset();
                }
                request.wakeupCustomer(transferred ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
            }
            this.requestsRead = new LinkedList<>();
        }

        /**
         * Main loop: until stopped, waits up to 10 ms (or until woken) and then
         * processes the queued requests. Exceptions are logged and the loop
         * continues.
         */
        public void run() {
            HAService.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    waitForRunning(10L);
                    doWaitTransfer();
                } catch (Exception e) {
                    HAService.log.warn(getServiceName() + " service has exception. ", e);
                }
            }
            HAService.log.info(getServiceName() + " service end");
        }

        /** Swaps the request lists so that newly queued requests become visible to {@link #doWaitTransfer()}. */
        protected void onWaitEnd() {
            swapRequests();
        }

        /** @return the simple class name, used in log messages */
        public String getServiceName() {
            return GroupTransferService.class.getSimpleName();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/rocketmq/store/ha/HAService$HAClient.class */
    public class HAClient extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 4194304;
        private SocketChannel socketChannel;
        private final AtomicReference<String> masterAddress = new AtomicReference<>();
        private final ByteBuffer reportOffset = ByteBuffer.allocate(8);
        private long lastWriteTimestamp = System.currentTimeMillis();
        private long currentReportedOffset = 0;
        private int dispatchPosition = 0;
        private ByteBuffer byteBufferRead = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private ByteBuffer byteBufferBackup = ByteBuffer.allocate(READ_MAX_BUFFER_SIZE);
        private Selector selector = RemotingUtil.openSelector();

        public HAClient() throws IOException {
        }

        public void updateMasterAddress(String str) {
            String str2 = this.masterAddress.get();
            if (str2 == null || !str2.equals(str)) {
                this.masterAddress.set(str);
                HAService.log.info("update master address, OLD: " + str2 + " NEW: " + str);
            }
        }

        private boolean isTimeToReportOffset() {
            return HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp > ((long) HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval());
        }

        private boolean reportSlaveMaxOffset(long j) {
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            this.reportOffset.putLong(j);
            this.reportOffset.position(0);
            this.reportOffset.limit(8);
            for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
                try {
                    this.socketChannel.write(this.reportOffset);
                } catch (IOException e) {
                    HAService.log.error(getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
                    return false;
                }
            }
            this.lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
            return !this.reportOffset.hasRemaining();
        }

        private void reallocateByteBuffer() {
            int i = READ_MAX_BUFFER_SIZE - this.dispatchPosition;
            if (i > 0) {
                this.byteBufferRead.position(this.dispatchPosition);
                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                this.byteBufferBackup.put(this.byteBufferRead);
            }
            swapByteBuffer();
            this.byteBufferRead.position(i);
            this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
            this.dispatchPosition = 0;
        }

        private void swapByteBuffer() {
            ByteBuffer byteBuffer = this.byteBufferRead;
            this.byteBufferRead = this.byteBufferBackup;
            this.byteBufferBackup = byteBuffer;
        }

        private boolean processReadEvent() {
            int i = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                    int read = this.socketChannel.read(this.byteBufferRead);
                    if (read > 0) {
                        i = 0;
                        if (!dispatchReadRequest()) {
                            HAService.log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else {
                        if (read != 0) {
                            HAService.log.info("HAClient, processReadEvent read socket < 0");
                            return false;
                        }
                        i++;
                        if (i >= 3) {
                            return true;
                        }
                    }
                } catch (IOException e) {
                    HAService.log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }
            return true;
        }

        private boolean dispatchReadRequest() {
            do {
                int position = this.byteBufferRead.position() - this.dispatchPosition;
                if (position >= 12) {
                    long j = this.byteBufferRead.getLong(this.dispatchPosition);
                    int i = this.byteBufferRead.getInt(this.dispatchPosition + 8);
                    long maxPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                    if (maxPhyOffset != 0 && maxPhyOffset != j) {
                        HAService.log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + maxPhyOffset + " MASTER: " + j);
                        return false;
                    }
                    if (position >= 12 + i) {
                        HAService.this.defaultMessageStore.appendToCommitLog(j, this.byteBufferRead.array(), this.dispatchPosition + 12, i);
                        this.dispatchPosition += 12 + i;
                    }
                }
                if (this.byteBufferRead.hasRemaining()) {
                    return true;
                }
                reallocateByteBuffer();
                return true;
            } while (reportSlaveMaxOffsetPlus());
            return false;
        }

        private boolean reportSlaveMaxOffsetPlus() {
            boolean z = true;
            long maxPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (maxPhyOffset > this.currentReportedOffset) {
                this.currentReportedOffset = maxPhyOffset;
                z = reportSlaveMaxOffset(this.currentReportedOffset);
                if (!z) {
                    closeMaster();
                    HAService.log.error("HAClient, reportSlaveMaxOffset error, " + this.currentReportedOffset);
                }
            }
            return z;
        }

        private boolean connectMaster() throws ClosedChannelException {
            SocketAddress string2SocketAddress;
            if (null == this.socketChannel) {
                String str = this.masterAddress.get();
                if (str != null && (string2SocketAddress = RemotingUtil.string2SocketAddress(str)) != null) {
                    this.socketChannel = RemotingUtil.connect(string2SocketAddress);
                    if (this.socketChannel != null) {
                        this.socketChannel.register(this.selector, 1);
                    }
                }
                this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
                this.lastWriteTimestamp = System.currentTimeMillis();
            }
            return this.socketChannel != null;
        }

        private void closeMaster() {
            if (null != this.socketChannel) {
                try {
                    SelectionKey keyFor = this.socketChannel.keyFor(this.selector);
                    if (keyFor != null) {
                        keyFor.cancel();
                    }
                    this.socketChannel.close();
                    this.socketChannel = null;
                } catch (IOException e) {
                    HAService.log.warn("closeMaster exception. ", e);
                }
                this.lastWriteTimestamp = 0L;
                this.dispatchPosition = 0;
                this.byteBufferBackup.position(0);
                this.byteBufferBackup.limit(READ_MAX_BUFFER_SIZE);
                this.byteBufferRead.position(0);
                this.byteBufferRead.limit(READ_MAX_BUFFER_SIZE);
            }
        }

        public void run() {
            HAService.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                } catch (Exception e) {
                    HAService.log.warn(getServiceName() + " service has exception. ", e);
                    waitForRunning(5000L);
                }
                if (connectMaster()) {
                    if (isTimeToReportOffset() && !reportSlaveMaxOffset(this.currentReportedOffset)) {
                        closeMaster();
                    }
                    this.selector.select(1000L);
                    if (!processReadEvent()) {
                        closeMaster();
                    }
                    if (reportSlaveMaxOffsetPlus()) {
                        long now = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                        if (now > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                            HAService.log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + now);
                            closeMaster();
                            HAService.log.warn("HAClient, master not response some time, so close connection");
                        }
                    }
                } else {
                    waitForRunning(5000L);
                }
            }
            HAService.log.info(getServiceName() + " service end");
        }

        public void shutdown() {
            super.shutdown();
            closeMaster();
        }

        public String getServiceName() {
            return HAClient.class.getSimpleName();
        }
    }

    public HAService(DefaultMessageStore defaultMessageStore) throws IOException {
        this.defaultMessageStore = defaultMessageStore;
        this.acceptSocketService = new AcceptSocketService(defaultMessageStore.getMessageStoreConfig().getHaListenPort());
    }

    /**
     * Forwards a new master address to the HA client, if one exists.
     *
     * @param str new master address
     */
    public void updateMasterAddress(String str) {
        if (haClient == null) {
            return;
        }
        haClient.updateMasterAddress(str);
    }

    /**
     * Hands a group-commit request over to the group transfer service.
     *
     * @param groupCommitRequest request to queue
     */
    public void putRequest(CommitLog.GroupCommitRequest groupCommitRequest) {
        groupTransferService.putRequest(groupCommitRequest);
    }

    /**
     * Tells whether at least one HA connection exists and the gap between the
     * given offset and the max offset pushed to a slave is below the configured
     * fall-behind limit.
     *
     * @param j offset to compare against the pushed offset
     * @return {@code true} if there is a connection and the gap is within the limit
     */
    public boolean isSlaveOK(long j) {
        if (connectionCount.get() <= 0) {
            return false;
        }
        long behind = j - push2SlaveMaxOffset.get();
        long allowed = defaultMessageStore.getMessageStoreConfig().getHaSlaveFallbehindMax();
        return behind < allowed;
    }

    /**
     * Raises {@code push2SlaveMaxOffset} to the given offset if that is larger
     * than the current value, and notifies the group transfer service when the
     * update succeeds. The compare-and-set is retried until it succeeds or the
     * stored value is already at least {@code j}.
     *
     * @param j offset that has been transferred
     */
    public void notifyTransferSome(long j) {
        for (long current = push2SlaveMaxOffset.get(); j > current; current = push2SlaveMaxOffset.get()) {
            if (push2SlaveMaxOffset.compareAndSet(current, j)) {
                groupTransferService.notifyTransferSome();
                return;
            }
        }
    }

    /** @return the live connection counter (the shared instance, not a copy) */
    public AtomicInteger getConnectionCount() {
        return connectionCount;
    }

    /**
     * Opens the HA listening socket and then starts, in order, the accept
     * service, the group transfer service and the HA client.
     *
     * @throws Exception if the listening socket cannot be set up
     */
    public void start() throws Exception {
        acceptSocketService.beginAccept();
        acceptSocketService.start();
        groupTransferService.start();
        haClient.start();
    }

    /**
     * Registers a connection, synchronizing on the connection list.
     *
     * @param hAConnection connection to add
     */
    public void addConnection(HAConnection hAConnection) {
        final List<HAConnection> connections = this.connectionList;
        synchronized (connections) {
            connections.add(hAConnection);
        }
    }

    /**
     * Unregisters a connection, synchronizing on the connection list.
     *
     * @param hAConnection connection to remove
     */
    public void removeConnection(HAConnection hAConnection) {
        final List<HAConnection> connections = this.connectionList;
        synchronized (connections) {
            connections.remove(hAConnection);
        }
    }

    /**
     * Shuts the HA service down: first the HA client, then the accept service
     * (passing {@code true} to its shutdown), then all registered connections,
     * and finally the group transfer service.
     */
    public void shutdown() {
        haClient.shutdown();
        acceptSocketService.shutdown(true);
        destroyConnections();
        groupTransferService.shutdown();
    }

    /**
     * Shuts down every registered connection and empties the connection list,
     * all while holding the list's monitor.
     */
    public void destroyConnections() {
        synchronized (this.connectionList) {
            for (HAConnection connection : this.connectionList) {
                connection.shutdown();
            }
            this.connectionList.clear();
        }
    }

    /** @return the message store this HA service was created for */
    public DefaultMessageStore getDefaultMessageStore() {
        return defaultMessageStore;
    }

    /** @return the wait/notify object owned by this service (the shared instance) */
    public WaitNotifyObject getWaitNotifyObject() {
        return waitNotifyObject;
    }

    /** @return the counter holding the max offset pushed to a slave (the shared instance, not a copy) */
    public AtomicLong getPush2SlaveMaxOffset() {
        return push2SlaveMaxOffset;
    }
}
