package net.lulihu.mule.tccTransaction.eventExecutor.disruptor;

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.ExceptionHandler;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.atomic.AtomicInteger;
import net.lulihu.ObjectKit.LogKit;
import net.lulihu.ObjectKit.NumberKit;
import net.lulihu.disruptorKit.DefaultEventFactory;
import net.lulihu.disruptorKit.Event;
import net.lulihu.disruptorKit.Producer;
import net.lulihu.mule.tccTransaction.MuleTccConfig;
import net.lulihu.mule.tccTransaction.MuleTccShutdownManage;
import net.lulihu.mule.tccTransaction.service.MuleTccShutdownService;
import net.lulihu.mule.tccTransaction.service.TransactionCoordinatorService;
import net.lulihu.mule.tccTransaction.service.TransactionExecutorEventService;
import net.lulihu.mule.tccTransaction.service.TransactionHandlerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:net/lulihu/mule/tccTransaction/eventExecutor/disruptor/AbstractDisruptorEventPublisher.class */
public abstract class AbstractDisruptorEventPublisher implements TransactionExecutorEventService, MuleTccShutdownService, ExceptionHandler<Event<DisruptorTransactionEvent>> {
    private static final Logger log = LoggerFactory.getLogger(AbstractDisruptorEventPublisher.class);
    private Disruptor<Event<DisruptorTransactionEvent>> disruptor;
    private Producer<DisruptorTransactionEvent> producer;
    protected TransactionCoordinatorService transactionCoordinatorService;

    @Override // net.lulihu.mule.tccTransaction.service.TransactionExecutorService
    public final void initialization(MuleTccConfig muleTccConfig, TransactionCoordinatorService transactionCoordinatorService) {
        this.transactionCoordinatorService = transactionCoordinatorService;
        LogKit.debug(log, "开始初始化disruptor异步队列...");
        disruptorInitialization(muleTccConfig);
        LogKit.debug(log, "disruptor异步队列初始化成功...");
        MuleTccShutdownManage.getInstance().addComponents(this);
    }

    private void disruptorInitialization(MuleTccConfig muleTccConfig) {
        int bufferSize = muleTccConfig.getBufferSize();
        int closest2IndexGreaterThanSelf = Integer.bitCount(bufferSize) == 1 ? bufferSize : NumberKit.getClosest2IndexGreaterThanSelf(bufferSize);
        int consumerThreadNum = muleTccConfig.getConsumerThreadNum();
        DisruptorEventConsumer[] disruptorEventConsumerArr = new DisruptorEventConsumer[consumerThreadNum];
        for (int i = 0; i < consumerThreadNum; i++) {
            disruptorEventConsumerArr[i] = new DisruptorEventConsumer(this.transactionCoordinatorService);
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        this.disruptor = new Disruptor<>(DefaultEventFactory.factory(), closest2IndexGreaterThanSelf, runnable -> {
            return new Thread(new ThreadGroup("Mule-disruptor"), runnable, "Mule异步执行线程-" + atomicInteger.incrementAndGet());
        }, ProducerType.MULTI, new BlockingWaitStrategy());
        this.disruptor.handleEventsWithWorkerPool(disruptorEventConsumerArr);
        this.disruptor.setDefaultExceptionHandler(this);
        this.producer = new Producer<>(this.disruptor.getRingBuffer());
        this.disruptor.start();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final void submit(EventTypeEnum eventTypeEnum, Object... objArr) {
        DisruptorTransactionEvent disruptorTransactionEvent = new DisruptorTransactionEvent();
        disruptorTransactionEvent.setEventType(eventTypeEnum);
        disruptorTransactionEvent.setArgs(objArr);
        this.producer.submit(disruptorTransactionEvent);
    }

    public void handleOnStartException(Throwable th) {
        LogKit.error(log, "Disruptor 启动时出现例外...", th);
    }

    public void handleEventException(Throwable th, long j, Event<DisruptorTransactionEvent> event) {
        LogKit.error(log, "Disruptor 异步线程处理过程中发生错误...\n处理事件参数:{}", event, th);
    }

    public void handleOnShutdownException(Throwable th) {
        LogKit.error(log, "Disruptor 关闭时出现例外...", th);
    }

    @Override // net.lulihu.mule.tccTransaction.service.TransactionSupportService
    public boolean support(TransactionHandlerService transactionHandlerService) {
        return true;
    }

    @Override // net.lulihu.mule.tccTransaction.service.TransactionSupportService, net.lulihu.mule.tccTransaction.service.MuleTccShutdownService
    public int order() {
        return 0;
    }

    @Override // net.lulihu.mule.tccTransaction.service.ComponentService
    public String componentName() {
        return "Disruptor 异步线程队列";
    }

    @Override // net.lulihu.mule.tccTransaction.service.MuleTccShutdownService
    public final void shutdown() {
        LogKit.debug(log, "Disruptor 停止接收新的消息...");
        LogKit.debug(log, "Disruptor 开始处理内部剩余消息...");
        this.disruptor.shutdown();
        LogKit.debug(log, "Disruptor 关闭成功...");
    }
}
