package com.brianway.webporter.data;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import us.codecraft.webmagic.thread.CountableThreadPool;

/* loaded from: input_file:com/brianway/webporter/data/BaseAssembler.class */
/**
 * Drives a simple input -> process -> output flow.
 *
 * <p>Items are polled from a {@link RawInput}, each non-null item is handed to the worker pool
 * where the {@link DataProcessor} turns it into a list of results, and every non-empty result
 * list is passed to each registered {@link OutPipeline}.
 *
 * <p>Configuration methods return {@code this} so calls can be chained, e.g.
 * {@code BaseAssembler.create(input, processor).addOutPipeline(p).thread(4).run()}.
 *
 * @param <IN>  type of the raw items polled from the input
 * @param <OUT> type of the items produced by the processor
 */
public class BaseAssembler<IN, OUT> {
    private static final Logger logger = LoggerFactory.getLogger(BaseAssembler.class);

    /** State before the first call to {@link #run()}. */
    protected static final int STAT_INIT = 0;
    /** State while {@link #run()} is executing. */
    protected static final int STAT_RUNNING = 1;
    /** State after {@link #run()} has finished (normally or with an exception). */
    protected static final int STAT_STOPPED = 2;

    /** Default input path used by {@link #main(String[])} when no argument is given. */
    private static final String DEFAULT_DEMO_PATH = "/Users/brian/Desktop/zhihu/20161124/www.zhihu.com";

    protected RawInput<IN> rawInput;
    protected DataProcessor<IN, OUT> dataProcessor;
    protected CountableThreadPool threadPool;
    /** Size passed to the worker pool when it is created; defaults to a single worker. */
    protected int threadNum = 1;
    protected List<OutPipeline<OUT>> outPipelines = new ArrayList<>();
    /** Total number of output items produced by the processor so far. */
    protected AtomicLong outItemCount = new AtomicLong(0);
    /** Current lifecycle state, one of the {@code STAT_*} constants. */
    protected AtomicInteger stat = new AtomicInteger(STAT_INIT);
    /** Number of input items whose worker task has finished (successfully or not). */
    private final AtomicLong inItemCount = new AtomicLong(0);

    /**
     * Static factory equivalent to calling the constructor.
     *
     * @param rawInput      source of raw items
     * @param dataProcessor converter applied to every polled item
     * @param <IN>          raw item type
     * @param <OUT>         produced item type
     * @return a new, not yet started assembler
     */
    public static <IN, OUT> BaseAssembler<IN, OUT> create(RawInput<IN> rawInput, DataProcessor<IN, OUT> dataProcessor) {
        return new BaseAssembler<>(rawInput, dataProcessor);
    }

    /**
     * Creates an assembler for the given input and processor.
     *
     * @param rawInput      source of raw items; must be non-null by the time {@link #run()} is called
     * @param dataProcessor converter applied to every polled item
     */
    public BaseAssembler(RawInput<IN> rawInput, DataProcessor<IN, OUT> dataProcessor) {
        this.rawInput = rawInput;
        this.dataProcessor = dataProcessor;
    }

    /**
     * Prepares the collaborators needed by {@link #run()}: verifies that an input is set,
     * (re)creates the worker pool if it is missing or shut down, and falls back to a
     * {@link ConsoleOutpipeline} when no output pipeline has been registered.
     *
     * @throws RuntimeException if no input has been set
     */
    protected void initComponent() {
        if (this.rawInput == null) {
            throw new RuntimeException("must set input");
        }
        if (this.threadPool == null || this.threadPool.isShutdown()) {
            this.threadPool = new CountableThreadPool(this.threadNum);
        }
        if (this.outPipelines.isEmpty()) {
            this.outPipelines.add(new ConsoleOutpipeline());
        }
    }

    /**
     * Polls the input and dispatches every non-null item to the worker pool until the calling
     * thread is interrupted, the state leaves {@code STAT_RUNNING}, or the input yields nothing
     * while no worker is alive. Afterwards the state is set to {@code STAT_STOPPED} and
     * {@link #close()} is invoked; this cleanup also happens when initialisation or polling
     * throws, so the assembler is never left stuck in the running state.
     *
     * @throws IllegalStateException if the assembler is already running
     */
    public void run() {
        final long startMillis = System.currentTimeMillis();
        checkRunningStat();
        try {
            initComponent();
            while (!Thread.currentThread().isInterrupted() && this.stat.get() == STAT_RUNNING) {
                final IN item = this.rawInput.poll();
                if (item != null) {
                    this.threadPool.execute(() -> {
                        try {
                            processInItem(item);
                        } catch (Exception e) {
                            logger.error("error: " + item, e);
                        } finally {
                            this.inItemCount.incrementAndGet();
                        }
                    });
                } else if (this.threadPool.getThreadAlive() == 0) {
                    // Nothing to poll and no worker alive: the run is complete.
                    break;
                } else {
                    // Nothing to poll but workers are still busy; give them the CPU
                    // instead of re-polling in a tight loop.
                    Thread.yield();
                }
            }
        } finally {
            this.stat.set(STAT_STOPPED);
            logger.info("Process end. spent {} ms", System.currentTimeMillis() - startMillis);
            close();
            logger.info("Total time: {} ms", System.currentTimeMillis() - startMillis);
            logger.info("Total inItemCount: {}", this.inItemCount);
            logger.info("Total outItemCount: {}", this.outItemCount);
        }
    }

