并发编程学习笔记

Java 创建线程的三种方式

先看一看在Thread类源码中的注释是怎么写的

There are two ways to create a new thread of execution. One is to declare a class to be a subclass of Thread. This subclass should override the run method of class Thread. An instance of the subclass can then be allocated and started. For example, a thread that computes primes larger than a stated value could be written as follows:

   class PrimeThread extends Thread {
       long minPrime;
       PrimeThread(long minPrime) {
           this.minPrime = minPrime;
       }

       public void run() {
           // compute primes larger than minPrime
            . . .
       }
   }

The following code would then create a thread and start it running:

        PrimeThread p = new PrimeThread(143);
        p.start();

The other way to create a thread is to declare a class that implements the Runnable interface. That class then implements the run method. An instance of the class can then be allocated, passed as an argument when creating Thread, and started. The same example in this other style looks like the following:

   class PrimeRun implements Runnable {
       long minPrime;
       PrimeRun(long minPrime) {
           this.minPrime = minPrime;
       }

       public void run() {
           // compute primes larger than minPrime
            . . .
       }
   }

The following code would then create a thread and start it running:

       PrimeRun p = new PrimeRun(143);
       new Thread(p).start();

Every thread has a name for identification purposes. More than one thread may have the same name. If a name is not specified when a thread is created, a new name is generated for it.

1. 继承Thread类创建线程

public class lab1 {
    public static void main(String[] args) {
        //创建一个新的线程
        Runner person01 = new Runner();
        //给线程命名
        person01.setName("PERSON01");
        //启动线程

        Runner person02 = new Runner();

        person02.setName("PERSON02");

        person01.start();
        person02.start();
    }
}

class Runner extends Thread{

    @Override
    public void run() {
        Integer speed = new Random().nextInt(100);
        for (int i = 0; i <= 100; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(this.getName() + "已前进" + (i * speed) + "米(" + speed +"米/秒)");
        }
    }
}

此时在这段Java程序中有哪几个线程?

  1. 主线程
  2. PERSON01线程
  3. PERSON02线程
  4. 垃圾回收监听线程

即Java在运行时永远不会是单一的线程

2. 实现Runnable接口创建线程

public class lab2 {
    public static void main(String[] args) {
        Runner02 person03 = new Runner02();
        Thread thread03 = new Thread(person03);
        thread03.setName("PERSON03");

        Thread person04 = new Thread(new Runner02());
        person04.setName("PERSON04");

        new Thread(new Runner02()).start();
        thread03.start();
        person04.start();

    }
}

class Runner02 implements Runnable{

    @Override
    public void run() {
        Integer speed = new Random().nextInt(100);
        for (int i = 0; i <= 100; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread.currentThread().getName() 用于获取当前执行的线程对象
            //在 Runnable 中是无法使用 this 获取到当前线程对象的
            System.out.println(Thread.currentThread().getName() + "已前进" + (i * speed) + "米(" + speed +"米/秒)");
        }
    }
}

3. 使用Callable和Future创建线程

  • jDK1.5以后为我们专门提供了一个并发工具包java.util.concurrent
  • java.util.concurrent包含许多线程安全、测试良好、高性能的并发构建块。创建concurrent的目的就是要实现Collection框架对数据结构所执行的并发操作。通过提供一组可靠的、高性能并发构建块,开发人员可以提高并发类的线程安全、可伸缩性、性能、可读性和可靠性。
public class lab3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建一个线程池,里面自带三个"空"线程 Executors 是调度器,对线程池进行管理
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        //实例化 Callable 对象
        Runner03 person04 = new Runner03();
        person04.setName("PERSON04");
        Runner03 person05 = new Runner03();
        person05.setName("PERSON05");
        Runner03 person06 = new Runner03();
        person06.setName("PERSON06");
        //将整个对象扔到线程池中, 线程池自动分配一个线程来运行person04对象的 call方法
        //Future 用于接收线程池内部call方法的返回值
        Future<Integer> result04 = executorService.submit(person04);
        Future<Integer> result05 = executorService.submit(person05);
        Future<Integer> result06 = executorService.submit(person06);
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //关闭线程池,释放所有资源
        executorService.shutdown();
        System.out.println("PERSON04累计跑了: " + result04.get() + "米");
        System.out.println("PERSON05累计跑了: " + result05.get() + "米");
        System.out.println("PERSON06累计跑了: " + result06.get() + "米");
    }
}
//使用Callable 允许返回值 和 抛出异常
class Runner03 implements Callable<Integer>{
    private String name;

