`
coach
  • 浏览: 382474 次
  • 性别: Icon_minigender_2
  • 来自: 印度
社区版块
存档分类
最新评论

Java5中的线程池实例讲解

阅读更多
Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。

  简介

  本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下:

  1. 建立监听端口。

  2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。

  这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下:

  1. 建立监听端口,创建线程池。

  2. 发现有新连接,使用线程池来执行服务任务。

  3. 服务完毕,释放线程到线程池。

  下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。

  初始化

  初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。

ExecutorService pool = Executors.newFixedThreadPool(10);

  表示新建了一个线程池,线程池里面有10个线程为任务队列服务。

  使用ServerSocket对象来初始化监听端口。

private static final int PORT = 19527;
serverListenSocket = new ServerSocket(PORT);
serverListenSocket.setReuseAddress(true);
serverListenSocket.setReuseAddress(true);

  服务新连接

  当有新连接建立时,accept返回时,将服务任务提交给线程池执行。

while(true){
 Socket socket = serverListenSocket.accept();
 pool.execute(new ServiceThread(socket));
}

  这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。

  服务任务

  服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码:

private static ReentrantLock lock = new ReentrantLock ();
private static int count = 0;
private int getCount(){
 int ret = 0;
 try{
  lock.lock();
  ret = count;
 }finally{
  lock.unlock();
 }
 return ret;
}
private void increaseCount(){
 try{
  lock.lock();
  ++count;
 }finally{
  lock.unlock();
 }
}

  服务线程在开始给客户端打印一个欢迎信息,

increaseCount();
int curCount = getCount();
helloString = "hello, id = " + curCount+"\r\n";
dos = new DataOutputStream(connectedSocket.getOutputStream());
dos.write(helloString.getBytes());

  然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。

ExecutorService executor = Executors.newSingleThreadExecutor();
Future future = executor.submit(new TimeConsumingTask());
dos.write("let's do soemthing other".getBytes());
String result = future.get();
dos.write(result.getBytes());

  其中TimeConsumingTask实现了Callable接口

class TimeConsumingTask implements Callable {
 public String call() throws Exception {
  System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
  return "ok, here's the result: It takes me lots of time to produce this result";
 }
}

  这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let's do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。

服务器端的完整实现

  服务器端的完整实现代码如下:
package demo;

import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class Server
{
    private static int produceTaskSleepTime = 100;
    private static int consumeTaskSleepTime = 1200;
    private static int produceTaskMaxNumber = 100;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 100;
    private static final int KEEPALIVE_TIME = 3;
    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 19527;
    private BlockingQueue workQueue = new ArrayBlockingQueue(QUEUE_CAPACITY);
    // private ThreadPoolExecutor serverThreadPool = null;
    private ExecutorService pool = null;
    private RejectedExecutionHandler rejectedExecutionHandler = new ThreadPoolExecutor.DiscardOldestPolicy();
    private ServerSocket serverListenSocket = null;
    private int times = 5;

    public void start()
    {
        // You can also init thread pool in this way.
        /*
         * serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue, rejectedExecutionHandler);
         */
        pool = Executors.newFixedThreadPool(10);
        try
        {
            serverListenSocket = new ServerSocket(PORT);
            serverListenSocket.setReuseAddress(true);

            System.out.println("I'm listening");
            while (times-- > 0)
            {
                Socket socket = serverListenSocket.accept();
                String welcomeString = "hello";
                // serverThreadPool.execute(new ServiceThread(socket, welcomeString));
                pool.execute(new ServiceThread(socket));
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        cleanup();
    }

    public void cleanup()
    {
        if (null != serverListenSocket)
        {
            try
            {
                serverListenSocket.close();
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // serverThreadPool.shutdown();
        pool.shutdown();
       //调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。interrupt():只有阻塞(sleep,wait,join的线程调用他们的interrupt()才起作用,正在运行的线程不起作用也不抛异常)
    }

    public static void main(String args[])
    {
        Server server = new Server();
        server.start();
    }
}

class ServiceThread implements Runnable, Serializable
{
    private static final long serialVersionUID = 0;
    private Socket connectedSocket = null;
    private String helloString = null;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();

    ServiceThread(Socket socket)
    {
        connectedSocket = socket;
    }

    public void run()
    {
        increaseCount();
        int curCount = getCount();
        helloString = "hello, id = " + curCount + "\r\n";

        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(new TimeConsumingTask());

        DataOutputStream dos = null;
        try
        {
            dos = new DataOutputStream(connectedSocket.getOutputStream());
            dos.write(helloString.getBytes());
            try
            {
                dos.write("let's do soemthing other.\r\n".getBytes());
                String result = future.get();
                dos.write(result.getBytes());
            }
            catch (InterruptedException e)
            {
                e.printStackTrace();
            }
            catch (ExecutionException e)
            {
                e.printStackTrace();
            }
        }
        catch (IOException e)
        {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        finally
        {
            if (null != connectedSocket)
            {
                try
                {
                    connectedSocket.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (null != dos)
            {
                try
                {
                    dos.close();
                }
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            executor.shutdown();
        }
    }

    private int getCount()
    {
        int ret = 0;
        try
        {
            lock.lock();
            ret = count;
        }
        finally
        {
            lock.unlock();
        }
        return ret;
    }

    private void increaseCount()
    {
        try
        {
            lock.lock();
            ++count;
        }
        finally
        {
            lock.unlock();
        }
    }
}

class TimeConsumingTask implements Callable<String>
{
    public String call() throws Exception
    {
        System.out.println("It's a time-consuming task, you'd better retrieve your result in the furture");
        return "ok, here's the result: It takes me lots of time to produce this result";
    }

}
分享到:
评论
2 楼 memoryisking 2014-11-18  
可以看看这篇文章,这个是struts教程网上一个简单的例子,构建一个简单的线程池:http://www.strutshome.com/index.php/archives/710
1 楼 jjruanlili 2014-08-26  
要搞个executor和nio的结合,差不多

相关推荐

    java线程池实例详细讲解

    线程池详细讲解的很好实例!

    Java线程池文档

    Reference: 《创建Java线程池》[1],《Java线程:新特征-线程池》[2], 《Java线程池学习》[3],《线程池ThreadPoolExecutor使用简介》[4],《Java5中的线程池实例讲解》[5],《ThreadPoolExecutor使用和思考》[6] ...

    ActiveMQ与Spring线程池整合实例

    对于实例的讲解,在竹子的论坛有我对这个实例的帖子(http://www.java2000.net/viewthread.jsp?tid=1167) lib中包含: apache-activemq-4.1.1.jar backport-util-concurrent-2.1.jar commons-lang-2.0.jar commons...

    Java Socket编程实例(三)- TCP服务端线程池

    主要讲解Java Socket编程中TCP服务端线程池的实例,希望能给大家做一个参考。

    java+socket 及多线程线程池应用(IBM教程)

    网页式教程,学习方便,大量经典实例讲解。IBM官方教程,可遇不可求哦。

    《Java并发编程的艺术》

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    Java并发编程的艺术_非扫描

    第5章介绍Java并发包中与锁相关的API和组件,以及这些API和组件的使用方式与实现细节。第6章介绍了Java中的大部分并发容器,并深入剖析其实现原理,让读者领略大师的设计技巧。第7章介绍了Java中的原子操作类,并给...

    Java并发编程的艺术

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    Java SE实践教程 pdf格式电子书 下载(四) 更新

    内容简介:此书结合具体实例讲解,通俗易懂,又不乏深度。我觉得这本书写的确实不错,堪称经典,市面上这样的书实在太少了,所以在这里发布下,供大家共享。本书从编程技术、项目实践以及软件工程的角度出发,如果...

    Java SE实践教程 源代码 下载

    内容简介:此书结合具体实例讲解,通俗易懂,又不乏深度。我觉得这本书写的确实不错,堪称经典,市面上这样的书实在太少了,所以在这里发布下,供大家共享。本书从编程技术、项目实践以及软件工程的角度出发,如果...

    ArtConcurrentBook.rar

    《Java并发编程的艺术》内容涵盖Java并发编程机制的底层实现原理、Java内存模型、Java并发编程基础、Java中的锁、并发容器和框架、原子类、并发工具类、线程池、Executor框架等主题,每个主题都做了深入的讲解,同时...

    Java SE实践教程 pdf格式电子书 下载(一) 更新

    内容简介:此书结合具体实例讲解,通俗易懂,又不乏深度。我觉得这本书写的确实不错,堪称经典,市面上这样的书实在太少了,所以在这里发布下,供大家共享。本书从编程技术、项目实践以及软件工程的角度出发,如果...

    精通Spring(书签版)

     看清Java万花的本质,从复杂的表象中寻找普遍的规律,深刻理解Java的核心思想。  关于Spring2.5的权威教程,是Java/Java EE开发者、架构师必备的参考书  对JavaEE5及Spring2.5进行了综述。包括Java EE5,步入...

    精通Spring(书签)

     看清Java万花的本质,从复杂的表象中寻找普遍的规律,深刻理解Java的核心思想。  关于Spring2.5的权威教程,是Java/Java EE开发者、架构师必备的参考书  对JavaEE5及Spring2.5进行了综述。包括Java EE5,步入...

    精通Spring (书签版)

     看清Java万花的本质,从复杂的表象中寻找普遍的规律,深刻理解Java的核心思想。  关于Spring2.5的权威教程,是Java/Java EE开发者、架构师必备的参考书  对JavaEE5及Spring2.5进行了综述。包括Java EE5,步入...

    精通spring--源代码

    精通spring 源代码 对JavaEE5及Spring2.5进行了综述。包括Java EE5,步入Spring2.5,获得... 全书理论与实践并重,通过大量的实例帮助读者尽快掌握Spring2,5的各种基本和高级使用技巧,从而提高本书的参考和阅读价值

    通俗易懂的Netty从入门到源码剖析教程

    本套课程详细讲解了Netty核心技术点,同时进行底层机制和源码剖析,并编写了大量的应用实例。通过学习可以快速掌握Netty的底层实现机制,熟练运用Netty解决网络高并发问题。Netty涉及内容很多(比如:设计模式、数据...

Global site tag (gtag.js) - Google Analytics