Java 线程池中worker(线程池是复用线程的)
Java 线程池中worker
在java线程中,真正执行计算操作的内容是在一个worker类中。
Worker的主要代码:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { /** * This class will never be serialized, but we provide a * serialVersionUID to suppress a javac warning. */ private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks; /** * Creates with given first task and thread from ThreadFactory. * @param firstTask the first task (null if none) */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } ......
总体来看,worker其实就是一个Runable,其也是需要构造成一个Thread对象,然后调用Thread start方法运行的。只不过在worker的run方法中是定一个了一个runWoker的方法。这个方法的主要内容从 for 循环的不停的从task队列中获取对应的runable的task,然后同步调用这个task的run()方法。其实就是在某个线程中,不停的拿队列中的任务进行执行。
运行worker
可以看到构造方法内,有一个Thread对象,其使用了ThreadFactory构造了一个新的线程,并且线程的runable是worker本身。
this.thread = getThreadFactory().newThread(this);
所以需要执行worker的时候,只需要调用worker中的thread的start方法即可,并且可以调用thread的方法来控制worker的状态:
interrupt等。
worker和ThreadPool
在ThreadPool中是有一个workder集合的。通过这个集合,我们可以知道有多少worker线程在进行工作等,每一个worker都是各自进行工作,工作的内容就是不停的获取task,然后执行task即可。
worker中的异常处理
try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); }
通过源代码可以看出,对应的Exception都是保存在thrownn中,在finally中交给了 afterExecute进行了处理。 所以可以自己实现对应的afterExecute来进行处理系统内部发生的异常问题。
上一篇:
通过多线程提高代码的执行效率例子
下一篇:
Springboot中数据库连接