多节点服务器定时任务重复处理的几种方案
摘要：服务多节点部署时，同一个定时任务会在每个节点上各执行一次。本文介绍三种避免重复处理的方案：使用 ZooKeeper 做分布式锁、使用 MongoDB 做分布式锁，以及使用 ShedLock。
·
1.使用zookeeper做分布式锁
/**
 * ZooKeeper-backed mutual-exclusion helper built on Curator's {@code InterProcessMutex}.
 *
 * <p>It is used to elect a single node per scheduling round: the node that obtains the
 * mutex keeps it for a short while so that the other nodes time out while waiting.
 */
@Component
public class MutexConfig {
    /** ZooKeeper connection string, injected from the {@code zookeeper.host} property. */
    @Value(value = "${zookeeper.host}")
    private String zkHost;

    /**
     * Tries to obtain the mutex stored at {@code lock}. When it is obtained, the mutex is
     * held for {@code sleepTime} milliseconds and then released before this method returns.
     *
     * <p>Note that the mutex is no longer held once the method has returned; the return value
     * only tells the caller whether it won this round.
     *
     * @param lock        ZooKeeper path of the mutex
     * @param acquireTime maximum time to wait for the mutex, in milliseconds
     * @param sleepTime   time to keep the mutex after obtaining it, in milliseconds
     * @return {@code true} when the mutex was obtained within {@code acquireTime},
     *         {@code false} otherwise
     * @throws Exception on ZooKeeper errors or when the waiting thread is interrupted
     */
    public Boolean getLock(String lock, Integer acquireTime, Integer sleepTime) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // try-with-resources closes the client even when acquire/sleep/release throws.
        try (CuratorFramework client = CuratorFrameworkFactory.newClient(zkHost, retryPolicy)) {
            client.start();
            InterProcessMutex mutex = new InterProcessMutex(client, lock);
            boolean acquired = mutex.acquire(acquireTime, TimeUnit.MILLISECONDS);
            if (!acquired) {
                // Nothing is held, so there is nothing to keep away from other nodes.
                return false;
            }
            try {
                // Hold the mutex long enough for competing nodes to give up waiting.
                Thread.sleep(sleepTime);
            } finally {
                // Release even when the sleep is interrupted.
                mutex.release();
            }
            return true;
        }
    }
}
在配置文件里配置
zookeeper.path.smsxxx.xxtemplate=/dmp/smsxxx/xxtemplate
在定时任务实现类编写定时任务
// ZooKeeper path passed to mutexConfig.getLock(...) as the lock for the SMS-template job;
// injected from the zookeeper.path.smsxxx.xxtemplate property.
@Value(value = "${zookeeper.path.smsxxx.xxtemplate}")
private String jd;
/**
 * Periodically refreshes the SMS template state.
 *
 * <p>Fires on the cron expression {@code 0 0/10 * * * ?}. The actual update is performed
 * only when {@code mutexConfig.getLock} reports that this node obtained the lock, so that
 * a single node does the work in each round. Failures are logged and never propagated to
 * the scheduler.
 */
@Scheduled(cron = "0 0/10 * * * ?")
public void updateJdState() {
    try {
        logger.info("updateJdState start");
        Boolean result = mutexConfig.getLock(jd, 2000, 3000);
        if (Boolean.TRUE.equals(result)) {
            logger.info("updateJdState content start");
            smsTemplateService.updateJdState();
        }
        logger.info("end");
    } catch (InterruptedException e) {
        // Restore the interrupt flag so the scheduler thread can observe the interruption.
        Thread.currentThread().interrupt();
        logger.error("updateJdState error: {}", e.getMessage(), e);
    } catch (Exception e) {
        // Pass the exception itself as the last argument so the stack trace is logged too.
        logger.error("updateJdState error: {}", e.getMessage(), e);
    }
}
2.使用mongodb做分布式锁
编写dao类
/**
 * Data-access contract for the lock records ({@link LockerPO}) used by the distributed lock.
 */
