/*
 * Decompiled with CFR 0.152.
 */
package org.redisson;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.redisson.RedissonBlockingQueue;
import org.redisson.RedissonList;
import org.redisson.RedissonObject;
import org.redisson.RedissonShutdownException;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RFuture;
import org.redisson.api.RMap;
import org.redisson.api.RRemoteService;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.executor.RemotePromise;
import org.redisson.misc.CompletableFutureWrapper;
import org.redisson.remote.BaseRemoteService;
import org.redisson.remote.RRemoteServiceResponse;
import org.redisson.remote.RemoteServiceAck;
import org.redisson.remote.RemoteServiceCancelRequest;
import org.redisson.remote.RemoteServiceCancelResponse;
import org.redisson.remote.RemoteServiceMethod;
import org.redisson.remote.RemoteServiceRequest;
import org.redisson.remote.RemoteServiceResponse;
import org.redisson.remote.ResponseEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedissonRemoteService
extends BaseRemoteService
implements RRemoteService {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(RedissonRemoteService.class);

    /**
     * Remote interfaces registered on this instance, each mapped to the {@link Entry}
     * that holds its current poll future and worker counter. Entries are added by
     * {@code register} and removed by {@code deregister}.
     */
    private final Map<Class<?>, Entry> remoteMap = new ConcurrentHashMap<>();

    /**
     * Creates the remote service. All arguments are forwarded unchanged to the
     * {@link BaseRemoteService} constructor; this class adds no initialisation of its own.
     *
     * @param codec           codec handed to the superclass
     * @param name            name handed to the superclass
     * @param commandExecutor executor used to run Redis commands
     * @param executorId      executor id handed to the superclass
     * @param responses       response registry handed to the superclass
     */
    public RedissonRemoteService(Codec codec, String name, CommandAsyncExecutor commandExecutor, String executorId, ConcurrentMap<String, ResponseEntry> responses) {
        super(codec, name, commandExecutor, executorId, responses);
    }

    /**
     * Returns the name of the tasks map that belongs to the request queue of the given
     * remote interface: the request queue name with the suffix {@code ":tasks"}.
     *
     * @param remoteInterface remote interface whose request queue name is used as prefix
     * @return request queue name followed by {@code ":tasks"}
     */
    public String getRequestTasksMapName(Class<?> remoteInterface) {
        return this.getRequestQueueName(remoteInterface) + ":tasks";
    }

    /**
     * Stores a request and enqueues its id in one Lua script: the encoded request is
     * written into the {@code <requestQueueName>:tasks} hash under the request id, then
     * the id is pushed to the tail of the request queue list.
     *
     * @param requestQueueName name of the request queue list
     * @param request          request to store and enqueue
     * @param result           promise that receives the future of this add operation
     * @return future of the script evaluation (the script itself always returns 1)
     */
    @Override
    protected CompletableFuture<Boolean> addAsync(String requestQueueName, RemoteServiceRequest request, RemotePromise<Object> result) {
        RFuture<Boolean> future = this.commandExecutor.evalWriteNoRetryAsync(this.name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "redis.call('hset', KEYS[2], ARGV[1], ARGV[2]);"
                        + "redis.call('rpush', KEYS[1], ARGV[1]); "
                        + "return 1;",
                Arrays.asList(requestQueueName, requestQueueName + ":tasks"),
                request.getId(), this.encode(request));
        // Convert once so the promise and the caller observe the same future instance.
        CompletableFuture<Boolean> addFuture = future.toCompletableFuture();
        result.setAddFuture(addFuture);
        return addFuture;
    }

    /**
     * Removes a not-yet-consumed request in one Lua script: one occurrence of the task id
     * is removed from the request queue list and, only if something was removed, the
     * matching field is deleted from the {@code <requestQueueName>:tasks} hash.
     *
     * @param requestQueueName name of the request queue list
     * @param taskId           id of the request to remove
     * @return future of the script result; the script returns 1 when the id was found in
     *         the queue and 0 otherwise
     */
    @Override
    protected CompletableFuture<Boolean> removeAsync(String requestQueueName, String taskId) {
        RFuture<Boolean> removal = this.commandExecutor.evalWriteNoRetryAsync(this.name, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if redis.call('lrem', KEYS[1], 1, ARGV[1]) > 0 then "
                        + "redis.call('hdel', KEYS[2], ARGV[1]);"
                        + "return 1;"
                        + "end;"
                        + "return 0;",
                Arrays.asList(requestQueueName, requestQueueName + ":tasks"),
                taskId);
        return removal.toCompletableFuture();
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface} with a
     * single worker; delegates to {@link #register(Class, Object, int)} with {@code 1}.
     *
     * @param remoteInterface remote interface to serve
     * @param object          implementation that will receive the invocations
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object) {
        this.register(remoteInterface, object, 1);
    }

    /**
     * Removes the registration of {@code remoteInterface} from this instance and, if a
     * queue poll has already been recorded for it, cancels that poll without interruption.
     * Does nothing when the interface is not registered.
     *
     * @param remoteInterface remote interface to stop serving
     */
    @Override
    public <T> void deregister(Class<T> remoteInterface) {
        Entry removed = this.remoteMap.remove(remoteInterface);
        if (removed == null) {
            return;
        }
        if (removed.getFuture() == null) {
            return;
        }
        removed.getFuture().cancel(false);
    }

    /**
     * Returns the current size of the request queue of the given remote interface.
     *
     * @param remoteInterface remote interface whose request queue is inspected
     * @return number of request ids currently in the queue
     */
    @Override
    public int getPendingInvocations(Class<?> remoteInterface) {
        String requestQueueName = this.getRequestQueueName(remoteInterface);
        RBlockingQueue<String> requestQueue = this.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
        return requestQueue.size();
    }

    /**
     * Asynchronous variant of {@link #getPendingInvocations(Class)}: requests the size of
     * the request queue of the given remote interface.
     *
     * @param remoteInterface remote interface whose request queue is inspected
     * @return future holding the number of request ids currently in the queue
     */
    @Override
    public RFuture<Integer> getPendingInvocationsAsync(Class<?> remoteInterface) {
        String requestQueueName = this.getRequestQueueName(remoteInterface);
        RBlockingQueue<String> requestQueue = this.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);
        return requestQueue.sizeAsync();
    }

    /**
     * Returns the current value of the worker counter kept for the given remote interface.
     *
     * @param remoteInterface remote interface to look up
     * @return the counter value, or {@code 0} when the interface is not registered here
     */
    @Override
    public int getFreeWorkers(Class<?> remoteInterface) {
        Entry registration = this.remoteMap.get(remoteInterface);
        return registration == null ? 0 : registration.getCounter().get();
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface} with the
     * given number of workers, using the executor obtained from the connection manager.
     *
     * @param remoteInterface remote interface to serve
     * @param object          implementation that will receive the invocations
     * @param workers         number of workers; validated by the delegate overload
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers) {
        this.register(remoteInterface, object, workers, this.commandExecutor.getConnectionManager().getExecutor());
    }

    /**
     * Creates a blocking-queue handle for the given name and codec on this service's
     * command executor.
     *
     * @param name  queue name
     * @param codec codec used for the queue elements
     * @return a new {@link RedissonBlockingQueue} instance
     */
    private <V> RBlockingQueue<V> getBlockingQueue(String name, Codec codec) {
        return new RedissonBlockingQueue<>(codec, this.commandExecutor, name);
    }

    /**
     * Registers {@code object} as the implementation of {@code remoteInterface} and starts
     * polling the interface's request queue. If the interface is already registered on
     * this instance the call is a no-op.
     *
     * @param remoteInterface remote interface to serve
     * @param object          implementation that will receive the invocations
     * @param workers         initial value of the worker counter; must be at least 1
     * @param executor        executor on which the invocations are run
     * @throws IllegalArgumentException if {@code workers} is lower than 1
     */
    @Override
    public <T> void register(Class<T> remoteInterface, T object, int workers, ExecutorService executor) {
        if (workers < 1) {
            throw new IllegalArgumentException("executorsAmount can't be lower than 1");
        }
        Entry existing = this.remoteMap.putIfAbsent(remoteInterface, new Entry(workers));
        if (existing == null) {
            // First registration of this interface: start the polling loop.
            RBlockingQueue<String> queue = this.getBlockingQueue(this.getRequestQueueName(remoteInterface), StringCodec.INSTANCE);
            this.subscribe(remoteInterface, queue, executor, object);
        }
    }

    /**
     * Same as {@link #tryExecute(Class, Object, ExecutorService, long, TimeUnit)} but runs
     * the invocation on the executor obtained from the connection manager.
     *
     * @param remoteInterface remote interface whose request queue is polled
     * @param object          implementation that receives the invocation
     * @param timeout         maximum time to wait for a request
     * @param timeUnit        unit of {@code timeout}
     * @return result of the delegate overload
     * @throws InterruptedException propagated from the delegate overload
     */
    @Override
    public <T> boolean tryExecute(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.tryExecute(remoteInterface, object, this.commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit);
    }

    /**
     * Polls the request queue of {@code remoteInterface} once and, if a request id was
     * obtained, loads the request, executes it against {@code object} and waits for the
     * execution future.
     *
     * @param remoteInterface remote interface whose request queue is polled
     * @param object          implementation that receives the invocation
     * @param executorService executor on which the invocation is run
     * @param timeout         maximum time to wait for a request id
     * @param timeUnit        unit of {@code timeout}
     * @return {@code false} when the poll returned no request id, {@code true} once a
     *         request has been executed
     * @throws InterruptedException  propagated from the blocking poll or from waiting on
     *                               the futures
     * @throws IllegalStateException if no task is stored for the polled request id
     */
    @Override
    public <T> boolean tryExecute(Class<T> remoteInterface, T object, ExecutorService executorService, long timeout, TimeUnit timeUnit) throws InterruptedException {
        RBlockingQueue<String> requestQueue = this.getBlockingQueue(this.getRequestQueueName(remoteInterface), StringCodec.INSTANCE);

        String requestId = requestQueue.poll(timeout, timeUnit);
        if (requestId == null) {
            return false;
        }

        // The request payload lives in the "<queue raw name>:tasks" map, keyed by request id.
        String tasksMapName = ((RedissonObject) requestQueue).getRawName() + ":tasks";
        RMap<String, RemoteServiceRequest> tasks = this.getMap(tasksMapName);
        RemoteServiceRequest request = this.commandExecutor.getInterrupted(this.getTask(requestId, tasks));
        if (request == null) {
            throw new IllegalStateException("Task can't be found for request: " + requestId);
        }

        RFuture<RRemoteServiceResponse> execution = this.executeMethod(remoteInterface, requestQueue, executorService, request, object);
        this.commandExecutor.getInterrupted(execution);
        return true;
    }

    /**
     * Same as {@link #tryExecuteAsync(Class, Object, ExecutorService, long, TimeUnit)} but
     * runs the invocation on the executor obtained from the connection manager.
     *
     * @param remoteInterface remote interface whose request queue is polled
     * @param object          implementation that receives the invocation
     * @param timeout         maximum time to wait for a request
     * @param timeUnit        unit of {@code timeout}
     * @return future produced by the delegate overload
     */
    @Override
    public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, long timeout, TimeUnit timeUnit) {
        return this.tryExecuteAsync(remoteInterface, object, this.commandExecutor.getConnectionManager().getExecutor(), timeout, timeUnit);
    }

    /**
     * Asynchronously polls the request queue of {@code remoteInterface} once and, if a
     * request id was obtained, loads the request and executes it against {@code object}.
     *
     * @param remoteInterface remote interface whose request queue is polled
     * @param object          implementation that receives the invocation
     * @param executor        executor on which the invocation is run
     * @param timeout         maximum time to wait for a request id; {@code -1} selects the
     *                        poll variant that takes no timeout argument
     * @param timeUnit        unit of {@code timeout}; not used when {@code timeout} is {@code -1}
     * @return future completed with {@code false} when the poll returned no request id,
     *         with {@code true} once the execution future completes, or exceptionally with
     *         an {@link IllegalStateException} cause when no task is stored for the
     *         polled request id
     */
    @Override
    public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object, ExecutorService executor, long timeout, TimeUnit timeUnit) {
        String requestQueueName = this.getRequestQueueName(remoteInterface);
        RBlockingQueue<String> requestQueue = this.getBlockingQueue(requestQueueName, StringCodec.INSTANCE);

        RFuture<String> pollFuture;
        if (timeout == -1L) {
            pollFuture = requestQueue.pollAsync();
        } else {
            pollFuture = requestQueue.pollAsync(timeout, timeUnit);
        }

        CompletionStage<Boolean> f = pollFuture.thenCompose(requestId -> {
            if (requestId == null) {
                return CompletableFuture.completedFuture(false);
            }
            // The request payload lives in the "<queue raw name>:tasks" map, keyed by request id.
            RMap<String, RemoteServiceRequest> tasks = this.getMap(((RedissonObject) requestQueue).getRawName() + ":tasks");
            RFuture<RemoteServiceRequest> taskFuture = this.getTask(requestId, tasks);
            return taskFuture.thenCompose(request -> {
                if (request == null) {
                    throw new CompletionException(new IllegalStateException("Task can't be found for request: " + requestId));
                }
                RFuture<RRemoteServiceResponse> future = this.executeMethod(remoteInterface, requestQueue, executor, request, object);
                return future.thenApply(r -> true);
            });
        });
        return new CompletableFutureWrapper<>(f);
    }

    /**
     * Delegates to {@link #tryExecuteAsync(Class, Object, long, TimeUnit)} with a timeout
     * of {@code -1} and a {@code null} time unit; {@code -1} makes the five-argument
     * overload use the poll variant that takes no timeout argument.
     *
     * @param remoteInterface remote interface whose request queue is polled
     * @param object          implementation that receives the invocation
     * @return future produced by the delegate overload
     */
    @Override
    public <T> RFuture<Boolean> tryExecuteAsync(Class<T> remoteInterface, T object) {
        return this.tryExecuteAsync(remoteInterface, object, -1L, null);
    }

    /**
     * Starts one asynchronous poll (60 second timeout) on the request queue of a registered
     * interface and handles its outcome in callbacks: load the request, optionally send an
     * ack, then hand the request to {@code executeMethod}. Paths that do not end in
     * {@code executeMethod} call {@code resubscribe} (or {@code subscribe} directly after a
     * failed poll) so that polling continues, except on {@link RedissonShutdownException},
     * when the interface is no longer registered, or when the counter is already 0.
     * Does nothing when the interface has no {@link Entry} in {@code remoteMap}.
     *
     * @param remoteInterface registered remote interface
     * @param requestQueue    queue of request ids for that interface
     * @param executor        executor passed on to {@code executeMethod}
     * @param bean            implementation object passed on to {@code executeMethod}
     */
    private <T> void subscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor, Object bean) {
        Entry entry = this.remoteMap.get(remoteInterface);
        if (entry == null) {
            return;
        }
        RFuture<String> take = requestQueue.pollAsync(60L, TimeUnit.SECONDS);
        // Remember the in-flight poll; deregister() cancels the future stored here.
        entry.setFuture(take);
        take.whenComplete((requestId, e) -> {
            // Re-read the registration: if it was removed while the poll was pending, stop.
            Entry entr = this.remoteMap.get(remoteInterface);
            if (entr == null) {
                return;
            }
            if (e != null) {
                if (e instanceof RedissonShutdownException) {
                    return;
                }
                log.error("Can't process the remote service request.", e);
                // Poll failed: start a new poll without touching the counter.
                this.subscribe(remoteInterface, requestQueue, executor, bean);
                return;
            }
            // NOTE(review): the checks below use `entry` captured before the poll, not the
            // freshly read `entr`; the two differ if the interface was deregistered and
            // registered again in between -- confirm this is intended.
            // NOTE(review): get() == 0 and decrementAndGet() are two separate atomic
            // operations, and a request id polled while the counter is 0 is not processed
            // in this branch -- confirm.
            if (entry.getCounter().get() == 0) {
                return;
            }
            // Take one unit from the counter; while some remain, start another poll.
            if (entry.getCounter().decrementAndGet() > 0) {
                this.subscribe(remoteInterface, requestQueue, executor, bean);
            }
            // Poll completed without an element: give the unit back via resubscribe.
            if (requestId == null) {
                this.resubscribe(remoteInterface, requestQueue, executor, bean);
                return;
            }
            // The request payload lives in the "<queue raw name>:tasks" map, keyed by request id.
            RMap<String, RemoteServiceRequest> tasks = this.getMap(((RedissonObject)((Object)requestQueue)).getRawName() + ":tasks");
            RFuture<RemoteServiceRequest> taskFuture = this.getTask((String)requestId, tasks);
            taskFuture.whenComplete((request, exc) -> {
                if (exc != null) {
                    if (exc instanceof RedissonShutdownException) {
                        return;
                    }
                    log.error("Can't process the remote service request with id {}", requestId, exc);
                    this.resubscribe(remoteInterface, requestQueue, executor, bean);
                    return;
                }
                if (request == null) {
                    log.debug("Task can't be found for request: {}", requestId);
                    this.resubscribe(remoteInterface, requestQueue, executor, bean);
                    return;
                }
                // Skip requests whose ack window has already elapsed, measured from
                // request.getDate() against the local clock.
                long elapsedTime = System.currentTimeMillis() - request.getDate();
                if (request.getOptions().isAckExpected() && elapsedTime > request.getOptions().getAckTimeoutInMillis()) {
                    log.debug("request: {} has been skipped due to ackTimeout. Elapsed time: {}ms", (Object)request.getId(), (Object)elapsedTime);
                    this.resubscribe(remoteInterface, requestQueue, executor, bean);
                    return;
                }
                if (request.getOptions().isAckExpected()) {
                    String responseName = this.getResponseQueueName(request.getExecutorId());
                    String ackName = this.getAckName(request.getId());
                    // The script creates the ack key only if it does not exist yet and gives
                    // it a TTL equal to the ack timeout; it returns 1 only when it created the key.
                    RFuture ackClientsFuture = this.commandExecutor.evalWriteNoRetryAsync(responseName, LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if redis.call('setnx', KEYS[1], 1) == 1 then redis.call('pexpire', KEYS[1], ARGV[1]);return 1;end;return 0;", Arrays.asList(ackName), request.getOptions().getAckTimeoutInMillis());
                    ackClientsFuture.whenComplete((r, ex) -> {
                        if (ex != null) {
                            if (ex instanceof RedissonShutdownException) {
                                return;
                            }
                            log.error("Can't send ack for request: {}", request, ex);
                            this.resubscribe(remoteInterface, requestQueue, executor, bean);
                            return;
                        }
                        // Ack key already existed: do not execute the request here.
                        if (!r.booleanValue()) {
                            this.resubscribe(remoteInterface, requestQueue, executor, bean);
                            return;
                        }
                        // Publish the ack object on the list named after the response queue.
                        RedissonList<RemoteServiceAck> list = new RedissonList<RemoteServiceAck>(this.codec, this.commandExecutor, responseName, null);
                        RFuture<Boolean> addFuture = list.addAsync(new RemoteServiceAck(request.getId()));
                        addFuture.whenComplete((res, exce) -> {
                            if (exce != null) {
                                if (exce instanceof RedissonShutdownException) {
                                    return;
                                }
                                log.error("Can't send ack for request: {}", request, exce);
                                this.resubscribe(remoteInterface, requestQueue, executor, bean);
                                return;
                            }
                            if (!res.booleanValue()) {
                                this.resubscribe(remoteInterface, requestQueue, executor, bean);
                                return;
                            }
                            // Ack delivered: run the invocation.
                            this.executeMethod(remoteInterface, requestQueue, executor, (RemoteServiceRequest)request, bean);
                        });
                    });
                } else {
                    // No ack requested: run the invocation right away.
                    this.executeMethod(remoteInterface, requestQueue, executor, (RemoteServiceRequest)request, bean);
                }
            });
        });
    }

    /**
     * Resolves the target method of {@code request} on {@code remoteInterface}, submits the
     * invocation to {@code executor} and wires up response delivery and cancellation.
     *
     * <p>When the response promise completes, the response is put on the caller's response
     * queue if a result is expected or the response is a cancel response; the queue's
     * expiry is set to the request's execution timeout (60000 ms when none is set). An
     * {@link Optional} result is unwrapped before sending. In every branch except a
     * {@link RedissonShutdownException}, {@code resubscribe} is called afterwards,
     * including when sending the response fails synchronously.
     *
     * @param remoteInterface interface whose methods are searched
     * @param requestQueue    request queue passed on to {@code resubscribe}
     * @param executor        executor on which the invocation is run
     * @param request         request to execute
     * @param bean            implementation object to invoke
     * @return future view of the response promise
     * @throws java.util.NoSuchElementException if no method of {@code remoteInterface}
     *         matches the request's method name and signature
     */
    private <T> RFuture<RRemoteServiceResponse> executeMethod(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor, RemoteServiceRequest request, Object bean) {
        // Match by name and by the signature computed with getMethodSignature.
        RemoteServiceMethod method = Arrays.stream(remoteInterface.getMethods())
                .filter(m -> m.getName().equals(request.getMethodName()) && Arrays.equals(this.getMethodSignature(m), request.getSignature()))
                .map(m -> new RemoteServiceMethod(m, bean))
                .findFirst()
                .get();
        String responseName = this.getResponseQueueName(request.getExecutorId());
        CompletableFuture<RRemoteServiceResponse> responsePromise = new CompletableFuture<>();
        CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture = new CompletableFuture<>();
        this.scheduleCheck(this.cancelRequestMapName, request.getId(), cancelRequestFuture);

        responsePromise.whenComplete((result, e) -> {
            if (request.getOptions().isResultExpected() || result instanceof RemoteServiceCancelResponse) {
                long timeout = 60000L;
                if (request.getOptions().getExecutionTimeoutInMillis() != null) {
                    timeout = request.getOptions().getExecutionTimeoutInMillis();
                }
                RBlockingQueue<RRemoteServiceResponse> queue = this.getBlockingQueue(responseName, this.codec);
                try {
                    RRemoteServiceResponse response;
                    if (result instanceof RemoteServiceResponse && ((RemoteServiceResponse) result).getResult() instanceof Optional) {
                        // Send the Optional's content (or null) instead of the Optional itself.
                        Optional<?> o = (Optional<?>) ((RemoteServiceResponse) result).getResult();
                        Object unwrapped = o.orElse(null);
                        response = new RemoteServiceResponse(result.getId(), unwrapped);
                    } else {
                        response = result;
                    }
                    RFuture<Void> clientsFuture = queue.putAsync(response);
                    queue.expireAsync(timeout, TimeUnit.MILLISECONDS);
                    clientsFuture.whenComplete((res, exc) -> {
                        if (exc != null) {
                            if (exc instanceof RedissonShutdownException) {
                                return;
                            }
                            log.error("Can't send response: {} for request: {}", response, request, exc);
                        }
                        this.resubscribe(remoteInterface, requestQueue, executor, method.getBean());
                    });
                }
                catch (Exception ex) {
                    log.error("Can't send response: {} for request: {}", result, request, ex);
                    // The completion callback above was never registered on this path, so
                    // resubscribe here; otherwise this worker slot would never be returned.
                    if (!(ex instanceof RedissonShutdownException)) {
                        this.resubscribe(remoteInterface, requestQueue, executor, method.getBean());
                    }
                }
            } else {
                this.resubscribe(remoteInterface, requestQueue, executor, method.getBean());
            }
        });

        Future<?> submitFuture = executor.submit(() -> {
            if (this.commandExecutor.getConnectionManager().isShuttingDown()) {
                return;
            }
            this.invokeMethod(request, method, cancelRequestFuture, responsePromise);
        });

        cancelRequestFuture.thenAccept(r -> {
            boolean res = submitFuture.cancel(r.isMayInterruptIfRunning());
            if (res) {
                RemoteServiceCancelResponse response = new RemoteServiceCancelResponse(request.getId(), true);
                // If the promise was already completed elsewhere, report the cancel as not effective.
                if (!responsePromise.complete(response)) {
                    response = new RemoteServiceCancelResponse(request.getId(), false);
                }
                if (r.isSendResponse()) {
                    RMap<String, RemoteServiceCancelResponse> map = this.getMap(this.cancelResponseMapName);
                    map.fastPutAsync(request.getId(), response);
                    map.expireAsync(60L, TimeUnit.SECONDS);
                }
            }
        });
        return new CompletableFutureWrapper<RRemoteServiceResponse>(responsePromise);
    }

    /**
     * Invokes the target method reflectively and completes {@code responsePromise} with a
     * {@link RemoteServiceResponse} holding either the return value or the failure.
     * Afterwards the cancel-request future, if any, is cancelled.
     *
     * <p>For a failure the exception's cause is sent (for an
     * {@link java.lang.reflect.InvocationTargetException} that is the exception thrown by
     * the target method). Exceptions raised by the reflective call itself have no cause;
     * in that case the exception itself is sent so the response never carries a
     * {@code null} error for a failed invocation.
     *
     * @param request             request being executed
     * @param method              resolved method and the bean to invoke it on
     * @param cancelRequestFuture future tracking a cancel request; may be {@code null}
     * @param responsePromise     promise completed with the response
     */
    protected <T> void invokeMethod(RemoteServiceRequest request, RemoteServiceMethod method, CompletableFuture<RemoteServiceCancelRequest> cancelRequestFuture, CompletableFuture<RRemoteServiceResponse> responsePromise) {
        try {
            Object result = method.getMethod().invoke(method.getBean(), request.getArgs());
            RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), result);
            responsePromise.complete(response);
        }
        catch (Exception e) {
            // getCause() is null when the reflective call itself failed (e.g. illegal
            // arguments or access); fall back to the exception so the error is not lost.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            RemoteServiceResponse response = new RemoteServiceResponse(request.getId(), cause);
            responsePromise.complete(response);
            log.error("Can't execute: {}", request, e);
        }
        if (cancelRequestFuture != null) {
            cancelRequestFuture.cancel(false);
        }
    }

    /**
     * Increments the worker counter of a registered interface and, when the counter was 0
     * before the increment, starts a new poll via {@code subscribe}. Does nothing when the
     * interface is not registered.
     *
     * @param remoteInterface registered remote interface
     * @param requestQueue    request queue passed on to {@code subscribe}
     * @param executor        executor passed on to {@code subscribe}
     * @param bean            implementation object passed on to {@code subscribe}
     */
    private <T> void resubscribe(Class<T> remoteInterface, RBlockingQueue<String> requestQueue, ExecutorService executor, Object bean) {
        Entry registration = this.remoteMap.get(remoteInterface);
        if (registration == null) {
            return;
        }
        int previous = registration.getCounter().getAndIncrement();
        if (previous == 0) {
            this.subscribe(remoteInterface, requestQueue, executor, bean);
        }
    }

    /**
     * Obtains the request stored under {@code requestId} by calling {@code removeAsync} on
     * the tasks map, so the lookup also removes the entry from the map.
     *
     * @param requestId id of the request to load
     * @param tasks     tasks map keyed by request id
     * @return future of the {@code removeAsync} call; callers in this class treat a
     *         {@code null} value as "task not found"
     */
    protected RFuture<RemoteServiceRequest> getTask(String requestId, RMap<String, RemoteServiceRequest> tasks) {
        return tasks.removeAsync(requestId);
    }

    /**
     * Per-interface registration state: the most recently started queue poll and a counter
     * initialised with the number of workers given at registration.
     */
    public static class Entry {
        /**
         * Most recently started poll of the request queue; {@code null} until the first
         * poll is recorded. Declared {@code volatile} because it is written when a poll is
         * started (including from future-completion callbacks) and read by
         * {@code deregister}, which may run on a different thread.
         */
        volatile RFuture<String> future;

        /** Worker counter; starts at the {@code workers} value passed to the constructor. */
        final AtomicInteger counter;

        /**
         * @param workers initial value of the worker counter
         */
        public Entry(int workers) {
            this.counter = new AtomicInteger(workers);
        }

        /**
         * Records the poll future so that it can be cancelled later.
         *
         * @param future the poll that was just started
         */
        public void setFuture(RFuture<String> future) {
            this.future = future;
        }

        /**
         * @return the most recently recorded poll future, or {@code null} if none was recorded
         */
        public RFuture<String> getFuture() {
            return this.future;
        }

        /**
         * @return the worker counter
         */
        public AtomicInteger getCounter() {
            return this.counter;
        }
    }
}

