没写过 Java,心里没底。我参考了网上的资料,觉得应该没问题,自己试了一下也没发现异常。需求是并发地发送一批请求,并用到了连接池;请问我这样写是线程安全的吗?
public class MyRequest {
private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MyRequest.class);
private final String REQUEST_URL;
private ExecutorService executorService;
private Queue<Map<String, String>> tasks;
private static PoolingHttpClientConnectionManager cm; //<-----
private static CloseableHttpClient httpClient;
private MyRequest(int taskQueueSize, int executorCount, String requestURL, Map<String, Object> connConfig) {
this.tasks = new ArrayBlockingQueue<>(taskQueueSize);
this.executorService = Executors.newFixedThreadPool(executorCount);
REQUEST_URL = requestURL;
String proxyHost = connConfig.get("proxyHost").toString();
int proxyPort = Integer.parseInt(connConfig.get("proxyPort").toString());
cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(Integer.parseInt(connConfig.get("maxTotal").toString()));
cm.setDefaultMaxPerRoute(Integer.parseInt(connConfig.get("defaultMaxPerRoute").toString()));
RequestConfig config = RequestConfig.custom()
.setConnectTimeout(Integer.parseInt(connConfig.get("connectTimeout").toString()))
.setSocketTimeout(Integer.parseInt(connConfig.get("socketTimeout").toString()))
.setConnectionRequestTimeout(Integer.parseInt(connConfig.get("cxxRxxTxout").toString()))
.build();
HttpClientBuilder httpClientBuilder = HttpClients.custom()
.setConnectionManager(cm)
.setDefaultRequestConfig(config);
if (!proxyHost.equals("") && 0 != proxyPort) {
httpClient = httpClientBuilder.setProxy(new HttpHost(proxyHost, proxyPort)).build();
} else {
httpClient = httpClientBuilder.build();
}
}
private void addTask(Map<String, String> parameters) {
tasks.offer(parameters);
}
private void flush() {
List<Future> futures = this.tasks.stream()
.map(this::delegate)
.collect(Collectors.toList());
futures.forEach((f) -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
this.tasks.clear();
}
private Future delegate(Map<String, String> parameters) {
return this.executorService.submit(() -> {
doRequest(parameters, REQUEST_URL);
});
}
private void doRequest(Map<String, String> parameters, String url) {
CloseableHttpResponse resp = null;
HttpGet get = null;
try {
URIBuilder builder = new URIBuilder(url);
builder.addParameter("foo", parameters.get("bar"));
get = new HttpGet(builder.build()); //<-----
resp = httpClient.execute(get); //<-----
if (resp.getStatusLine().getStatusCode() != 200) {
LOGGER.warn("xxx");
} else {
LOGGER.info("xxx");
}
resp.close();
} catch (URISyntaxException e) {
LOGGER.warn("xxx" + e.getMessage());
} catch (ClientProtocolException e) {
LOGGER.warn("xxx" + e.getMessage());
} catch (IOException e) {
LOGGER.warn("xxx" + e.getMessage());
} finally {
if (resp != null) {
try {
resp.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void shutdown() {
this.executorService.shutdown();
}
public static void main(String args[]) throws IOException {
String logPath = System.getProperty("readLogPath");
String requestURL = System.getProperty("requestURL");
int taskQueueSize = Integer.valueOf(System.getProperty("max.requests", "2000"));
int executorCount = Integer.valueOf(System.getProperty("num.executors", "100"));
int interval = Integer.valueOf(System.getProperty("max.interval", "500"));
String proxyHost = System.getProperty("proxyHost", "");
int proxyPort = Integer.parseInt(System.getProperty("proxyInt", "0"));
int maxTotal = Integer.parseInt(System.getProperty("maxTotal", "5000"));
int defaultMaxPerRoute = Integer.parseInt(System.getProperty("defaultMaxPerRoute", "1000"));
int connectTimeout = Integer.parseInt(System.getProperty("connectTimeout", "1000"));
int socketTimeout = Integer.parseInt(System.getProperty("socketTimeout", "3000"));
int connectionRequestTimeout = Integer.parseInt(System.getProperty("connectionRequestTimeout", "3000"));
Map<String, Object> connConfig = new HashMap<>();
connConfig.put("proxyHost", proxyHost);
connConfig.put("proxyPort", proxyPort);
connConfig.put("maxTotal", maxTotal);
connConfig.put("defaultMaxPerRoute", defaultMaxPerRoute);
connConfig.put("connectTimeout", connectTimeout);
connConfig.put("socketTimeout", socketTimeout);
connConfig.put("connectionRequestTimeout", connectionRequestTimeout);
try {
MyRequest syncLog = new MyRequest(taskQueueSize, executorCount, requestURL, connConfig);
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path path = new Path(logPath);
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(path)));
try {
String line;
int size = 0;
long startTs = System.currentTimeMillis();
line = br.readLine();
while (line != null) {
JsonElement root = new JsonParser().parse(line);
Map<String, String> parameters = new HashMap<>();
JsonObject rootJson = root.getAsJsonObject();
for (Map.Entry entry : rootJson.entrySet()) {
parameters.put(entry.getKey().toString(),
rootJson.get(entry.getKey().toString()).getAsString());
}
syncLog.addTask(parameters);
++size;
if (size >= taskQueueSize || (System.currentTimeMillis() - startTs) > interval) {
syncLog.flush();
size = 0;
startTs = System.currentTimeMillis();
}
line = br.readLine();
}
if (0 != size) {
syncLog.flush();
}
} catch (IOException e) {
LOGGER.error("xxx" + e.getMessage());
} finally {
br.close();
}
syncLog.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}