(阿里面试题)CompletableFuture模仿分布式事务 ,异步多任务处理,只要一个任务失败,那么回滚,并且停止其他正在运行的任务
思路:想到异步,就想到Futrue,紧接着想到Callable通过一个任务管理器,执行任务通过CompletableFuture异步执行任务首先建立一个任务基类package org.java8.transaction;import java.util.concurrent.Callable;/*** @author fpp* @version 1.0* @date 2020/8/17 19:16
·
思路:
- 想到异步,就想到Futrue,紧接着想到Callable
- 通过一个任务管理器,执行任务
- 通过CompletableFuture异步执行任务
首先建立一个任务基类
package org.java8.transaction;
import java.util.concurrent.Callable;
/**
* @author fpp
* @version 1.0
* @date 2020/8/17 19:16
*/
public interface Task<T> extends Callable<T> {
/**
* 任务回滚方法
*/
void rollback();
}
package org.java8.transaction;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
/**
* 复写FutureTask,来获取任务中的callable
* @author fpp
* @version 1.0
* @date 2020/8/18 15:48
*/
public class MyFutureTask<T> extends FutureTask<T> {
public Callable<T> callable;
public MyFutureTask(Callable<T> callable) {
super(callable);
this.callable=callable;
}
public MyFutureTask(Runnable runnable, T result) {
super(runnable, result);
}
public Callable<T> getCallable() {
return callable;
}
}
任务管理和任务执行器
package org.java8.transaction;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/**
* @author fpp
* @version 1.0
* @date 2020/8/17 19:15
*/
public class TaskManager {
private List<MyFutureTask> taskList=new LinkedList<>();
private boolean isSuccess=false;
public TaskManager() {
}
public void addTask(Task task){
taskList.add(new MyFutureTask(task));
}
public void done() {
CompletableFuture[] completableFutures=taskList.stream().map(task ->CompletableFuture.runAsync(task).whenCompleteAsync(this::done1)).collect(Collectors.toList()).toArray(new CompletableFuture[taskList.size()]);
CompletableFuture.allOf(completableFutures).join();
}
private void done1(Void v, Throwable t) {
if(!isSuccess) {
isSuccess=true;
try {
for (MyFutureTask futureTask : taskList) {
futureTask.get();
}
} catch (Exception e) {
System.out.println(e.getMessage());
for (MyFutureTask futureTask : taskList) {
if (!futureTask.isCancelled()) {
futureTask.cancel(true);
Task task = (Task) futureTask.getCallable();
task.rollback();
}
}
}
}
}
}
测试代码
package org.java8.transaction;
/**
* @author fpp
* @version 1.0
* @date 2020/8/17 19:42
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
TaskManager transactionManager=new TaskManager();
Task a=new Task() {
@Override
public Object call() throws InterruptedException {
System.out.println("do1 start.......");
Thread.sleep(2000);
System.out.println("do1.......");
Thread.sleep(2000);
return null;
}
@Override
public void rollback() {
System.out.println("do1 rollback");
}
};
Task c=new Task() {
@Override
public Object call() throws InterruptedException {
System.out.println("doc start.......");
Thread.sleep(5000);
System.out.println("doc.......");
Thread.sleep(1000);
return null;
}
@Override
public void rollback() {
System.out.println("doc rollback");
}
};
Task d=new Task() {
@Override
public Object call() throws InterruptedException {
System.out.println("dod start.......");
Thread.sleep(500);
System.out.println("dod.......");
Thread.sleep(1000);
return null;
}
@Override
public void rollback() {
System.out.println("dod rollback");
}
};
Task b=new Task() {
@Override
public Object call() throws InterruptedException {
System.out.println("dob start.......");
Thread.sleep(2000);
int a=1/0;
System.out.println("dob.......");
Thread.sleep(2000);
return null;
}
@Override
public void rollback() {
System.out.println("dob rollback");
}
};
transactionManager.addTask(a);
transactionManager.addTask(b);
transactionManager.addTask(c);
transactionManager.addTask(d);
transactionManager.done();
}
}
更多推荐
已为社区贡献1条内容
所有评论(0)