一、多线程

1、什么是多线程

1.1 多线程的概念

多线程是指在一个程序中同时执行多个线程,每个线程都可以独立执行,各自完成自己的任务。
多线程的实现可以提高程序的性能和响应速度,尤其是在需要同时执行多个耗时的任务时。在多线程中,不同的线程可以访问相同的内存空间,从而可以共享数据和资源,但也需要注意线程安全的问题。
多线程编程需要考虑线程的创建、启动、停止、同步等问题,同时也需要避免线程死锁、饥饿等问题

1.2 为什么需要使用多线程

使用多线程可以提高程序的性能和响应速度,尤其是在需要同时执行多个耗时的任务时。
多线程可以让程序在等待IO操作完成时,继续执行其他的计算任务,从而提高了程序的效率。
此外,多线程还可以实现并发编程,从而使得程序更具有可扩展性和灵活性,同时能够更好地利用多核CPU的计算资源。

1.3 多线程与单线程的比较

多线程和单线程的主要区别在于是否允许同时执行多个任务。
单线程程序只有一个执行流程,每个任务必须按照先后顺序逐一执行,无法利用计算机的多核处理器资源,也会导致程序的响应速度较慢。
而多线程程序可以同时执行多个任务,从而能够更好地利用计算机的处理能力,提高程序的性能和响应速度。

