/*
 * Decompiled with CFR 0.152.
 */
package com.netflix.turbine.streaming.servlet;

import com.netflix.turbine.monitor.cluster.ClusterMonitor;
import com.netflix.turbine.monitor.cluster.ClusterMonitorFactory;
import com.netflix.turbine.plugins.PluginsFactory;
import com.netflix.turbine.streaming.RelevanceConfig;
import com.netflix.turbine.streaming.StreamingDataHandler;
import com.netflix.turbine.streaming.TurbineStreamingConnection;
import com.netflix.turbine.streaming.servlet.SynchronizedHttpServletResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TurbineStreamServlet
extends HttpServlet {
    private static final long serialVersionUID = 1L;
    private static final Logger logger = LoggerFactory.getLogger(TurbineStreamServlet.class);

    protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        this.doGet(request, response);
    }

    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
        int delay = -1;
        String delayParam = request.getParameter("delay");
        if (delayParam != null) {
            delay = Integer.parseInt(delayParam);
        }
        Collection<FilterCriteria> criteria = FilterCriteria.getFilterCriteria(request);
        logger.info("FilterCriteria: " + criteria);
        Set<String> statsTypesFilter = this.getFilteredStatsTypes(request);
        logger.info("StatsType filters: " + statsTypesFilter);
        try {
            String clusterName = request.getParameter("cluster");
            if (clusterName == null) {
                clusterName = "default";
            }
            response = new SynchronizedHttpServletResponse(response);
            response.setHeader("Content-Type", "text/event-stream;charset=UTF-8");
            response.setHeader("Cache-Control", "no-cache, no-store, max-age=0, must-revalidate");
            response.setHeader("Pragma", "no-cache");
            TurbineStreamServlet.streamFromCluster(response, clusterName, criteria, delay);
        }
        catch (Exception e) {
            logger.error("We failed to start the streaming connection", (Throwable)e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void streamFromCluster(HttpServletResponse response, String clusterName, Collection<FilterCriteria> criteria, int delay) throws Exception {
        ClusterMonitorFactory<?> clusterMonFactory = PluginsFactory.getClusterMonitorFactory();
        if (clusterMonFactory == null) {
            throw new RuntimeException("Must configure plugin for ClusterMonitorFactory");
        }
        ClusterMonitor<?> clusterMonitor = clusterMonFactory.getClusterMonitor(clusterName);
        if (clusterMonitor == null) {
            response.sendError(404, "Cluster not found");
            return;
        }
        TurbineStreamingConnection connection = new TurbineStreamingConnection(new ServletStreamHandler(response), criteria, delay);
        try {
            clusterMonitor.registerListenertoClusterMonitor(connection);
            clusterMonitor.startMonitor();
            connection.waitOnConnection();
            logger.info("\n\n\n\nRETURNING FROM waitOnConnection: " + connection.getName());
        }
        catch (Exception e) {
            logger.info("Caught ex. Stopping StreamingConnection", (Throwable)e);
        }
        catch (Throwable t) {
            logger.info("Caught throwable. StreamingConnection", t);
        }
        finally {
            if (connection != null) {
                clusterMonitor.getDispatcher().deregisterEventHandler(connection);
            }
        }
    }

    private Set<String> getFilteredStatsTypes(HttpServletRequest request) {
        String typeString;
        String[] statsTypes;
        HashSet<String> typeFilters = new HashSet<String>();
        String statsTypeFilter = this.getServletConfig().getInitParameter("statsTypeFilter");
        if (statsTypeFilter != null && (statsTypes = statsTypeFilter.split(",")) != null && statsTypes.length > 0) {
            for (String statsType : statsTypes) {
                typeFilters.add(statsType);
            }
        }
        if (request.getParameter("type") != null && (typeString = request.getParameter("type").trim()).length() > 0) {
            String[] types;
            for (String t : types = typeString.split(",")) {
                String tValue = t.trim().toUpperCase();
                typeFilters.add(tValue);
            }
        }
        return typeFilters;
    }

    public static class FilterCriteria {
        public String name;
        public String type;
        public String prefix;
        public RelevanceConfig relevanceConfig;

        public static FilterCriteria parseCriteria(String s) {
            String[] parts;
            int topN = -1;
            HashMap<String, Double> sortCriteria = new HashMap<String, Double>();
            FilterCriteria filter = new FilterCriteria();
            for (String part : parts = s.split("\\|")) {
                String[] kvPair = part.split(":");
                if (kvPair.length != 2) {
                    throw new RuntimeException("Malformed filter criteria config, missing ':' in " + s);
                }
                String key = kvPair[0];
                String value = kvPair[1];
                if (key.equals("name")) {
                    filter.name = value;
                    continue;
                }
                if (key.equals("type")) {
                    filter.type = value;
                    continue;
                }
                if (key.equals("prefix")) {
                    filter.prefix = value;
                    continue;
                }
                if (key.equals("topN")) {
                    topN = Integer.parseInt(value);
                    continue;
                }
                sortCriteria.put(key, Double.parseDouble(value));
            }
            if (filter.type != null && topN > 0 && sortCriteria.size() > 0) {
                filter.relevanceConfig = new RelevanceConfig("name", filter.type, topN, sortCriteria);
            }
            return filter;
        }

        public static Collection<FilterCriteria> getFilterCriteria(HttpServletRequest request) {
            String[] parts;
            ArrayList<FilterCriteria> criteria = new ArrayList<FilterCriteria>();
            String filterCriteriaString = request.getParameter("filterCriteria");
            if (filterCriteriaString != null && (parts = filterCriteriaString.split(",")) != null && parts.length > 0) {
                for (String part : parts) {
                    criteria.add(FilterCriteria.parseCriteria(part));
                }
            }
            return criteria;
        }
    }

    private static class ServletStreamHandler
    implements StreamingDataHandler {
        private int responseFlushDelay = 100;
        private volatile long lastResponseFlush = -1L;
        private static final String DATA_PREFIX = "data: ";
        private static final String DOUBLE_NEWLINE = "\n\n";
        private static final String PING_STRING = ": ping\n";
        private final HttpServletResponse response;

        private ServletStreamHandler(HttpServletResponse resp) {
            this.response = resp;
        }

        @Override
        public void writeData(String data) throws Exception {
            long currentTime = System.currentTimeMillis();
            this.response.getWriter().print(DATA_PREFIX + data + DOUBLE_NEWLINE);
            if (this.lastResponseFlush == -1L || currentTime > this.lastResponseFlush + (long)this.responseFlushDelay) {
                this.response.flushBuffer();
                this.lastResponseFlush = currentTime;
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void deleteData(String type, Set<String> names) throws Exception {
            String prefix = "data: {\"deleteData\":\"true\", \"type\":\"" + type + "\", \"name\":\"";
            StringBuilder sb = new StringBuilder();
            for (String s : names) {
                sb.append(prefix).append(s).append("\"}\n\n");
            }
            String deleteData = sb.toString();
            HttpServletResponse httpServletResponse = this.response;
            synchronized (httpServletResponse) {
                this.response.getWriter().print(deleteData);
                this.response.flushBuffer();
            }
        }

        @Override
        public void noData() throws Exception {
            this.response.getWriter().print(PING_STRING);
            this.response.flushBuffer();
        }
    }
}

