思路:
  1. 想到异步,就想到Future,紧接着想到Callable
  2. 通过一个任务管理器,执行任务
  3. 通过CompletableFuture异步执行任务
首先建立一个任务接口(继承Callable,并增加回滚方法)
package org.java8.transaction;

import java.util.concurrent.Callable;

/**
 * A unit of work that can be undone: a {@link Callable} that additionally
 * declares a rollback hook.
 *
 * @param <T> the result type returned by {@code call()}
 * @author fpp
 * @version 1.0
 * @date 2020/8/17 19:16
 */
public interface Task<T> extends Callable<T> {
     /**
      * Rolls back this task.
      */
     void rollback();
}

package org.java8.transaction;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

/**
 * A {@link FutureTask} that remembers the {@link Callable} it was built from,
 * so that the callable can be retrieved later through {@link #getCallable()}.
 *
 * @param <T> the result type produced by the task
 * @author fpp
 * @version 1.0
 * @date 2020/8/18 15:48
 */
public class MyFutureTask<T> extends FutureTask<T> {

    /**
     * The callable backing this task. The field stays public and non-final only
     * to remain compatible with existing code; prefer {@link #getCallable()}.
     */
    public Callable<T> callable;

    /**
     * Creates a task that runs the given callable.
     *
     * @param callable the work to execute; also returned by {@link #getCallable()}
     * @throws NullPointerException if {@code callable} is null (raised by {@link FutureTask})
     */
    public MyFutureTask(Callable<T> callable) {
        super(callable);
        this.callable = callable;
    }

    /**
     * Creates a task that runs the given runnable and then yields {@code result}.
     *
     * @param runnable the work to execute
     * @param result   the value to report once {@code runnable} has finished
     * @throws NullPointerException if {@code runnable} is null (raised by {@link FutureTask})
     */
    public MyFutureTask(Runnable runnable, T result) {
        super(runnable, result);
        // Previously this constructor left the field unset, so getCallable()
        // returned null. Expose an equivalent adapter instead.
        this.callable = () -> {
            runnable.run();
            return result;
        };
    }

    /**
     * Returns the callable backing this task.
     *
     * @return the callable passed to the constructor, or an adapter that runs the
     *         runnable and returns the preset result when the task was built from
     *         a {@link Runnable}
     */
    public Callable<T> getCallable() {
        return callable;
    }
}
任务管理和任务执行器
package org.java8.transaction;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/**
 * Runs a group of {@link Task}s asynchronously and treats them as one unit:
 * if waiting for any task's result fails, every task that is not already
 * cancelled is cancelled and rolled back.
 *
 * <p>Registration through {@link #addTask(Task)} is not synchronized; register
 * all tasks before calling {@link #done()}.
 *
 * @author fpp
 * @version 1.0
 * @date 2020/8/17 19:15
 */
public class TaskManager {
    /** Registered tasks, wrapped so the original {@link Task} can be recovered for rollback. */
    private final List<MyFutureTask<?>> taskList = new LinkedList<>();

    /**
     * Flipped exactly once by the first completion callback. Callbacks are
     * scheduled asynchronously and may run on different threads at the same
     * time, so a plain boolean check-then-set could let several of them run
     * the check/rollback pass concurrently.
     */
    private final AtomicBoolean completionHandled = new AtomicBoolean(false);

    public TaskManager() {
    }

    /**
     * Registers a task to be executed by {@link #done()}.
     *
     * @param task the task to run
     * @throws NullPointerException if {@code task} is null (raised by the underlying FutureTask)
     */
    public void addTask(Task<?> task) {
        taskList.add(new MyFutureTask<>(task));
    }

    /**
     * Submits every registered task with {@link CompletableFuture#runAsync(Runnable)},
     * attaches a completion callback to each, and blocks until all tasks and
     * callbacks have finished.
     *
     * @throws java.util.concurrent.CompletionException if a callback failed,
     *         for example because a {@link Task#rollback()} threw
     */
    public void done() {
        CompletableFuture<?>[] completableFutures = taskList.stream()
                .map(task -> CompletableFuture.runAsync(task).whenCompleteAsync(this::onTaskCompleted))
                .toArray(CompletableFuture<?>[]::new);
        CompletableFuture.allOf(completableFutures).join();
    }