    public void setName(String name){
        this.name = name;
    }
    //实现 Callable接口可以允许我们的线程返回值或抛出异常
    @Override
    public Integer call() throws Exception {
        Integer speed = new Random().nextInt(100);
        Integer distince = 0;//总共奔跑的距离
        for (int i = 0; i <= 100; i++) {
            Thread.sleep(10);
            distince = i * speed;
            System.out.println(this.name + "已前进" + distince + "米(" + speed + "米/秒)");
        }
        return distince;
    }
}
优点 缺点 使用场景
继承Thread 编程简单 单继承 无法对线程组有效控制 不推荐使用
实现Runnable 面向接口编程 执行效率高 无法对线程组有效控制没有返回值、异常 简单的多线程程序
利用线程池 容器管理线程 允许返回值和异常 执行效率相对低 编程复杂 企业级应用 推荐使用

synchronized的使用场景

  • synchronized代码块 - 任意对象即可

  • 在堆中的对象对象头中的线程id指向当前线程,线程中的record mark记录当前对象头的信息

    class classSample{
      //锁对象
      Obeject obj = new Object();
    
      public void methodSample01(){
        synchronized(obj){
            //待同步代码
            . . .
        }
      }
      public void methodSample02(){
        synchronized(obj){
            //待同步代码
            . . .
        }
      }
    }
  • synchronized方法 - this当前对象

    class classSample{
    
      public synchronized void methodSample01(){
          //待同步代码
        . . .
      }
    
      //给方法加锁等同于给this对象加锁
      public void methodSample02(){
        synchronized(this){
            //待同步代码
            . . .
        }
      }
    }
  • synchronized静态方法 - 该类的字节码对象

    class classSample{
    
      public synchronized static void methodSample01(){
          //待同步代码
        . . .
      }
    
      //给静态方法加锁等同于给类的字节码对象加锁
      public void methodSample02(){
        synchronized(classSample.class){
            //待同步代码
            . . .
        }
      }
    }

线程的五种状态

  1. 新建 (new)
  2. 就绪 (ready)
  3. 运行中 (running)
  4. 阻塞 (blocked)
  5. 死亡 (dead)

image-20220322195314329

死锁

死锁是在多线程情况下最严重的问题,在多线程对公共资源 (文件、数据)等进行操作时,彼此不释放自己的资源,而 去试图操作其他线程的资源,而形成交叉引用,就会产生死锁

public class DeadLock {
    private static String fileA = "A文件";
    private static String fileB = "B文件";

    public static void main(String[] args) {
        new Thread(){//线程1
            @Override
            public void run() {
                while (true) {
                    //打开文件A,线程独占
                    synchronized (fileA) {
                        System.out.println(this.getName() + ": 文件A写入");
                        synchronized (fileB) {
                            System.out.println(this.getName() + ": 文件B写入");
                        }
                        System.out.println(this.getName() + ": 所有文件保存");
                    }
                }
            }
        }.start();

        new Thread(){//线程1
            @Override
            public void run() {
                while (true) {
                    //打开文件A,线程独占
                    synchronized (fileB) {
                        System.out.println(this.getName() + ": 文件B写入");
                        synchronized (fileA) {
                            System.out.println(this.getName() + ": 文件A写入");
                        }
                        System.out.println(this.getName() + ": 所有文件保存");
                    }
                }
            }
        }.start();
    }
}