    /**
     * Runs the processor on one input item and forwards a non-empty result list to every
     * output pipeline, adding the number of results to {@link #outItemCount}.
     *
     * @param in the raw item to process
     */
    protected void processInItem(IN in) {
        final List<OUT> results = this.dataProcessor.process(in);
        if (results == null || results.isEmpty()) {
            return;
        }
        this.outItemCount.addAndGet(results.size());
        for (OutPipeline<OUT> pipeline : this.outPipelines) {
            pipeline.process(results);
        }
    }

    /**
     * Atomically moves the state to {@code STAT_RUNNING}.
     *
     * @throws IllegalStateException if the state was already {@code STAT_RUNNING}
     */
    private void checkRunningStat() {
        // getAndSet leaves a RUNNING state untouched and reports it, so a second
        // concurrent run() is rejected without a hand-written CAS loop.
        if (this.stat.getAndSet(STAT_RUNNING) == STAT_RUNNING) {
            throw new IllegalStateException("Assembler is already running!");
        }
    }

    /**
     * Guards configuration changes that must not happen during a run.
     *
     * @throws IllegalStateException if the assembler is currently running
     */
    protected void checkIfRunning() {
        if (this.stat.get() == STAT_RUNNING) {
            throw new IllegalStateException("Assembler is already running!");
        }
    }

    /**
     * Closes the processor and every output pipeline that implements {@link AutoCloseable},
     * then shuts down the worker pool. Safe to call before {@link #run()} has created the pool.
     */
    public void close() {
        destroyEach(this.dataProcessor);
        if (this.outPipelines != null) {
            this.outPipelines.forEach(this::destroyEach);
        }
        if (this.threadPool != null) {
            this.threadPool.shutdown();
        }
    }

    /**
     * Closes {@code obj} if it is {@link AutoCloseable}; failures are logged, not propagated,
     * so one failing component does not prevent the others from being closed.
     *
     * @param obj component to close; ignored when it is not {@code AutoCloseable}
     */
    private void destroyEach(Object obj) {
        if (obj instanceof AutoCloseable) {
            try {
                ((AutoCloseable) obj).close();
            } catch (Exception e) {
                // The object fills the placeholder; the exception is passed last so the
                // logger records it with its stack trace.
                logger.warn("destroyEach: {}", obj, e);
            }
        }
    }

    /**
     * Sets the size used when the worker pool is next created.
     *
     * @param i number of worker threads
     * @return this assembler, for chaining
     */
    public BaseAssembler<IN, OUT> thread(int i) {
        this.threadNum = i;
        return this;
    }

    /**
     * Replaces the list of output pipelines.
     *
     * @param list the pipelines to use
     * @return this assembler, for chaining
     * @throws IllegalStateException if the assembler is currently running
     */
    public BaseAssembler<IN, OUT> setOutPipelines(List<OutPipeline<OUT>> list) {
        checkIfRunning();
        this.outPipelines = list;
        return this;
    }

    /**
     * Appends one output pipeline.
     *
     * @param outPipeline the pipeline to add
     * @return this assembler, for chaining
     * @throws IllegalStateException if the assembler is currently running
     */
    public BaseAssembler<IN, OUT> addOutPipeline(OutPipeline<OUT> outPipeline) {
        checkIfRunning();
        this.outPipelines.add(outPipeline);
        return this;
    }

    /**
     * Demo entry point: runs an assembler with ten workers on a background thread.
     *
     * @param strArr optional; the first element, when present, overrides the default input path
     */
    public static void main(String[] strArr) {
        final String path = strArr.length > 0 ? strArr[0] : DEFAULT_DEMO_PATH;
        ConsoleOutpipeline consoleOutpipeline = new ConsoleOutpipeline();
        new Thread(() -> {
            create(new FileRawInput(path), new DemoDataProcessor()).addOutPipeline(consoleOutpipeline).thread(10).run();
        }).start();
    }
}