public interface DistributeLockerDAO extends BaseDAO<LockerPO> {
/**
 * Inserts one lock record.
 *
 * @param lockerPo the lock record to store
 * @return true if success, false if the locker is duplicated
 */
boolean insert(LockerPO lockerPo);
/**
 * Removes the lock record with the given id on behalf of the acquirer.
 *
 * @param id       id of the lock record
 * @param acquirer the acquirer asking for the removal
 * @return not specified by this interface; presumably true when a record was removed
 *         (confirm against the implementation)
 */
boolean remove(String id, String acquirer);
/**
 * Updates the time of the lock record held by the acquirer.
 *
 * @param id       id of the lock record
 * @param acquirer the acquirer holding the lock
 * @param time     the new time to store
 * @return not specified by this interface; presumably true when a record was updated
 *         (confirm against the implementation)
 */
boolean updateShakenTime(String id, String acquirer, LocalDateTime time);
/**
 * Looks up a lock record by its id.
 *
 * @param lockerId id of the lock record
 * @return the lock record wrapped in an {@link Optional}
 */
Optional<LockerPO> getLocker(String lockerId);
/**
 * Lists lock records page by page.
 *
 * @param offset paging offset
 * @param limit  paging limit
 * @return the lock records of the requested page
 */
List<LockerPO> list(long offset, int limit);
/**
 * Lists the lock records of one acquirer page by page.
 *
 * @param offset   paging offset
 * @param limit    paging limit
 * @param acquirer the acquirer whose lock records are listed
 * @return the lock records of the requested page
 */
List<LockerPO> list(String acquirer, long offset, int limit);
/**
 * Result row of {@link #countByAcquirer()}: one acquirer together with a count.
 * The {@code acquirer} property is mapped to the {@code _id} field.
 */
@Data
@Accessors(chain = true)
class LockerCountOfAcquirer {
@Field("_id")
String acquirer;
long count;
}
/**
 * Counts the lock records grouped by acquirer.
 *
 * @return one {@link LockerCountOfAcquirer} entry per acquirer
 */
List<LockerCountOfAcquirer> countByAcquirer();
}
锁的实体类
/**
 * Persistent lock record, stored in the collection named by {@code CollectionName.LOCKER}.
 * Getters, chained setters and the usual object methods are generated by Lombok.
 */
@Document(CollectionName.LOCKER)
@Data
@Accessors(chain = true)
public class LockerPO implements IdPO {
// Lock identifier; also the document id.
@Id
private String id;
// Name of the party holding the lock.
private String acquirer;
// Time at which the lock record was created by its acquirer.
private LocalDateTime acquiredAt;
// Time of the latest "shake" (refresh) of the lock by its acquirer.
private LocalDateTime shakenAt;
}
锁的服务类
/**
 * Default {@link DistributeLockerService}: delegates lock bookkeeping to
 * {@link DistributeLockerDAO} and publishes the number of locks per acquirer
 * through a Prometheus gauge labelled {@code user}.
 */
@Service
public class DistributeLockerServiceImpl implements DistributeLockerService {
    private final DistributeLockerDAO distributeLockerDAO;
    private final Gauge gauge;
    /** Acquirers seen by the previous monitoring run. */
    private final List<String> lastSyncAcquires;

    /**
     * @param namespace           Prometheus namespace of the gauge
     * @param subsystem           Prometheus subsystem of the gauge
     * @param distributeLockerDAO storage of the lock records
     * @param registry            meter registry; it is cast to {@code PrometheusMeterRegistry}
     */
    public DistributeLockerServiceImpl(
            @Value(value = "${prometheus.namespace}") String namespace,
            @Value(value = "${prometheus.subsystem}") String subsystem,
            DistributeLockerDAO distributeLockerDAO,
            MeterRegistry registry
    ) {
        this.distributeLockerDAO = distributeLockerDAO;
        this.lastSyncAcquires = new ArrayList<>();
        PrometheusMeterRegistry prometheusRegistry = (PrometheusMeterRegistry) registry;
        this.gauge = Gauge.build()
                .namespace(namespace)
                .subsystem(subsystem)
                .name("distribute_locker_total")
                .labelNames("user")
                .help("distribute locker statistic by the acquirer")
                .register(prometheusRegistry.getPrometheusRegistry());
    }

    /**
     * Creates the lock record for {@code user}; when the insert is refused, falls back to
     * refreshing the shaken time of the record for the same user.
     *
     * @return the outcome of the insert, or of the fallback refresh when the insert failed
     */
    @Override
    public boolean tryLock(String lockerId, String user) {
        LocalDateTime now = LocalDateTime.now();
        LockerPO candidate = new LockerPO()
                .setId(lockerId)
                .setAcquirer(user)
                .setAcquiredAt(now)
                .setShakenAt(now);
        if (distributeLockerDAO.insert(candidate)) {
            return true;
        }
        return distributeLockerDAO.updateShakenTime(lockerId, user, now);
    }

    /** Removes the lock record {@code lockerId} on behalf of {@code user}. */
    @Override
    public boolean release(String lockerId, String user) {
        return distributeLockerDAO.remove(lockerId, user);
    }