解决死锁最根本的建议是:

  • 尽量减少公共资源的引用,用完马上释放
  • 用完马上释放公共资源
  • 减少synchronized使用,采用“副本”方式替代

线程安全

在拥有共享数据的多条线程并行执行的程序中,线程安全的代码会通过同步机制保证各个线程都可以正常且正确的执行,不会出现数据污染等意外情况

线程安全

  • 优点:可靠

  • 缺点:执行速度慢

  • 使用建议:需要线程共享时使用

线程不安全

  • 优点:速度快

  • 缺点:可能与预期不符

  • 使用建议:在线程内部使用,无 需线程间共享

列举线程安全与不安全的类

StringBuffer是线程安全的,StringBuilder是线程不安全的

HashTable是线程安全的,HashMap是线程不安全的

Properties是线程安全的,HashSetTreeSet是不安全的

Vector是线程安全的,ArrayListLinkedList是线程不安全的

ThreadPool 线程池

new Thread() 的弊端

  • new Thread()新建对象性能差
  • 线程缺乏统一管理,可能无限制的新建线程,相互竞争,严重时会占用过多系统资源导致死机或OOM

ThreadPool 线程池

  • 重用存在的线程,减少对象创建、消亡的开销
  • 线程总数可控,提高资源的利用率
  • 避免过多的资源竞争,避免阻塞
  • 提供额外功能,定时执行、定期执行、监控等

线程池的种类

在JUC中,提供了工具类Executors(调度器)对象来创建线程池,可创建的线程池有四种:

  1. CachedThreadPool:可缓存线程池

    public class ThreadPoolSample01 {
       public static void main(String[] args) {
           //Executors调度器对象
           //ExecutorService用于管理线程池
           //创建一个可缓存线程池
           //可缓存线程池的特点,无限大,如果线程池中没有可用的线程则创建,有空闲则利用
           ExecutorService threadPool = Executors.newCachedThreadPool();
           for (int i = 0; i <= 1000; i++) {
               final int index = i;
               threadPool.execute(new Runnable() {
                   @Override
                   public void run() {
                       System.out.println(Thread.currentThread().getName() + " : " + index);
                   }
               });
           }
           try {
               //给线程足够的运行时间
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           //threadPool.shutdown() 代表关闭线程池(等待所有线程完成)
           //threadPool.shutdownNow() 代表立即终止线程池的运行,不等待线程,不推荐使用(强制关闭)
           threadPool.shutdown();
       }
    }
    
  2. FixedThreadPool:定长线程池

    public class ThreadPoolSample02 {
       public static void main(String[] args) {
           //创建一个定长线程池
           //定长线程池的特点,固定线程总数,空闲线程用于执行任务,如果线程都在使用
           //后续任务则处于等待状态(备选的等待算法为 FIFO(默认)和 LIFO)
           //在线程池中的线程执行任务后再执行后续的任务
           ExecutorService threadPool = Executors.newFixedThreadPool(3);
           for (int i = 0; i <= 1000; i++) {
               final int index = i;
               threadPool.execute(new Runnable() {
                   @Override
                   public void run() {
                       System.out.println(Thread.currentThread().getName() + " : " + index);
                   }
               });
           }
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           threadPool.shutdown();
       }
    }
  3. SingleThreadExecutor:单线程池

    public class ThreadPoolSample03 {
       public static void main(String[] args) {
           //Executors调度器对象
           //ExecutorService用于管理线程池
           //创建一个单线程线程池
           ExecutorService threadPool = Executors.newSingleThreadExecutor();
           for (int i = 0; i <= 1000; i++) {
               final int index = i;
               threadPool.execute(new Runnable() {
                   @Override
                   public void run() {
                       System.out.println(Thread.currentThread().getName() + " : " + index);
                   }
               });
           }
           try {
               Thread.sleep(1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
           threadPool.shutdown();
       }
    }
  4. ScheduledThreadPool:调度线程池

    public class ThreadPoolSample04 {
       public static void main(String[] args) {
           //Executors调度器对象
           //ExecutorService用于管理线程池
           //创建一个可调度线程池
           ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
           //延迟三秒执行一次run
    //        scheduledThreadPool.schedule(new Runnable() {
    //            @Override
    //            public void run() {
    //                System.out.println("延迟3s执行");
    //            }
    //        }, 3, TimeUnit.SECONDS);
    
           //在使用可调度线程池后 Timer类就可以不再使用
           //但是在项目实际开发中两者都不会被用到,
           //因为有着更为成熟的调度框架 Quartz 或 Spring自带调度
           scheduledThreadPool.scheduleAtFixedRate(new Runnable() {
               @Override
               public void run() {
                   System.out.println(new Date() + "  延迟1s执行,每3s执行一次");
               }
           }, 1, 3, TimeUnit.SECONDS);
       }
    }

CountDownLatch 倒计时锁

CountDownLatch倒计时锁适合"总-分任务",例如多线程计算后的数据汇总,可以实现类似计数器的功能

CDL执行原理

image-20220322234437780

public class CountDownSample {
    private static int count = 0;
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(100);
        //CDL总数和操作数保持一直
        CountDownLatch cdl = new CountDownLatch(10000);
        for (int i = 0; i <= 10000; i++) {
            final int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    synchronized (CountDownSample.class) {
                        try {
                            count += index;
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            cdl.countDown();
                        }
                    }
                }
            });
        }
        try {
            //堵塞当前线程,直到cdl=0的适合再继续往下走
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(count);
        threadPool.shutdown();
    }
}

