本文共 5336 字,大约阅读时间需要 17 分钟。
如果想开发服务器应用,应该有大的吞吐量和快速的响应。 这样就要求服务器段有清晰的任务边界和任务执行策略。
现在看一个服务器应用:
public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(8080); while (true){ Socket s = ss.accept(); handleRequest(s); } }顺序执行的,造成资源利用率低, 吞吐量或响应速度都很低。
来一个多线程的:
public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(8080); while (true){ final Socket s = ss.accept(); new Thread(){ @Override public void run() { handleRequest(s); } }.start(); } }会有吞吐量,响应速度上的好处。
但是因为程序并发,可能要注意资源冲突。
另外其还会造成线程无线增长带来的栈溢出,内存溢出, 创建线程的开销过大等问题。
更好的办法是,能够平缓的进行优化,就是能够更好的调度任务,控制线程的个数等等。这就需要用到Executor框架
先来个固定线程数的:
public static void main(String[] args) throws IOException { final Executor exec = Executors.newFixedThreadPool(3); ServerSocket ss = new ServerSocket(8080); while (true){ final Socket s = ss.accept(); Runnable task = new Runnable(){ @Override public void run() { handleRequest(s); } }; exec.execute(task); } }Exccutors.可以创建多个策略的执行器。 newFixedThreadPool是可以运行指定书目的执行器。
这个是一个生产者和消费者模式, exec.execute是生产, FixedThreadPool表示有空闲的线程就能消费这个task.
Executor是用一个策略来执行任务,那么策略应该决定线程的那些行为呢:
ExecutorService。 这个接口包含了很多生命周期的方法:
shutdown() 停止接收新任务, 等待已提交任务的完成, 再次想加入新任务会抛出RejectedExecutionException
shutdownNom 停止为执行的任务,并且尝试关闭正在运行的任务
awaitTermination等待到达终止状态,跟轮询判断isTrminated的效果一样。通常shutdown会紧随其后,进入种植状态后就关闭Executor
isShutdown 判断是否关闭
isTermination 判断是否进入终止状态:
例如可关闭的Server:
public class WebServer { private static final ExecutorService exec = Executors.newFixedThreadPool(1); public static void main(String[] args) throws IOException { ServerSocket ss = new ServerSocket(8080); new Thread(){ @Override public void run() { while(!exec.isTerminated()){ } System.exit(0); } }.start(); while (true){ final Socket s = ss.accept(); Runnable task = new Runnable(){ @Override public void run() { handleRequest(s); } }; exec.execute(task); } } private static void handleRequest(Socket s) { BufferedReader is = null; try { is = new BufferedReader(new InputStreamReader(s.getInputStream())); String line = is.readLine(); if("exit".equals(line)){ System.out.println("进来了"); exec.shutdown(); } System.out.println(line); } catch (IOException e) { e.printStackTrace(); } finally { try { is.close(); s.close(); } catch (IOException e) { e.printStackTrace(); } } }}
public class TestCallable { public static void main(String[] args) { List> futureList = new ArrayList >(); ExecutorService executor = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { futureList.add(executor.submit(new AddCallable(i, i + 1))); } for (Future future : futureList) { try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }}class AddCallable implements Callable { private int param1; private int param2; public AddCallable(int param1, int param2) { this.param1 = param1; this.param2 = param2; } @Override public Integer call() throws Exception { return param1 + param2; }}
public static void main(String[] args) throws InterruptedException, ExecutionException { ExecutorService executor = Executors.newFixedThreadPool(3); CompletionServiceserv = new ExecutorCompletionService (executor); serv.submit(new Callable () { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(3); return "第一个任务执行完"; } }); serv.submit(new Callable () { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(1); return "第二个任务执行完"; } }); for (int i = 0; i < 2; i++) { System.out.println(serv.take().get()); } executor.shutdown(); }
转载地址:http://jisno.baihongyu.com/