    /** Sets the shaken time of the lock record to the current time. */
    @Override
    public boolean shake(String lockerId, String user) {
        LocalDateTime now = LocalDateTime.now();
        return distributeLockerDAO.updateShakenTime(lockerId, user, now);
    }

    @Override
    public Optional<LockerPO> getLocker(String lockerId) {
        return distributeLockerDAO.getLocker(lockerId);
    }

    @Override
    public List<LockerPO> list(long offset, int limit) {
        return distributeLockerDAO.list(offset, limit);
    }

    @Override
    public List<LockerPO> list(String user, long offset, int limit) {
        return distributeLockerDAO.list(user, offset, limit);
    }

    /**
     * Publishes the current lock count of every acquirer, sets the gauge of acquirers that
     * were present in the previous run but are absent now to zero, and remembers the
     * current acquirers for the next run.
     */
    @Scheduled(fixedDelay = 500)
    public void monitorLockerCountOfAcquirer() {
        List<DistributeLockerDAO.LockerCountOfAcquirer> counts = distributeLockerDAO.countByAcquirer();
        publishCounts(counts);

        List<String> currentAcquirers = new ArrayList<>(counts.size());
        for (DistributeLockerDAO.LockerCountOfAcquirer entry : counts) {
            currentAcquirers.add(entry.getAcquirer());
        }

        zeroOutVanishedAcquirers(currentAcquirers);

        lastSyncAcquires.clear();
        lastSyncAcquires.addAll(currentAcquirers);
    }

    /** Writes every count into the gauge under the acquirer's label. */
    private void publishCounts(List<DistributeLockerDAO.LockerCountOfAcquirer> counts) {
        for (DistributeLockerDAO.LockerCountOfAcquirer entry : counts) {
            gauge.labels(entry.getAcquirer()).set(entry.getCount());
        }
    }

    /** Sets the gauge to zero for acquirers of the previous run that are missing now. */
    private void zeroOutVanishedAcquirers(List<String> currentAcquirers) {
        List<String> vanished = new ArrayList<>();
        for (String previous : lastSyncAcquires) {
            if (!currentAcquirers.contains(previous)) {
                vanished.add(previous);
            }
        }
        for (String acquirer : vanished) {
            gauge.labels(acquirer).set(0);
        }
    }
}
分布式锁的基类
/**
 * Base class for components that guard their work with the distributed lock.
 *
 * <p>It remembers the ids of the locks obtained through {@link #lock(String)} and
 * periodically "shakes" (refreshes) each of them through the lock service until
 * {@link #release(String)} is called.
 */
@Slf4j
public class DistributeLockerBase implements LockerShakerScheduler {
    private final String acquirer;
    /** Ids of the locks currently tracked by this instance; holds each id at most once. */
    private final List<String> lockingIds;
    private final DistributeLockerService distributeLockerService;

    /**
     * @param acquirer                name under which locks are taken; must not be null or empty
     * @param distributeLockerService the lock service; must not be null
     * @throws IllegalArgumentException when an argument violates the constraints above
     */
    protected DistributeLockerBase(String acquirer,
                                   DistributeLockerService distributeLockerService) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(acquirer));
        Preconditions.checkArgument(distributeLockerService != null);
        this.acquirer = acquirer;
        this.lockingIds = Collections.synchronizedList(new ArrayList<>());
        this.distributeLockerService = distributeLockerService;
    }

    /** @return the name under which this instance takes locks */
    public final String getAcquirer() {
        return acquirer;
    }

    /**
     * Tries to take the lock {@code lockerId} for this instance's acquirer.
     *
     * @param lockerId id of the lock
     * @return whether the lock service granted the lock
     */
    protected boolean lock(String lockerId) {
        boolean suc = distributeLockerService.tryLock(lockerId, acquirer);
        if (suc) {
            // tryLock may succeed again for a lock this acquirer already holds. Track each
            // id only once, otherwise a single release() leaves a stale entry behind that
            // shake() would keep refreshing.
            synchronized (lockingIds) {
                if (!lockingIds.contains(lockerId)) {
                    lockingIds.add(lockerId);
                }
            }
        }
        log.info("lock [{}], [{}]", lockerId, suc);
        return suc;
    }

    /**
     * Releases the lock {@code lockerId} and stops shaking it, whatever the lock service
     * answers.
     *
     * @param lockerId id of the lock
     * @return the answer of the lock service
     */
    protected boolean release(String lockerId) {
        boolean suc = distributeLockerService.release(lockerId, acquirer);
        lockingIds.remove(lockerId);
        log.info("release [{}], [{}]", lockerId, suc);
        return suc;
    }

    /**
     * Refreshes every tracked lock. Scheduled with a fixed delay of one second after the
     * previous run finished.
     */
    @Scheduled(fixedDelay = 1000)
    @Override
    public void shake() {
        // Work on a snapshot so that lock()/release() are not blocked during the remote calls.
        List<String> lockingIdsCopied;
        synchronized (this.lockingIds) {
            lockingIdsCopied = new ArrayList<>(this.lockingIds);
        }
        lockingIdsCopied.forEach(id -> {
            if (!distributeLockerService.shake(id, acquirer)) {
                log.error("[{}] shakes [{}] FAILED", acquirer, id);
            } else {
                log.info("[{}] shakes [{}] SUCCESS", acquirer, id);
            }
        });
    }
}
/**
 * Declares the {@code shake} method that {@code DistributeLockerBase} implements.
 * Per the original author's note, the interface exists so that the {@code shake} method
 * in {@code DistributeLockerBase} can be auto-scheduled when some class inherits
 * {@code DistributeLockerBase} and is marked as a bean.
 *
 * NOTE: it's dependent on the Spring boot version
 */