    /**
     * Completion callback attached to every task. Only the first invocation does
     * any work: it waits for each task's result in registration order and rolls
     * everything back as soon as one of them cannot be obtained.
     *
     * @param ignoredResult always null for a {@code runAsync} stage
     * @param ignoredError  not consulted; failures are detected via {@code get()} on each task
     */
    private void onTaskCompleted(Void ignoredResult, Throwable ignoredError) {
        if (!completionHandled.compareAndSet(false, true)) {
            return;
        }
        boolean interrupted = false;
        try {
            for (MyFutureTask<?> futureTask : taskList) {
                futureTask.get();
            }
        } catch (InterruptedException e) {
            interrupted = true;
            System.out.println(e.getMessage());
            rollbackAll();
        } catch (ExecutionException | CancellationException e) {
            System.out.println(e.getMessage());
            rollbackAll();
        } finally {
            // Restore the flag only after the rollbacks so they are not disturbed by it.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Cancels and rolls back every task that has not been cancelled yet. All
     * rollbacks are attempted even if one of them throws; the first failure is
     * rethrown afterwards with later ones attached as suppressed exceptions.
     *
     * <p>NOTE: {@code cancel(true)} only requests interruption of a running task,
     * so {@code rollback()} may run while that task's {@code call()} is still
     * finishing.
     */
    private void rollbackAll() {
        RuntimeException firstFailure = null;
        for (MyFutureTask<?> futureTask : taskList) {
            if (futureTask.isCancelled()) {
                continue;
            }
            futureTask.cancel(true);
            // Every entry was created from a Task in addTask, so the cast is safe.
            Task<?> task = (Task<?>) futureTask.getCallable();
            try {
                task.rollback();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

}
测试代码
package org.java8.transaction;

/**
 * Demo driver: registers four sleeping tasks with a {@link TaskManager}, one of
 * which fails with a division by zero, and then runs them.
 *
 * @author fpp
 * @version 1.0
 * @date 2020/8/17 19:42
 */
public class Test {
    public static void main(String[] args) throws InterruptedException {
        TaskManager transactionManager = new TaskManager();
        // Same registration order as before: do1, dob (the failing one), doc, dod.
        transactionManager.addTask(new DemoTask("do1", 2000, 2000, false));
        transactionManager.addTask(new DemoTask("dob", 2000, 2000, true));
        transactionManager.addTask(new DemoTask("doc", 5000, 1000, false));
        transactionManager.addTask(new DemoTask("dod", 500, 1000, false));
        transactionManager.done();
    }

    /**
     * A task that prints a start line, sleeps, optionally fails, prints a second
     * line, sleeps again and returns null. Rollback just prints a line.
     */
    private static final class DemoTask implements Task<Object> {
        private final String label;
        private final long firstPauseMillis;
        private final long secondPauseMillis;
        private final boolean failsMidway;

        DemoTask(String label, long firstPauseMillis, long secondPauseMillis, boolean failsMidway) {
            this.label = label;
            this.firstPauseMillis = firstPauseMillis;
            this.secondPauseMillis = secondPauseMillis;
            this.failsMidway = failsMidway;
        }

        @Override
        public Object call() throws InterruptedException {
            System.out.println(label + " start.......");
            Thread.sleep(firstPauseMillis);
            if (failsMidway) {
                // Deliberate ArithmeticException to trigger the rollback path.
                int boom = 1 / 0;
            }
            System.out.println(label + ".......");
            Thread.sleep(secondPauseMillis);
            return null;
        }

        @Override
        public void rollback() {
            System.out.println(label + " rollback");
        }
    }
}
Logo

鸿蒙生态一站式服务平台。

更多推荐