
Can someone check whether my use of PoolingHttpClientConnectionManager is thread-safe?

  hheedat · 2017-09-10 23:11:05 +08:00 · 4008 views

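    I haven't written Java before, so I'm not confident about this. From what I've read online this should be OK, and I haven't hit any problems in testing. I need to send a batch of requests concurrently through a connection pool; is the way I'm doing it thread-safe?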

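    // Imports inferred from the identifiers used below
    // (assumed: Apache HttpClient 4.x, Hadoop, Gson, SLF4J).
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    import com.google.gson.JsonParser;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.http.HttpHost;
    import org.apache.http.client.ClientProtocolException;
    import org.apache.http.client.config.RequestConfig;
    import org.apache.http.client.methods.CloseableHttpResponse;
    import org.apache.http.client.methods.HttpGet;
    import org.apache.http.client.utils.URIBuilder;
    import org.apache.http.impl.client.CloseableHttpClient;
    import org.apache.http.impl.client.HttpClientBuilder;
    import org.apache.http.impl.client.HttpClients;
    import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
    import org.slf4j.LoggerFactory;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.net.URISyntaxException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.stream.Collectors;
    
    public class MyRequest {
    
        private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MyRequest.class);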
        private final String REQUEST_URL;
    
        private ExecutorService executorService;
        private Queue<Map<String, String>> tasks;
    
        private static PoolingHttpClientConnectionManager cm; //<-----
        private static CloseableHttpClient httpClient;
    
        private MyRequest(int taskQueueSize, int executorCount, String requestURL, Map<String, Object> connConfig) {
            this.tasks = new ArrayBlockingQueue<>(taskQueueSize);
            this.executorService = Executors.newFixedThreadPool(executorCount);
            REQUEST_URL = requestURL;
    
            String proxyHost = connConfig.get("proxyHost").toString();
            int proxyPort = Integer.parseInt(connConfig.get("proxyPort").toString());
    
            cm = new PoolingHttpClientConnectionManager();
            cm.setMaxTotal(Integer.parseInt(connConfig.get("maxTotal").toString()));
            cm.setDefaultMaxPerRoute(Integer.parseInt(connConfig.get("defaultMaxPerRoute").toString()));
    
            RequestConfig config = RequestConfig.custom()
                    .setConnectTimeout(Integer.parseInt(connConfig.get("connectTimeout").toString()))
                    .setSocketTimeout(Integer.parseInt(connConfig.get("socketTimeout").toString()))
                    // Key must match the one stored in main(): "connectionRequestTimeout"
                    .setConnectionRequestTimeout(Integer.parseInt(connConfig.get("connectionRequestTimeout").toString()))
                    .build();
    
            HttpClientBuilder httpClientBuilder = HttpClients.custom()
                    .setConnectionManager(cm)
                    .setDefaultRequestConfig(config);
    
            if (!proxyHost.equals("") && 0 != proxyPort) {
                httpClient = httpClientBuilder.setProxy(new HttpHost(proxyHost, proxyPort)).build();
            } else {
                httpClient = httpClientBuilder.build();
            }
        }
    
        private void addTask(Map<String, String> parameters) {
            tasks.offer(parameters);
        }
    
        private void flush() {
            List<Future> futures = this.tasks.stream()
                    .map(this::delegate)
                    .collect(Collectors.toList());
    
            futures.forEach((f) -> {
                try {
                    f.get();
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
    
            this.tasks.clear();
        }
    
        private Future delegate(Map<String, String> parameters) {
            return this.executorService.submit(() -> {
                doRequest(parameters, REQUEST_URL);
            });
        }
    
        private void doRequest(Map<String, String> parameters, String url) {
            CloseableHttpResponse resp = null; 
            HttpGet get = null; 
            try {
                URIBuilder builder = new URIBuilder(url);
                builder.addParameter("foo", parameters.get("bar"));
    
                get = new HttpGet(builder.build()); //<-----
                resp = httpClient.execute(get); //<-----
    
                if (resp.getStatusLine().getStatusCode() != 200) {
                    LOGGER.warn("xxx");
                } else {
                    LOGGER.info("xxx");
                }
    
                resp.close();
    
            } catch (URISyntaxException e) {
                LOGGER.warn("xxx" + e.getMessage());
            } catch (ClientProtocolException e) {
                LOGGER.warn("xxx" + e.getMessage());
            } catch (IOException e) {
                LOGGER.warn("xxx" + e.getMessage());
            } finally {
                if (resp != null) {
                    try {
                        resp.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private void shutdown() {
            this.executorService.shutdown();
        }
    
        public static void main(String args[]) throws IOException {
    
            String logPath = System.getProperty("readLogPath");
            String requestURL = System.getProperty("requestURL");
            int taskQueueSize = Integer.valueOf(System.getProperty("max.requests", "2000"));
            int executorCount = Integer.valueOf(System.getProperty("num.executors", "100"));
            int interval = Integer.valueOf(System.getProperty("max.interval", "500"));
            String proxyHost = System.getProperty("proxyHost", "");
            int proxyPort = Integer.parseInt(System.getProperty("proxyInt", "0"));
    
            int maxTotal = Integer.parseInt(System.getProperty("maxTotal", "5000"));
            int defaultMaxPerRoute = Integer.parseInt(System.getProperty("defaultMaxPerRoute", "1000"));
            int connectTimeout = Integer.parseInt(System.getProperty("connectTimeout", "1000"));
            int socketTimeout = Integer.parseInt(System.getProperty("socketTimeout", "3000"));
            int connectionRequestTimeout = Integer.parseInt(System.getProperty("connectionRequestTimeout", "3000"));
    
            Map<String, Object> connConfig = new HashMap<>();
            connConfig.put("proxyHost", proxyHost);
            connConfig.put("proxyPort", proxyPort);
            connConfig.put("maxTotal", maxTotal);
            connConfig.put("defaultMaxPerRoute", defaultMaxPerRoute);
            connConfig.put("connectTimeout", connectTimeout);
            connConfig.put("socketTimeout", socketTimeout);
            connConfig.put("connectionRequestTimeout", connectionRequestTimeout);
    
            try {
    
                MyRequest syncLog = new MyRequest(taskQueueSize, executorCount, requestURL, connConfig);
    
                Configuration conf = new Configuration();
                FileSystem fs = FileSystem.get(conf);
                Path path = new Path(logPath);
                BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
    
                try {
    
                    String line;
                    int size = 0;
                    long startTs = System.currentTimeMillis();
                    line = br.readLine();
    
                    while (line != null) {
                        JsonElement root = new JsonParser().parse(line);
                        Map<String, String> parameters = new HashMap<>();
                        JsonObject rootJson = root.getAsJsonObject();
    
                        for (Map.Entry<String, JsonElement> entry : rootJson.entrySet()) {
                            parameters.put(entry.getKey(), entry.getValue().getAsString());
                        }
    
                        syncLog.addTask(parameters);
                        ++size;
    
                        if (size >= taskQueueSize || (System.currentTimeMillis() - startTs) > interval) {
                            syncLog.flush();
                            size = 0;
                            startTs = System.currentTimeMillis();
                        }
    
                        line = br.readLine();
                    }
    
                    if (0 != size) {
                        syncLog.flush();
                    }
    
                } catch (IOException e) {
    
                    LOGGER.error("xxx" + e.getMessage());
    
                } finally {
    
                    br.close();
    
                }
    
                syncLog.shutdown();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }
    
    2 replies    2017-09-11 07:56:27 +08:00

    1 · hand515 · 2017-09-11 07:49:47 +08:00
    The resp.close() inside doRequest is redundant, isn't it? You already close it in finally.
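
One way to avoid the duplicated close is try-with-resources, which closes the resource exactly once when the block exits, whether normally or through an exception. The sketch below is only an illustration: `FakeResponse` and `CloseOnceDemo` are hypothetical stand-ins so the example runs without HttpClient on the classpath. In the original code the resource would be the `CloseableHttpResponse` returned by `httpClient.execute(get)`, which implements `Closeable` and can be used in the same way.

```java
import java.io.Closeable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for CloseableHttpResponse; it only counts close() calls.
class FakeResponse implements Closeable {
    static final AtomicInteger CLOSE_CALLS = new AtomicInteger();
    private final int status;

    FakeResponse(int status) {
        this.status = status;
    }

    int getStatusCode() {
        return status;
    }

    @Override
    public void close() {
        CLOSE_CALLS.incrementAndGet();
    }
}

class CloseOnceDemo {
    // Returns true when the (fake) request came back with status 200.
    static boolean doRequest(int statusToReturn) {
        try (FakeResponse resp = new FakeResponse(statusToReturn)) {
            return resp.getStatusCode() == 200;
        } // resp.close() is invoked here once; no explicit close or finally block is needed
    }

    public static void main(String[] args) {
        System.out.println(doRequest(200));
        System.out.println(doRequest(500));
        System.out.println("close() calls: " + FakeResponse.CLOSE_CALLS.get());
    }
}
```

With this shape there is no explicit `resp.close()` left in the method body, so there is nothing to duplicate.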
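
    2 · hand515 · 2017-09-11 07:56:27 +08:00   ❤️ 1
    Also, the way you use ArrayBlockingQueue is off; used like this, you would be better off with a plain List and submitting in batches.
    If you do want a Queue, just start N threads that act as consumer threads.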
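
The consumer-thread variant suggested above could look roughly like the following. This is a minimal sketch under assumptions, not the original poster's code: the class name `QueueConsumers`, the `POISON` sentinel used for shutdown, and the `handle` placeholder (where the real `doRequest` call would go) are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class QueueConsumers {
    // Sentinel that tells a consumer to stop; compared by identity only.
    private static final Map<String, String> POISON = new HashMap<>();

    private final BlockingQueue<Map<String, String>> tasks;
    private final List<Thread> consumers = new ArrayList<>();
    private final int consumerCount;
    final AtomicInteger handled = new AtomicInteger();

    QueueConsumers(int queueSize, int consumerCount) {
        this.tasks = new ArrayBlockingQueue<>(queueSize);
        this.consumerCount = consumerCount;
    }

    void start() {
        for (int i = 0; i < consumerCount; i++) {
            Thread t = new Thread(this::consumeLoop, "consumer-" + i);
            consumers.add(t);
            t.start();
        }
    }

    private void consumeLoop() {
        try {
            while (true) {
                Map<String, String> params = tasks.take(); // blocks while the queue is empty
                if (params == POISON) {
                    return;
                }
                handle(params);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Placeholder for the real HTTP call (doRequest in the original post).
    private void handle(Map<String, String> params) {
        handled.incrementAndGet();
    }

    void addTask(Map<String, String> params) throws InterruptedException {
        tasks.put(params); // blocks while the queue is full, slowing the producer down
    }

    void shutdownAndWait() throws InterruptedException {
        for (int i = 0; i < consumers.size(); i++) {
            tasks.put(POISON); // one sentinel per consumer, queued after all real tasks
        }
        for (Thread t : consumers) {
            t.join();
        }
    }

    // Feeds taskCount dummy tasks through consumerCount threads; returns how many were handled.
    static int run(int taskCount, int consumerCount) {
        QueueConsumers qc = new QueueConsumers(16, consumerCount);
        qc.start();
        try {
            for (int i = 0; i < taskCount; i++) {
                Map<String, String> params = new HashMap<>();
                params.put("bar", "value-" + i);
                qc.addTask(params);
            }
            qc.shutdownAndWait();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return qc.handled.get();
    }

    public static void main(String[] args) {
        System.out.println("handled: " + run(100, 4));
    }
}
```

Compared with the original `flush()` approach, the producer never has to drain and clear the queue itself: `put` and `take` block as needed, so the bounded queue doubles as back-pressure on the reader loop.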