博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
并发编程6-执行器
阅读量:6582 次
发布时间:2019-06-24

本文共 5336 字,大约阅读时间需要 17 分钟。

如果想开发服务器应用,应该有大的吞吐量和快速的响应。  这样就要求服务器段有清晰的任务边界和任务执行策略。

现在看一个服务器应用:

public static void main(String[] args) throws IOException {        ServerSocket ss = new ServerSocket(8080);        while (true){            Socket s = ss.accept();            handleRequest(s);        }    }
顺序执行的,造成资源利用率低,  吞吐量或响应速度都很低。

来一个多线程的:

public static void main(String[] args) throws IOException {        ServerSocket ss = new ServerSocket(8080);        while (true){            final Socket s = ss.accept();            new Thread(){                @Override                public void run() {                    handleRequest(s);                }            }.start();        }    }
会有吞吐量,响应速度上的好处。

但是因为程序并发,可能要注意资源冲突。

另外其还会造成线程无线增长带来的栈溢出,内存溢出, 创建线程的开销过大等问题。

更好的办法是,能够平缓的进行优化,就是能够更好的调度任务,控制线程的个数等等。这就需要用到Executor框架

先来个固定线程数的:

public static void main(String[] args) throws IOException {        final Executor exec = Executors.newFixedThreadPool(3);        ServerSocket ss = new ServerSocket(8080);        while (true){            final Socket s = ss.accept();            Runnable task = new Runnable(){                @Override                public void run() {                    handleRequest(s);                }            };            exec.execute(task);        }    }
Exccutors.可以创建多个策略的执行器。 newFixedThreadPool是可以运行指定书目的执行器。

这个是一个生产者和消费者模式,  exec.execute是生产,  FixedThreadPool表示有空闲的线程就能消费这个task.

任务策略

Executor是用一个策略来执行任务,那么策略应该决定线程的那些行为呢:

  • 任务在什么线程中执行
  • 任务以什么顺序执行(FIFO, 优先级?)
  • 可以有多少个任务并发执行
  • 可以有多少个任务进入等待队列
  • 如果要舍弃一个任务,如何选择,并且如何告诉应用程序
  • 在执行任务前后应该做什么
我们需要使用和调整各种策略,来达到最优的效果
这些策略基本上都是基于线程池的。
线程池与任务队列紧密相连。
当线程池中有空闲线程时会从队列中拿一个任务进行执行。

生命周期

ExecutorService。 这个接口包含了很多生命周期的方法:

shutdown()  停止接收新任务,  等待已提交任务的完成,  再次想加入新任务会抛出RejectedExecutionException

shutdownNom  停止为执行的任务,并且尝试关闭正在运行的任务

awaitTermination等待到达终止状态,跟轮询判断isTrminated的效果一样。通常shutdown会紧随其后,进入种植状态后就关闭Executor

isShutdown 判断是否关闭

isTermination 判断是否进入终止状态:

例如可关闭的Server:

public class WebServer {    private static final ExecutorService exec = Executors.newFixedThreadPool(1);    public static void main(String[] args) throws IOException {        ServerSocket ss = new ServerSocket(8080);        new Thread(){            @Override            public void run() {                while(!exec.isTerminated()){                }                System.exit(0);            }        }.start();        while (true){            final Socket s = ss.accept();            Runnable task = new Runnable(){                @Override                public void run() {                    handleRequest(s);                }            };            exec.execute(task);        }    }    private static void handleRequest(Socket s) {        BufferedReader is = null;        try {            is = new BufferedReader(new InputStreamReader(s.getInputStream()));            String line = is.readLine();            if("exit".equals(line)){                System.out.println("进来了");                exec.shutdown();            }            System.out.println(line);        } catch (IOException e) {            e.printStackTrace();        } finally {            try {                is.close();                s.close();            } catch (IOException e) {                e.printStackTrace();            }        }    }}

定时执行

Timer 和TimerTask
使用简单,但是是点线程调度,并且任务如果抛出异常,会影响其他任务
ScheduleThreadPoolExecutor 1.5之后更好的调度执行器

可携带结果的任务Callable 和 Future

public class TestCallable {    public static void main(String[] args) {        List
> futureList = new ArrayList
>(); ExecutorService executor = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; i++) { futureList.add(executor.submit(new AddCallable(i, i + 1))); } for (Future
future : futureList) { try { System.out.println(future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }}class AddCallable implements Callable
{ private int param1; private int param2; public AddCallable(int param1, int param2) { this.param1 = param1; this.param2 = param2; } @Override public Integer call() throws Exception { return param1 + param2; }}
ExecutorService submit Callable进去。 返回Future, get()会阻塞到执行结果或者抛出异常, 如果抛出ExecutionException则可以使用getCause得到异常链。
如果第一个任务比第二个任务执行的时间长,则可能造成不能及时获取到返回结果。可以使用get(0)不停地试探,但是可以使用更好的方式使用自带阻塞队列的ExecutorCompletionService中

public static void main(String[] args) throws InterruptedException, ExecutionException {        ExecutorService executor = Executors.newFixedThreadPool(3);        CompletionService
serv = new ExecutorCompletionService
(executor); serv.submit(new Callable
() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(3); return "第一个任务执行完"; } }); serv.submit(new Callable
() { @Override public String call() throws Exception { TimeUnit.SECONDS.sleep(1); return "第二个任务执行完"; } }); for (int i = 0; i < 2; i++) { System.out.println(serv.take().get()); } executor.shutdown(); }

转载地址:http://jisno.baihongyu.com/

你可能感兴趣的文章
Swift 实现图片转字符画的功能
查看>>
PHP开发之Zend Studio快捷键汇总
查看>>
李洪强iOS开发之OC[002] - OC中注释以及@符号的使用
查看>>
【转载】COM编程入门不得不看的文章 :第一部分 什么是COM,如何使用COM
查看>>
bug-android之ActivityNotFoundException
查看>>
IT战略规划咨询
查看>>
JavaScript 最佳实践
查看>>
NOIP2013花匠
查看>>
告别脚本小子【编写端口扫描工具】
查看>>
监控录像
查看>>
HtmlHelper使用大全
查看>>
SQLServer 之 聚合函数
查看>>
正则输入非0的整数
查看>>
TortoiseGit与GitHub项目关联设置
查看>>
java模式:深入单例模式
查看>>
Struts2的模板和主题theme及自定义theme的使用
查看>>
ImageView显示图像控件
查看>>
Deepin-文件目录介绍
查看>>
MySQL数据库如何去掉数据库中重复记录
查看>>
【原创】如何写一篇“用户友好”的随笔
查看>>