interface LockerShakerScheduler {
/** Refreshes the locks held by the implementing object. */
void shake();
}
实现定时任务类
/**
 * Scheduled job that syncs the living and working UV of the level 1 and level 2 cities
 * for every {@code Brand}. The work is guarded by the distributed lock inherited from
 * {@link DistributeLockerBase}.
 */
@Slf4j
@Component
public class BrandUVSyncher extends DistributeLockerBase {
    private static final String SPLITTER = "+";
    private static final String TASK_ID_PREFIX = "lbi-openapi-brandUV:";
    /** Delay between two runs of the sync job, in milliseconds. */
    private static final long PERIOD = 10_000;
    private final MidPlatformClient midPlatformClient;
    private final BrandUVDAO brandUVDAO;
    private final AreaDAO areaDAO;
    private final List<AreaPO> top2LevelCities;
    /**
     * geo of banks in some city
     * cityName -> {bankName -> coordinate}
     */
    private final Map<String, Map<String, List<double[]>>> geoOfBanks;

    /**
     * Registers this instance under an acquirer name made unique by a generated UUID and
     * loads the bank geo data and the city list once.
     */
    BrandUVSyncher(MidPlatformClient midPlatformClient, BrandUVDAO brandUVDAO,
                   DistributeLockerService distributeLockerService, AreaDAO areaDAO) {
        super("BrandUVSyncher:" + UuidGenerator.newBase64Uuid(), distributeLockerService);
        this.midPlatformClient = midPlatformClient;
        this.brandUVDAO = brandUVDAO;
        this.areaDAO = areaDAO;
        this.geoOfBanks = loadGeoOfBanks();
        this.top2LevelCities = loadTop2LevelCity();
    }

    /**
     * sync the living&working uv of the cities of level 1&2
     *
     * <p>Does nothing when it is not time to sync, when there are no cities to sync, or
     * when the lock cannot be taken. Otherwise it syncs the first day of the previous month
     * for every brand and releases the lock afterwards.
     */
    @Scheduled(fixedDelay = PERIOD, initialDelay = 3000)
    public void syncCityLivingAndWorkingUV() {
        log.info("run sync city WORKING&LIVING UV");
        // DO NOT MODIFY ME!
        final String lockerID = buildLockerID("syncCityLivingAndWorkingUV");
        LocalDate now = LocalDate.now();
        if (!isTimeToSync(now)) {
            return;
        }
        // Check for work BEFORE taking the lock: returning after a successful lock()
        // without release() would leave the lock held by this instance.
        if (top2LevelCities.isEmpty()) {
            log.info("empty level 1 and level 2 cities");
            return;
        }
        LocalDate month = now.minusMonths(1).withDayOfMonth(1);
        if (!lock(lockerID)) {
            log.error("locker [{}] failed", lockerID);
            return;
        }
        try {
            for (Brand brand : Brand.values()) {
                syncCityLivingAndWorkingUV(brand.getCode(), month, top2LevelCities);
            }
        } finally {
            release(lockerID);
        }
    }
}
3.使用 ShedLock 实现分布式定时任务锁
https://www.jianshu.com/p/941416645606
更多推荐
已为社区贡献2条内容
所有评论(0)