Piszę aplikację, która ma zadanie cron, które jest wykonywane co 60 sekund. Aplikacja jest skonfigurowana do skalowania, gdy jest to wymagane, do wielu wystąpień. Chcę wykonywać zadanie tylko na 1 wystąpieniu co 60 sekund (na dowolnym węźle). Po wyjęciu z pudełka nie mogę znaleźć rozwiązania tego problemu i jestem zaskoczony, że nie pytano o to wcześniej wiele razy. Używam Spring 4.1.6.
<task:scheduled-tasks>
<task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
</task:scheduled-tasks>
spring
spring-scheduled
user3131879
źródło
źródło
CronJob
wkubernetes
?Odpowiedzi:
Istnieje projekt ShedLock , który służy dokładnie do tego celu. Po prostu dodajesz adnotacje do zadań, które powinny być zablokowane podczas wykonywania
@Scheduled( ... ) @SchedulerLock(name = "scheduledTaskName") public void scheduledTask() { // do something }
Skonfiguruj Spring i LockProvider
@Configuration @EnableScheduling @EnableSchedulerLock(defaultLockAtMostFor = "10m") class MySpringConfiguration { ... @Bean public LockProvider lockProvider(DataSource dataSource) { return new JdbcTemplateLockProvider(dataSource); } ... }
źródło
Myślę, że trzeba użyć Quartz klastrowania z JDBC-JobStore do tego celu
źródło
Jest to kolejny prosty i niezawodny sposób bezpiecznego wykonywania zadania w klastrze. Możesz bazować na bazie danych i wykonać zadanie tylko wtedy, gdy węzeł jest „liderem” w klastrze.
Również w przypadku awarii lub zamknięcia węzła w klastrze inny węzeł został liderem.
Wystarczy, że stworzysz mechanizm „wyboru lidera” i za każdym razem sprawdzisz, czy jesteś liderem:
@Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
Wykonaj następujące kroki:
1. Zdefiniuj obiekt i tabelę, która zawiera jeden wpis na węzeł w klastrze:
@Entity(name = "SYS_NODE") public class SystemNode { /** The id. */ @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id; /** The name. */ @Column(name = "TIMESTAMP") private String timestamp; /** The ip. */ @Column(name = "IP") private String ip; /** The last ping. */ @Column(name = "LAST_PING") private Date lastPing; /** The last ping. */ @Column(name = "CREATED_AT") private Date createdAt = new Date(); /** The last ping. */ @Column(name = "IS_LEADER") private Boolean isLeader = Boolean.FALSE; public Long getId() { return id; } public void setId(final Long id) { this.id = id; } public String getTimestamp() { return timestamp; } public void setTimestamp(final String timestamp) { this.timestamp = timestamp; } public String getIp() { return ip; } public void setIp(final String ip) { this.ip = ip; } public Date getLastPing() { return lastPing; } public void setLastPing(final Date lastPing) { this.lastPing = lastPing; } public Date getCreatedAt() { return createdAt; } public void setCreatedAt(final Date createdAt) { this.createdAt = createdAt; } public Boolean getIsLeader() { return isLeader; } public void setIsLeader(final Boolean isLeader) { this.isLeader = isLeader; } @Override public String toString() { return "SystemNode{" + "id=" + id + ", timestamp='" + timestamp + '\'' + ", ip='" + ip + '\'' + ", lastPing=" + lastPing + ", createdAt=" + createdAt + ", isLeader=" + isLeader + '}'; }
}
2. Utwórz usługę, która a) wstawi węzeł do bazy danych, b) sprawdź lidera
@Service @Transactional public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener { /** The logger. */ private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class); /** The constant NO_ALIVE_NODES. */ private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}"; /** The ip. */ private String ip; /** The system service. */ private SystemService systemService; /** The system node repository. */ private SystemNodeRepository systemNodeRepository; @Autowired public void setSystemService(final SystemService systemService) { this.systemService = systemService; } @Autowired public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) { this.systemNodeRepository = systemNodeRepository; } @Override public void pingNode() { final SystemNode node = systemNodeRepository.findByIp(ip); if (node == null) { createNode(); } else { updateNode(node); } } @Override public void checkLeaderShip() { final List<SystemNode> allList = systemNodeRepository.findAll(); final List<SystemNode> aliveList = filterAliveNodes(allList); SystemNode leader = findLeader(allList); if (leader != null && aliveList.contains(leader)) { setLeaderFlag(allList, Boolean.FALSE); leader.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } else { final SystemNode node = findMinNode(aliveList); setLeaderFlag(allList, Boolean.FALSE); node.setIsLeader(Boolean.TRUE); systemNodeRepository.save(allList); } } /** * Returns the leaded * @param list * the list * @return the leader */ private SystemNode findLeader(final List<SystemNode> list) { for (SystemNode systemNode : list) { if (systemNode.getIsLeader()) { return systemNode; } } return null; } @Override public boolean isLeader() { final SystemNode node = systemNodeRepository.findByIp(ip); return node != null && node.getIsLeader(); } @Override public void onApplicationEvent(final ApplicationEvent applicationEvent) { try { ip = InetAddress.getLocalHost().getHostAddress(); } catch (Exception e) { throw new RuntimeException(e); } if (applicationEvent instanceof ContextRefreshedEvent) { pingNode(); } } /** * Creates the node */ private void createNode() { final SystemNode node = new SystemNode(); node.setIp(ip); node.setTimestamp(String.valueOf(System.currentTimeMillis())); node.setCreatedAt(new Date()); node.setLastPing(new Date()); node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll())); systemNodeRepository.save(node); } /** * Updates the node */ private void updateNode(final SystemNode node) { node.setLastPing(new Date()); systemNodeRepository.save(node); } /** * Returns the alive nodes. * * @param list * the list * @return the alive nodes */ private List<SystemNode> filterAliveNodes(final List<SystemNode> list) { int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class); final List<SystemNode> finalList = new LinkedList<>(); for (SystemNode systemNode : list) { if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) { finalList.add(systemNode); } } if (CollectionUtils.isEmpty(finalList)) { LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list)); throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list)); } return finalList; } /** * Finds the min name node. * * @param list * the list * @return the min node */ private SystemNode findMinNode(final List<SystemNode> list) { SystemNode min = list.get(0); for (SystemNode systemNode : list) { if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) { min = systemNode; } } return min; } /** * Sets the leader flag. * * @param list * the list * @param value * the value */ private void setLeaderFlag(final List<SystemNode> list, final Boolean value) { for (SystemNode systemNode : list) { systemNode.setIsLeader(value); } }
}
3. wysłanie bazy danych do wysłania, że żyjesz
@Override @Scheduled(cron = "0 0/5 * * * ?") public void executeSystemNodePing() { systemNodeService.pingNode(); } @Override @Scheduled(cron = "0 0/10 * * * ?") public void executeLeaderResolution() { systemNodeService.checkLeaderShip(); }
4. jesteś gotowy! Przed wykonaniem zadania sprawdź tylko, czy jesteś liderem:
@Override @Scheduled(cron = "*/30 * * * * *") public void executeFailedEmailTasks() { if (checkIfLeader()) { final List<EmailTask> list = emailTaskService.getFailedEmailTasks(); for (EmailTask emailTask : list) { dispatchService.sendEmail(emailTask); } } }
źródło
Zadania wsadowe i zaplanowane są zwykle uruchamiane na własnych, autonomicznych serwerach, z dala od aplikacji przeznaczonych dla klientów, więc nie jest typowym wymogiem dołączania zadania do aplikacji, która ma działać w klastrze. Ponadto zadania w środowiskach klastrowych zazwyczaj nie wymagają martwienia się o inne wystąpienia tego samego zadania działające równolegle, więc jest to kolejny powód, dla którego izolacja wystąpień zadań nie jest dużym wymaganiem.
Prostym rozwiązaniem byłoby skonfigurowanie zadań w profilu sprężystym. Na przykład, jeśli aktualna konfiguracja to:
<beans> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans>
zmień to na:
<beans> <beans profile="scheduled"> <bean id="someBean" .../> <task:scheduled-tasks> <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/> </task:scheduled-tasks> </beans> </beans>
Następnie uruchom aplikację na jednym komputerze z
scheduled
aktywnym profilem (-Dspring.profiles.active=scheduled
).Jeśli z jakiegoś powodu serwer główny stanie się niedostępny, po prostu uruchom inny serwer z włączonym profilem, a wszystko będzie działać dobrze.
Sytuacja się zmienia, jeśli chcesz również automatycznej pracy awaryjnej dla zadań. Następnie będziesz musiał utrzymać zadanie uruchomione na wszystkich serwerach i sprawdzić synchronizację za pomocą wspólnego zasobu, takiego jak tabela bazy danych, klastrowana pamięć podręczna, zmienna JMX itp.
źródło
get
iset
operacją, aby to zarchiwizować.Do blokowania używam tabeli bazy danych. Tylko jedno zadanie na raz może wykonać wstawkę do tabeli. Drugi otrzyma wyjątek DuplicateKeyException. Logika wstawiania i usuwania jest obsługiwana przez aspekt wokół adnotacji @Scheduled. Używam Spring Boot 2.0
@Component @Aspect public class SchedulerLock { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class); @Autowired private JdbcTemplate jdbcTemplate; @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))") public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable { String jobSignature = joinPoint.getSignature().toString(); try { jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()}); Object proceed = joinPoint.proceed(); jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature}); return proceed; }catch (DuplicateKeyException e) { LOGGER.warn("Job is currently locked: "+jobSignature); return null; } } }
@Component public class EveryTenSecondJob { @Scheduled(cron = "0/10 * * * * *") public void taskExecution() { System.out.println("Hello World"); } }
CREATE TABLE scheduler_lock( signature varchar(255) NOT NULL, date datetime DEFAULT NULL, PRIMARY KEY(signature) );
źródło
dlock jest przeznaczony do uruchamiania zadań tylko raz przy użyciu indeksów bazy danych i ograniczeń. Możesz po prostu zrobić coś takiego jak poniżej.
@Scheduled(cron = "30 30 3 * * *") @TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES) public void execute() { }
Zobacz artykuł o używaniu go.
źródło
Aby to osiągnąć, można użyć wbudowanego programu planującego, takiego jak program planujący db . Ma trwałe wykonania i wykorzystuje prosty optymistyczny mechanizm blokujący, aby zagwarantować wykonanie przez pojedynczy węzeł.
Przykładowy kod, jak można osiągnąć przypadek użycia:
RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60))) .execute((taskInstance, executionContext) -> { System.out.println("Executing " + taskInstance.getTaskAndInstance()); }); final Scheduler scheduler = Scheduler .create(dataSource) .startTasks(recurring1) .build(); scheduler.start();
źródło
Kontekst Spring nie jest klastrowany, więc zarządzanie zadaniem w aplikacji rozproszonej jest trochę trudne i musisz użyć systemów obsługujących jgroup, aby zsynchronizować stan i pozwolić zadaniu przyjąć priorytet do wykonania akcji. Lub możesz użyć kontekstu ejb do zarządzania klastrowaną usługą ha singleton, taką jak środowisko jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Lub możesz użyć klastrowanej pamięci podręcznej i zasobu blokady dostępu między usługą a pierwszą usługą weź blokadę, aby wykonać akcję lub zaimplementować własną jgroup, aby komunikować się z usługą i wykonywać akcję jeden węzeł
źródło