package com.sogou.ai.nsrss.pipeline;

import com.sogou.ai.nsrss.errors.SogouError;
import com.sogou.ai.nsrss.utils.Log;
import com.tencent.matrix.trace.core.MethodBeat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/* compiled from: SogouSource */
/* loaded from: classes4.dex */
public class SequentialExecutor<INPUT, OUTPUT> {
    public static final String TAG = "SequentialExecutor";
    public static ExclusiveAssets sAssets;
    public static Map<String, BlockingQueue<Job>> sJobMap;
    public static BlockingQueue<BlockingQueue<Job>> sJobQueue;
    public static Object sLock;
    public static AtomicInteger sRefCount;

    /* compiled from: SogouSource */
    /* loaded from: classes4.dex */
    public interface ExclusiveAssets<INPUT, OUTPUT extends List<Capsule>> {
        void destroy();

        void drain();

        boolean init();

        OUTPUT process(String str, INPUT input);

        void reset();
    }

    /* compiled from: SogouSource */
    /* loaded from: classes4.dex */
    public interface ExclusiveAssetsFactory<INPUT, OUTPUT extends List<Capsule>> {
        ExclusiveAssets<INPUT, OUTPUT> createInstance();
    }

    /* compiled from: SogouSource */
    /* loaded from: classes4.dex */
    public static class Job<INPUT, OUTPUT> {
        public Capsule closeReason;
        public INPUT input;
        public boolean isFinal;
        public String session;
        public QueuedSource<OUTPUT> source;

        public Job(String str, QueuedSource<OUTPUT> queuedSource, INPUT input, Capsule capsule) {
            MethodBeat.i(18800);
            this.isFinal = false;
            this.session = str;
            this.source = queuedSource;
            this.input = input;
            this.isFinal = capsule != null;
            this.closeReason = capsule;
            MethodBeat.o(18800);
        }
    }

    /* compiled from: SogouSource */
    /* loaded from: classes4.dex */
    public class JobExecutor extends Thread {
        public JobExecutor() {
            super("sequential_executor");
            MethodBeat.i(18801);
            MethodBeat.o(18801);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Job job;
            MethodBeat.i(18802);
            boolean z = true;
            do {
                try {
                    BlockingQueue blockingQueue = (BlockingQueue) SequentialExecutor.sJobQueue.take();
                    if (z) {
                        z = false;
                    } else {
                        synchronized (SequentialExecutor.sLock) {
                            try {
                                if (SequentialExecutor.sAssets != null) {
                                    SequentialExecutor.sAssets.reset();
                                }
                            } finally {
                                MethodBeat.o(18802);
                            }
                        }
                    }
                    do {
                        job = (Job) blockingQueue.take();
                        for (Capsule capsule : SequentialExecutor.sAssets.process(job.session, job.isFinal ? null : job.input)) {
                            SogouError error = capsule.getError();
                            if (capsule.getContent() != null) {
                                capsule.setError(null);
                                Log.d(SequentialExecutor.TAG, "write to q " + capsule + " " + job.source + " " + job.session);
                                job.source.writeToQueue(capsule);
                            }
                            if (error != null) {
                                Log.d(SequentialExecutor.TAG, "close " + error + " " + job.closeReason + " " + job.source + " " + job.session);
                                job.source.close(job.closeReason, error, capsule.getMetricInfo(), null);
                            }
                        }
                    } while (!job.isFinal);
                    Log.d(SequentialExecutor.TAG, "break outer");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } while (!SequentialExecutor.this.release());
            Log.d(SequentialExecutor.TAG, "finish");
        }
    }

    static {
        MethodBeat.i(18806);
        sLock = new Object();
        sRefCount = new AtomicInteger(0);
        MethodBeat.o(18806);
    }

    public SequentialExecutor(ExclusiveAssetsFactory exclusiveAssetsFactory) {
        MethodBeat.i(18803);
        synchronized (sLock) {
            try {
                if (sAssets == null) {
                    Log.d(TAG, "create instance");
                    ExclusiveAssets createInstance = exclusiveAssetsFactory.createInstance();
                    sAssets = createInstance;
                    createInstance.init();
                    sJobQueue = new LinkedBlockingQueue();
                    sJobMap = new HashMap();
                    new JobExecutor().start();
                }
                int incrementAndGet = sRefCount.incrementAndGet();
                Log.d(TAG, "ref count" + incrementAndGet);
            } catch (Throwable th) {
                MethodBeat.o(18803);
                throw th;
            }
        }
        MethodBeat.o(18803);
    }

    private synchronized BlockingQueue<Job> createQueue(String str) {
        LinkedBlockingQueue linkedBlockingQueue;
        MethodBeat.i(18804);
        linkedBlockingQueue = new LinkedBlockingQueue();
        sJobQueue.add(linkedBlockingQueue);
        sJobMap.put(str, linkedBlockingQueue);
        MethodBeat.o(18804);
        return linkedBlockingQueue;
    }

    private synchronized BlockingQueue<Job> getQueue(String str) {
        BlockingQueue<Job> blockingQueue;
        MethodBeat.i(18805);
        blockingQueue = sJobMap.get(str);
        MethodBeat.o(18805);
        return blockingQueue;
    }

    public void process(String str, QueuedSource<OUTPUT> queuedSource, INPUT input, Capsule capsule) {
        MethodBeat.i(18807);
        BlockingQueue<Job> queue = getQueue(str);
        if (queue == null) {
            queue = createQueue(str);
        }
        if (queue != null) {
            queue.add(new Job(str, queuedSource, input, capsule));
        }
        MethodBeat.o(18807);
    }

    public boolean release() {
        MethodBeat.i(18808);
        synchronized (sLock) {
            try {
                int decrementAndGet = sRefCount.decrementAndGet();
                Log.d(TAG, "release " + decrementAndGet);
                if (decrementAndGet != 0 || sAssets == null) {
                    MethodBeat.o(18808);
                    return false;
                }
                sAssets.drain();
                sAssets.destroy();
                sRefCount.set(0);
                sAssets = null;
                MethodBeat.o(18808);
                return true;
            } catch (Throwable th) {
                MethodBeat.o(18808);
                throw th;
            }
        }
    }
}