Semaphore 信号量

Semaphore信号量经常用于限制获取某种资源的线程数量

public class SemaphoreSample01 {
    public static void main(String[] args) {
        //用来装载等待的请求
        ExecutorService threadPool = Executors.newCachedThreadPool();
        //定义5个信号量,即服务器只允许5个请求同时进行
        Semaphore semaphore = new Semaphore(5);
        for (int i = 1; i <= 20 ; i++) {
            final int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //获取一个信号量, 占用一个"使用权"
                        semaphore.acquire();
                        use();
                        //执行完成后释放这个信号量, 结束服务使用
                        semaphore.release();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }

    public static void use(){
        try {
            System.out.println(new Date() + " " + Thread.currentThread().getName() + "获得服务器使用资格");
            Thread.sleep(2000);
            System.out.println(new Date() + " " + Thread.currentThread().getName() + "退出服务器");
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

服务已满终止等待提示

public class SemaphoreSample02 {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(5);
        for (int i = 1; i <= 20 ; i++) {
            final int index = i;
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        //尝试获取一次信号量,获取到返回true否则返回false
                        if (semaphore.tryAcquire(6, TimeUnit.SECONDS)){
                            use();
                            semaphore.release();
                        } else{
                            System.out.println(Thread.currentThread().getName() + " 对不起,服务器已满,请稍后再试");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        threadPool.shutdown();
    }

    public static void use(){
        try {
            System.out.println(new Date() + " " + Thread.currentThread().getName() + "获得服务器使用资格");
            Thread.sleep(2000);
            System.out.println(new Date() + " " + Thread.currentThread().getName() + "退出服务器");
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

CyclicBarrier 循环屏障

CyclicBarrier 是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障带你。与CDL不同的是该barrier再释放等待线程后可以宠用,所以称为循环(Cyclic)屏障(Barrier)。

image-20220323002754810

public class CyclicBarrierSample {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newCachedThreadPool();
        for (int i = 1; i <= 20; i++) {
            final int index = i;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    go();
                }
            });
        }
        threadPool.shutdown();
    }

    private static void go(){
        System.out.println(Thread.currentThread().getName() + " 准备就绪");
        try {
            //设置屏障点,当累计五个线程都准备好后,才运行后面的代码
            cyclicBarrier.await();
            System.out.println(Thread.currentThread().getName() + " 开始运行");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}

CyclicBarrier 应用场景

适用于多线程必须同时开始的场景,抢票秒杀等等

ReentrantLock 重入锁

重入锁是指任意线程在获取到锁之后,再次获取该锁而不会被该锁所阻塞

ReentrantLock设计的目标是用来替代synchronized关键字1

特征 synchronized(推荐) ReentrantLock
底层原理 JVM实现 JDK实现
性能区别 低 -> 高 (JDK5+)
锁的释放 自动释放 (编译器保证) 手动释放 (finally保证)
编码难度 简单 复杂
锁的粒度 读写不区分 读锁、写锁
高级功能 公平锁、非公平锁唤醒、Condition分组唤醒、中断等待锁
public class ReentrantLockSample {
    public static int users = 100;//同时模拟的并发访问用户数量
    public static int downTotal = 50000; //用户下载的真实总数
    public static int count = 0;//计数器
    private static ReentrantLock lock = new ReentrantLock();

    public static void main(String[] args) {
        //调度器,JDK1.5后提供的concurrent包对于并发的支持
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量,用于模拟并发的人数
        final Semaphore semaphore = new Semaphore(users);
        for (int i = 0; i < downTotal; i++) {
            executorService.execute(() -> {
                //通过多线程模拟N个用户并发访问并下载
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();//关闭调度服务
        System.out.println("下载总数:" + count);
    }

    public static void add() {
        lock.lock();//上锁
        try {
            count++;
        } finally {
            lock.unlock(); //解锁,一定要放在finally里面否则会出现死锁
        }
    }
}

Condition 等待与唤醒

在并行程序中,避免不了某些线程要预先规定好顺序执行,如:先新增再修改,先买后卖,先进后出...,对于这种场景,使用JUC中的Condition对象再适合不过了。

JUC中提供了Condition对象,用于让指定线程等待和唤醒,按预期顺序执行。它必须和ReentrantLock重入锁配合使用。

Condition用于替代wait()和/notify()方法

  • notify()只能随机唤醒等待的线程,而Condition可以唤醒指定的线程,这有利于更好的控制并发程序。

Condition核心方法

  • await():阻塞当前线程,直到singal()唤醒
  • signal():唤醒被await()的线程,从中断处继续执行
  • signalAll():唤醒所有被await()阻塞的线程
public class ConditionSample {
    public static void main(String[] args) {
        //Condition对象必须配合Lock一起使用
        ReentrantLock lock = new ReentrantLock();
        Condition c1 = lock.newCondition();//创建Condition
        Condition c2 = lock.newCondition();//创建Condition
        Condition c3 = lock.newCondition();//创建Condition

        new Thread(new Runnable() { //T1
            @Override
            public void run() {
                lock.lock();//加锁
                try {
                    c1.await();//阻塞当前线程,c1.signal的时候线程激活继续执行
                    Thread.sleep(1000);
                    System.out.println("粒粒皆辛苦");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();//解锁
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();//加锁
                try {
                    c2.await();
                    Thread.sleep(1000);
                    System.out.println("谁知盘中餐");
                    c1.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();//解锁
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();//加锁
                try {
                    c3.await();
                    Thread.sleep(1000);
                    System.out.println("汗滴禾下土");
                    c2.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();//解锁
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    Thread.sleep(1000);
                    System.out.println("锄禾日当午");
                    c3.signal();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        }).start();
    }
}

Callable & Future

Callable 和 Runnable一样代表着任务,区别在于Callable有返回值并且可以抛出异常

Future 是一个接口。它用于表示异步计算的结果。提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。

//并发计算质数
public class FutureSample {
    public static void main(String[] args) {
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int i = 2; i <= 10000; i++) {
            Computer c = new Computer();
            c.setNum(i);
            //Future对用于计算的线程进行监听,因为计算是在其它线程中执行的,所以这个返回结果的过程是异步的
            //将c对象提交给线程池,如有空闲线程立即执行里面的call方法
            Future<Boolean> result = threadPool.submit(c);
            try {
                //用于获取返回值,如果线程内部的call没有执行完成,则进入等到状态,直到计算完成
                Boolean r = result.get();
                if (r){
                    System.out.println(c.getNum());
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        threadPool.shutdown();
    }
}
class Computer implements Callable<Boolean> {

    private Integer num;

    public Integer getNum() {
        return num;
    }

    public void setNum(Integer num) {
        this.num = num;
    }

    @Override
    public Boolean call() throws Exception {
        boolean isPrime = true;
        for (int i = 2; i < num; i++) {
            if (num % i == 0){
                isPrime = false;
                break;
            }
        }
        return isPrime;
    }
}

JUC 并发容器

ArrayList -> CopyOnWriteArrayList - 写复制列表

HashSet -> CopyOnWriteArraySet - 写复制集合

HashMap -> ConcurrentHashMap - 分段锁映射

CopyOnWriteArrayList 并发原理

CopyOnWriteArrayList 通过"副本"解决并发问题

public class CopyOnWriteArrayListSample {
    public static void main(String[] args) {
        List<Integer> list = new CopyOnWriteArrayList <>();
        for (int i = 0; i < 1000; i++) {
            list.add(i);
        }
        for (Integer i : list) {
            list.remove(i);
        }
        System.out.println(list);
    }
}

CopyOnWriteArraySet 并发原理

与CopyOnWriteArrayList类似

ConcurrentHashMap

public class ReentrantLockSample {
    public static int users = 100;//同时模拟的并发访问用户数量
    public static int downTotal = 50000; //用户下载的真实总数
    public static ConcurrentHashMap count = new ConcurrentHashMap();//计数器

    public static void main(String[] args) {
        //调度器,JDK1.5后提供的concurrent包对于并发的支持
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量,用于模拟并发的人数
        final Semaphore semaphore = new Semaphore(users);
        for (int i = 0; i < downTotal; i++) {
            final Integer index = i;
            executorService.execute(() -> {
                //通过多线程模拟N个用户并发访问并下载
                try {
                    semaphore.acquire();
                    count.put(index, index);
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();//关闭调度服务
        System.out.println("下载总数:" + count.size());
    }
}

Atomic 包 & CAS 算法

原子性

是指一个操作或者多个操作要么全部执行,且执行的过程不会被任何因素打断,要么就不执行

Atomic 包

专为线程安全而设计,包含多个原子操作类

Atomic 常用类

  • AtomicInteger
  • AtomicIntegerArray
  • AtomicBoolean
  • AtomicLong
  • AtomicLongArray
public class AtomicIntegerSample {
    public static int users = 100;//同时模拟的并发访问用户数量
    public static int downTotal = 50000; //用户下载的真实总数
    public static AtomicInteger count = new AtomicInteger();//计数器

    public static void main(String[] args) {
        //调度器,JDK1.5后提供的concurrent包对于并发的支持
        ExecutorService executorService = Executors.newCachedThreadPool();
        //信号量,用于模拟并发的人数
        final Semaphore semaphore = new Semaphore(users);
        for (int i = 0; i < downTotal; i++) {
            final Integer index = i;
            executorService.execute(() -> {
                //通过多线程模拟N个用户并发访问并下载
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executorService.shutdown();//关闭调度服务
        System.out.println("下载总数:" + count);

    }
    //线程不安全
    public static void add(){
        //count++
        count.getAndIncrement();
    }
}

CAS 算法

  • 锁是用来做并发最简单的方式,当然其代价也是最高的。独占锁是 一种悲观锁,synchronized就是一种独占锁,它假设最坏的情况, 并且只有在确保其它线程不会造成干扰的情况下执行,会导致其它 所有需要锁的线程挂起,等待持有锁的线程释放锁。

  • 所谓乐观锁就是,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。其中CAS(比较与交换,Compare And Swap) 是一种有名的无锁算法

Atomic的应用场景

虽然基于CAS的线程安全机制很好很高效,但要说的是,并非所有线程安全都可以用这样的方法来实现, 这只适合一些粒度比较小型,如计数器这样的需求用起来才有效,否则也不会有锁的存在了

暂无评论

发送评论 编辑评论


|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