public class SingleThreadVsMultiThread {    public static void main(String[] args) {        // 单线程程序        long start = System.currentTimeMillis(); // 记录程序开始时间        for (int i = 0; i < 10; i++) { // 模拟耗时操作            doSomething();        }        long end = System.currentTimeMillis(); // 记录程序结束时间        System.out.println("Single thread takes " + (end - start) + " ms."); // 输出程序执行时间        // 多线程程序        start = System.currentTimeMillis(); // 记录程序开始时间        Thread[] threads = new Thread[10]; // 创建10个线程        for (int i = 0; i < 10; i++) {            threads[i] = new Thread(() -> {                doSomething();            });            threads[i].start(); // 启动线程        }        for (int i = 0; i < 10; i++) {            try {                threads[i].join(); // 等待线程执行完成            } catch (InterruptedException e) {                e.printStackTrace();            }        }        end = System.currentTimeMillis(); // 记录程序结束时间        System.out.println("Multi thread takes " + (end - start) + " ms."); // 输出程序执行时间    }    private static void doSomething() {        try {            Thread.sleep(1000); // 模拟耗时操作        } catch (InterruptedException e) {            e.printStackTrace();        }    }}

2、多线程的实现方式

2.1 创建线程

import java.util.concurrent.Callable;import java.util.concurrent.ExecutionException;import java.util.concurrent.FutureTask;// 继承 Thread 类创建多线程public class MyThread extends Thread {    public void run() { // 重写 run 方法        for (int i = 0; i < 5; i++) {            System.out.println("MyThread is running.");            try {                Thread.sleep(1000); // 线程休眠1秒钟            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}// 实现 Runnable 接口创建多线程public class MyRunnable implements Runnable {    public void run() { // 实现 run 方法        for (int i = 0; i < 5; i++) {            System.out.println("MyRunnable is running.");            try {                Thread.sleep(1000); // 线程休眠1秒钟            } catch (InterruptedException e) {                e.printStackTrace();            }        }    }}// 实现 Callable 接口创建多线程public class MyCallable implements Callable<Integer> {    public Integer call() throws Exception { // 实现 call 方法        int sum = 0;        for (int i = 1; i <= 100; i++) {            sum += i;        }        return sum;    }}// 创建多线程并启动public class Main {    public static void main(String[] args) {        // 使用 Thread 类创建多线程        MyThread thread1 = new MyThread();        thread1.start(); // 启动线程        // 使用 Runnable 接口创建多线程        MyRunnable runnable = new MyRunnable();        Thread thread2 = new Thread(runnable);        thread2.start(); // 启动线程        // 使用 Callable 接口创建多线程        MyCallable callable = new MyCallable();        FutureTask<Integer> futureTask = new FutureTask<>(callable);        Thread thread3 = new Thread(futureTask);        thread3.start(); // 启动线程        try {            int result = futureTask.get(); // 获取线程的返回结果            System.out.println("The result is: " + result);        } catch (InterruptedException | ExecutionException e) {            e.printStackTrace();        }    }}

2.2 启动线程

public class MyThread extends Thread {    @Override    public void run() {        System.out.println("Thread started.");    }}public class Main {    public static void main(String[] args) {        MyThread thread = new MyThread();        thread.start(); // 启动线程    }}public class Main {    public static void main(String[] args) {        Thread thread = new Thread(() -> {            System.out.println("Thread started.");        });        thread.start(); // 启动线程    }}

2.3 线程的状态

线程状态解释
新建(New)当线程对象被创建时,它处于新建状态,还未调用start()方法
运行(Runnable)线程进入运行状态,一旦调用了 start() 方法后,线程即进入了就绪状态,等待 CPU 分配时间片进入运行状态。
阻塞(Blocked)线程因为某些原因被暂停执行,进入阻塞状态,比如等待 I/O 操作、等待获取锁、调用 Thread.sleep() 方法,等待监视器锁等。
等待(Waiting)线程进入等待状态,等待其他线程的通知或中断,等待状态可以由 Object.wait()、Thread.join()、LockSupport.park() 方法进入。
计时等待(Timed Waiting)类似于等待状态,但是在一定时间后会自动解除阻塞,等待状态可以由 Thread.sleep()、Object.wait(long)、Thread.join(long)、LockSupport.parkNanos()、LockSupport.parkUntil() 方法进入。
终止(Terminated)线程执行完毕或者出现异常时,进入终止状态。

3、异步编程

3.1 异步编程的概念

在 Java 中,异步编程指的是以非阻塞的方式处理任务,从而提高程序的响应速度和资源利用率。Java 通过
Future、CompletableFuture、Callback 等机制实现异步编程,

例如 Future 可以通过 get() 方法阻塞等待任务执行结果,也可以通过 isDone()
判断任务是否完成;CompletableFuture 则支持链式调用、组合等操作,通过 thenApply()、thenAccept()
等方法实现任务之间的协作。

Java 8 引入的 Stream API 也可以用于处理集合或数组等数据结构的异步操作。

3.2 Stream API

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);numbers.parallelStream()    .map(n -> {        // 休眠1秒,模拟耗时        try {            Thread.sleep(1000);        } catch (InterruptedException e) {            e.printStackTrace();        }        return n * n;    }).forEach(System.out::println);

3.3 Java 8新引入的CompletableFuture

ompletableFuture 是 Java 8
中新增的一个类,它提供了一种简单而强大的异步编程机制,可以帮助我们更加方便地实现异步处理和链式调用。

在 CompletableFuture 中,我们可以通过调用
supplyAsync()、thenApply()、thenAccept()、thenRun()
等方法来创建一个异步任务,并在任务完成后自动回调指定的方法。 具体来说,supplyAsync()
方法会在一个新的线程中执行指定的任务,并返回一个 CompletableFuture 对象,该对象会在任务完成后自动回调
thenApply()、thenAccept() 或 thenRun() 等方法。

public class Main {    public static void main(String[] args) {        // 创建一个异步任务        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {            // 模拟耗时,以便先输出:"等待异步任务完成."            try {                Thread.sleep(1000);            } catch (InterruptedException e) {                e.printStackTrace();            }            return "Hello, CompletableFuture!";        });        // 在任务完成后自动回调指定的方法        future.thenAccept(result -> {            System.out.println(result);        });        // 输出一条消息,表示异步任务正在进行中        System.out.println("等待异步任务完成.");    }}

3.4 Spring注解 @Async、@EnableAsync

@Servicepublic class MyService {     @Async    public CompletableFuture<String> doSomething() {        // 这里是一个异步任务        return CompletableFuture.completedFuture("Hello K.Chan");    } }@Configuration@EnableAsyncpublic class AppConfig {     @Bean    public MyService myService() {        return new MyService();    }     // 配置一个线程池,用于执行异步任务    @Bean    public Executor asyncExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(10);        executor.setMaxPoolSize(100);        executor.setQueueCapacity(50);        executor.initialize();        return executor;    } }//myService.doSomething() 将立刻返回

3.5 异步编程的优缺点

Java 异步编程的优点包括:

提高程序性能:异步编程可以让程序在执行 I/O 操作等等耗时操作时不必等待,而是继续执行其他任务,从而提高程序的运行效率和吞吐量。

提高用户体验:异步编程可以避免用户因为等待操作完成而产生的卡顿和延迟,提高用户的体验和满意度。

改善资源利用率:异步编程可以让线程不必一直等待,从而释放线程资源,提高系统的资源利用率。

Java 异步编程的缺点包括:

复杂性增加:异步编程需要考虑并发和线程安全等问题,对程序员的编程能力要求更高。

可读性降低:异步编程会增加代码的复杂度和难度,降低代码的可读性和可维护性。

调试困难:异步编程存在异步执行、多线程交互等问题,增加了程序的调试难度。

二、线程池

1、什么是线程池

1.1 线程池的概念

线程池是一种用于管理和重用线程的机制。通常情况下,创建和销毁线程是一项非常消耗资源的操作,因此使用线程池可以避免频繁地创建和销毁线程,从而提高系统的性能。

线程池通常包含一组已经创建的线程,当需要执行任务时,可以从线程池中取出一个空闲线程来执行任务。当任务执行完毕后,该线程可以被重用,而不是被销毁。通过合理地管理线程池的大小,可以避免创建过多的线程,从而避免出现资源浪费和线程饥饿等问题。

1.2 为什么需要使用线程池

降低线程的创建和销毁频率,避免资源浪费和线程饥饿。

提高系统的性能和吞吐量。

可以对线程的执行进行统一的管理和调度,避免任务之间的相互影响。

可以提供更加灵活和可控的线程执行方式。

1.3 线程池与多线程的比较

创建和销毁线程:在多线程中,每次需要执行任务时,需要创建一个新的线程。当任务完成后,需要销毁该线程。而在线程池中,线程的创建和销毁都由线程池管理,线程可以被重复利用,从而避免了频繁的创建和销毁线程所带来的开销。

线程数量控制:在多线程中,线程数量由开发人员手动控制,如果线程数量过多,会导致系统资源的浪费,如果线程数量过少,则会导致任务等待时间过长。而线程池可以根据系统的负载情况自动调整线程数量,从而使系统资源的利用更加高效。

任务队列管理:线程池中的任务队列可以用来缓存未执行的任务,当线程池中的线程都在执行任务时,新的任务可以暂时存放在任务队列中,等待线程空闲时再进行执行。而在多线程中,需要自己手动管理任务队列,实现起来比较麻烦。

2、线程池的七大参数和执行过程

corePoolSize:线程池的核心线程数,即线程池中保留的最少线程数。

maximumPoolSize:线程池中允许的最大线程数。

keepAliveTime:当线程池中线程数量超过corePoolSize时,多余的空闲线程的存活时间。

unit:keepAliveTime参数的时间单位。

workQueue:线程池中的任务队列,用于保存等待执行的任务


threadFactory:创建线程的工厂类,可给工厂命名,用于跟踪和后期排查。

handler:当线程池中线程数量达到maximumPoolSize且workQueue已满时,用于处理新任务的策略。

DiscardOldestPolicy:丢弃任务队列中最早添加的任务,并尝试提交当前任务
CallerRunsPolicy:调用主线程执行被拒绝的任务,这提供了一种简单的反馈控制机制,将降低新任务的提交速度
DiscardPolicy:默默丢弃无法处理的任务,不予任何处理
AbortPolicy:直接抛出异常,阻止系统正常工作

//自定义拒绝策略:实现RejectedExecutionHandler 接口 重写rejectedExecution方法//例如:public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {    @Override    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {        // 自定义处理逻辑,将任务添加到队列中等待执行        if (executor.isShutdown()) {            throw new RejectedExecutionException("ThreadPoolExecutor has been shutdown");        }        executor.getQueue().offer(r);    }}

线程池执行过程

1.当有任务到达线程池时,线程池中的线程将会尝试去执行这个任务。
2.如果线程池中有空闲的线程,则会立即执行该任务。
3.如果线程池中没有空闲的线程,则会将任务加入到任务队列中等待执行。
4.当任务队列已满并且线程池中的线程数量小于最大线程数时,线程池将会创建新的线程执行该任务。
5.当线程池中的线程数量达到最大线程数时(此时任务队列也已满),如果仍然有任务到达,则线程池会根据拒绝策略进行处理。

3、线程池的实现方式

3.1 创建线程池

3.1.1上下级关系

1.使用Executors工具类中的静态方法创建线程池,如newFixedThreadPool、newCachedThreadPool、newSingleThreadExecutor等。2.通过ThreadPoolExecutor类手动创建线程池,可以自定义线程池的核心线程数、最大线程数、等待队列、拒绝策略等参数。import java.util.concurrent.*;public class ThreadPoolDemo {    public static void main(String[] args) {        // 创建一个固定大小的线程池,核心线程数为3,最大线程数为3        ExecutorService executorService = Executors.newFixedThreadPool(3);        for (int i = 1; i <= 10; i++) {            final int task = i;            // 提交任务到线程池中            executorService.execute(() -> {                System.out.println(Thread.currentThread().getName() + " 执行任务 " + task);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        // 关闭线程池        executorService.shutdown();    }}import java.util.concurrent.*;public class ThreadPoolDemo {    public static void main(String[] args) {        // 自定义线程池的参数        int corePoolSize = 3; // 核心线程数        int maxPoolSize = 5; // 最大线程数        long keepAliveTime = 10; // 线程池中非核心线程的闲置超时时间        TimeUnit unit = TimeUnit.SECONDS; // 时间单位        BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(2); // 等待队列 (推荐使用Linked 而不是 Array)        RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy(); // 拒绝策略        // 创建线程池        ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, unit,                workQueue, handler);        // 提交任务到线程池中        for (int i = 1; i <= 10; i++) {            final int task = i;            executor.execute(() -> {                System.out.println(Thread.currentThread().getName() + " 执行任务 " + task);                try {                    Thread.sleep(1000);                } catch (InterruptedException e) {                    e.printStackTrace();                }            });        }        // 关闭线程池        executor.shutdown();    }}3.其他常用线程池接口/*相比于直接使用ExecutorService执行任务,使用CompletionService可以在任意一个任务完成时立即获得其结果,而不需要等待所有任务完成后再收集结果,这样可以提高程序的响应速度和效率。*/import java.util.concurrent.*;public class CompletionServiceDemo {    public static void main(String[] args) throws InterruptedException, ExecutionException {        ExecutorService executor = Executors.newFixedThreadPool(5);        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executor);        for (int i = 1; i <= 10; i++) {            final int taskId = i;            completionService.submit(() -> {                System.out.println("Task " + taskId + " is running...");                Thread.sleep(1000);                System.out.println("Task " + taskId + " is done.");                return taskId;            });        }        for (int i = 1; i <= 10; i++) {            Future<Integer> future = completionService.take(); //阻塞            int result = future.get();            System.out.println("Task " + result + " is completed.");        }        executor.shutdown();    }}4.ForkJoinPool线程池ForkJoinPoolJava 7中加入的线程池实现,专门用于处理可分解的计算任务,充分利用了多核CPU的计算能力。其底层实现采用了"工作窃取"算法,将大任务分解成若干个小任务进行处理,同时采用了任务双端队列来优化任务分配和任务执行的效率。例如常见的求和import java.util.Arrays;import java.util.concurrent.ForkJoinPool;import java.util.concurrent.RecursiveTask;public class SumTask extends RecursiveTask<Integer> { //继承递归任务    private static final int THRESHOLD = 5; //阈值 大于5就fork 小于5就循环求和    private int[] array;    private int start;    private int end;    public SumTask(int[] array, int start, int end) {        this.array = array;        this.start = start;        this.end = end;    }    @Override    protected Integer compute() {        if (end - start <= THRESHOLD) {            int sum = 0;            for (int i = start; i < end; i++) {                sum += array[i];            }            return sum;        } else {            int mid = start + (end - start) / 2;//递归求和            SumTask left = new SumTask(array, start, mid);            SumTask right = new SumTask(array, mid, end);            left.fork();            int rightSum = right.compute();            int leftSum = left.join();            return leftSum + rightSum;        }    }    public static void main(String[] args) {        int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};        ForkJoinPool pool = new ForkJoinPool();        int sum = pool.invoke(new SumTask(array, 0, array.length));        System.out.println("The sum is: " + sum);    }}

3.2 线程池的状态与流程


3.3 添加任务并执行任务

import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Future;public class ThreadPoolExample {    public static void main(String[] args) {        // 创建一个线程池,其中包含10个线程        ExecutorService executor = Executors.newFixedThreadPool(10);        // 创建一个Runnable任务        Runnable runnableTask = () -> {            System.out.println("Runnable task : " + Thread.currentThread().getName());        };        // 向线程池添加任务,使用execute()方法        executor.execute(runnableTask);        // 创建一个Callable任务        Callable<String> callableTask = () -> {            System.out.println("Callable task : " + Thread.currentThread().getName());            return "任务完成!";        };        // 向线程池添加任务,使用submit()方法        Future<String> futureResult = executor.submit(callableTask);        try {            // 等待任务执行完成,并获取返回结果            String result = futureResult.get();            System.out.println(result);        } catch (Exception e) {            e.printStackTrace();        }        // 关闭线程池        executor.shutdown();    }}

三、多线程和线程池的注意事项

3.1 多线程注意事项

线程安全:多个线程访问共享数据时需要考虑线程安全问题,如使用同步机制、原子类等。
避免死锁:多个线程相互等待对方释放资源时,可能会导致死锁,需要避免。 线程生命周期的管理:线程应该在不需要时及时销毁,避免资源浪费。
线程优先级的设置:设置线程优先级时,需要注意不要过分依赖优先级带来的性能提升。
多线程的上下文切换开销:在多线程并发执行时,线程的切换会产生一定的开销,需要注意线程的数量和执行时间。

3.2 线程池注意事项

线程池大小的选择:线程池的大小应该根据任务类型和系统配置进行调整,避免线程过多或过少。
任务队列的选择:线程池的任务队列应该根据任务类型和负载情况进行选择,如使用有界队列或无界队列。
关闭线程池:线程池在不需要时应该及时关闭,否则可能会产生资源浪费和内存泄漏。
线程池的监控和调试:线程池在运行过程中可能会产生各种问题,如任务堆积、线程死锁等,需要进行监控和调试。

这些注意事项并不是全面的,实际使用中还需要根据具体情况进行适当调整和优化。

3.3 线程池调优 (针对大型高并发项目)

方案问题
线程数= CPU 核心数*(1+平均等待时间/平均工作时间)出自《Java并发编程实战》往往在实际业务中,很难给出精确的平均时间和平均工作时间,也与网络,硬件等相关,不确定性因素较多。
coreSize = 2* cpu核数; maxSize = 25*cpu核数没考虑到业务中往往有多个线程池对象,不符合业务场景
coreSize = tps(每秒钟事务数) * time;maxSize = tps(每秒钟事务数) * time(1.7-2)考虑到了业务场景,但是是由模型流量平均分布得出来的,但是业务场景的流量往往是随机的,将不符合真实情况。

综上所述,固定的公式往往也很难给出合理预估值。因此可以采用动态线程池参数配置。可以将线程池的参数从代码中迁移到分布式配置中心上,实现线程池参数可动态配置和即时生效。


原生 jdk中也包含了这些覆盖原配置,但需要注意的是,如果该线程池在工作中,需要考量覆盖的值是否满足条件,否则会覆盖失败。

查看配置是否生效:JDK也提供了一系列原生的get接口

四、线程池和多线程的未来发展方向

3.1 异步编程:

在Java中实现异步编程可以通过使用线程池和Future接口实现。例如,在Spring框架中,可以使用@Async注解来实现异步方法调用。这样,在调用方法时,程序会立即返回Future对象,而实际的方法执行会在另一个线程中进行,从而提高了程序的并发度和响应速度。

3.2 协程:

Java中可以通过使用协程库来实现协程。JDK8无原生协程库,可从三方库导包,例如,Quasar是一个开源的Java协程库,它提供了一组API来实现协程。使用Quasar可以编写更加高效和灵活的并发程序。

import co.paralleluniverse.fibers.Fiber;public class MyCoroutine implements Runnable {    public void run() {        Fiber.sleep(1000);        System.out.println("我的协程 !");    }}public class Main {    public static void main(String[] args) {        Fiber<Void> fiber = new Fiber<Void>(new MyCoroutine());       // 启动协程        fiber.start();        System.out.println("主线程!");    }}

3.3 分布式计算:

分布式计算是利用多台计算机协同工作来解决大规模计算问题的技术。未来,分布式计算将会在更多的领域得到应用。

例如,云计算、大数据分析和人工智能等领域都需要利用分布式计算来处理大规模的数据和计算任务。

3.4大规模并发(挑战):

在未来的应用场景中,大规模并发将成为多线程编程的重要挑战之一。为了解决大规模并发带来的问题,未来的多线程编程技术需要更加注重资源管理和调度算法的优化。

例如,为了解决大规模并发问题,Facebook开源了一个基于协程的Web服务器框架——Mcrouter,它能够支持每秒数百万的请求处理。

Finally,非常感谢大家一直以来的支持和阅读,我会继续努力创作更好的内容,与大家分享更多有价值的知识和经验。