Zaplanowane zadanie Spring działające w środowisku klastrowym

105

Piszę aplikację, która ma zadanie cron, które jest wykonywane co 60 sekund. Aplikacja jest skonfigurowana do skalowania, gdy jest to wymagane, do wielu wystąpień. Chcę wykonywać zadanie tylko na 1 wystąpieniu co 60 sekund (na dowolnym węźle). Po wyjęciu z pudełka nie mogę znaleźć rozwiązania tego problemu i jestem zaskoczony, że nie pytano o to wcześniej wiele razy. Używam Spring 4.1.6.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
user3131879
źródło
8
Myślę, że Quartz to najlepsze rozwiązanie dla Ciebie: stackoverflow.com/questions/6663182/…
selalerer
Jakieś sugestie dotyczące używania CronJobw kubernetes?
ch271828n

Odpowiedzi:

103

Istnieje projekt ShedLock , który służy dokładnie do tego celu. Po prostu dodajesz adnotacje do zadań, które powinny być zablokowane podczas wykonywania

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

Skonfiguruj Spring i LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
Lukas
źródło
1
Chcę tylko powiedzieć „Dobra robota!”. Ale ... Fajną cechą byłoby, gdyby biblioteka mogła wykryć nazwę bazy danych bez podawania jej wyraźnie w kodzie ... Poza tym, że działa doskonale!
Krzysiek
Działa dla mnie z starterem jpa danych rozruchowych Oracle i Spring.
Mahendran Ayyarsamy Kandiar
Czy to rozwiązanie działa w wersji Spring 3.1.1.RELEASE i java 6? Proszę powiedzieć.
Vikas Sharma
Próbowałem z MsSQL i Spring boot JPA i użyłem skryptu liquibase z części SQL .. działa dobrze .. Dzięki
arkusz
To rzeczywiście działa dobrze. Jednak spotkałem się tutaj z nieco złożonym przypadkiem, czy mógłbyś rzucić okiem. Dzięki!!! stackoverflow.com/questions/57691205/…
Dayton Wang
15

Jest to kolejny prosty i niezawodny sposób bezpiecznego wykonywania zadania w klastrze. Możesz bazować na bazie danych i wykonać zadanie tylko wtedy, gdy węzeł jest „liderem” w klastrze.

Również w przypadku awarii lub zamknięcia węzła w klastrze inny węzeł został liderem.

Wystarczy, że stworzysz mechanizm „wyboru lidera” i za każdym razem sprawdzisz, czy jesteś liderem:

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

Wykonaj następujące kroki:

1. Zdefiniuj obiekt i tabelę, która zawiera jeden wpis na węzeł w klastrze:

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2. Utwórz usługę, która a) wstawi węzeł do bazy danych, b) sprawdź lidera

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3. wysłanie bazy danych do wysłania, że ​​żyjesz

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. jesteś gotowy! Przed wykonaniem zadania sprawdź tylko, czy jesteś liderem:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}
mspapant
źródło
W takim przypadku co to jest SystemService i SettingEnum? Wygląda na to, że jest to niezwykle proste i po prostu zwraca wartość limitu czasu. W takim razie dlaczego nie po prostu zakodować limitu czasu?
tlavarea
@mspapant, co to jest SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT? Jaka jest optymalna wartość, której powinienem tutaj użyć?
user525146
@tlavarea czy zaimplementowałeś ten kod, mam pytanie dotyczące metody DateUtils.hasExpired? czy jest to metoda niestandardowa, czy jest to typowe narzędzie Apache?
user525146
10

Zadania wsadowe i zaplanowane są zwykle uruchamiane na własnych, autonomicznych serwerach, z dala od aplikacji przeznaczonych dla klientów, więc nie jest typowym wymogiem dołączania zadania do aplikacji, która ma działać w klastrze. Ponadto zadania w środowiskach klastrowych zazwyczaj nie wymagają martwienia się o inne wystąpienia tego samego zadania działające równolegle, więc jest to kolejny powód, dla którego izolacja wystąpień zadań nie jest dużym wymaganiem.

Prostym rozwiązaniem byłoby skonfigurowanie zadań w profilu sprężystym. Na przykład, jeśli aktualna konfiguracja to:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

zmień to na:

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

Następnie uruchom aplikację na jednym komputerze z scheduledaktywnym profilem ( -Dspring.profiles.active=scheduled).

Jeśli z jakiegoś powodu serwer główny stanie się niedostępny, po prostu uruchom inny serwer z włączonym profilem, a wszystko będzie działać dobrze.


Sytuacja się zmienia, jeśli chcesz również automatycznej pracy awaryjnej dla zadań. Następnie będziesz musiał utrzymać zadanie uruchomione na wszystkich serwerach i sprawdzić synchronizację za pomocą wspólnego zasobu, takiego jak tabela bazy danych, klastrowana pamięć podręczna, zmienna JMX itp.

manish
źródło
58
Jest to poprawne obejście, ale naruszy to ideę posiadania środowiska klastrowego, w którym jeśli węzeł nie działa, inny węzeł może obsługiwać inne żądania. W tym obejściu, jeśli węzeł z profilem „zaplanowanym” przestanie działać, to zadanie w tle nie będzie działać
Ahmed Hashem,
3
Myślę, że moglibyśmy użyć Redis z atomic geti setoperacją, aby to zarchiwizować.
Thanh Nguyen Van
Istnieje kilka problemów z Twoją sugestią: 1. Zwykle chciałbyś, aby każdy węzeł klastra miał dokładnie taką samą konfigurację, aby były w 100% wymienne i wymagały tych samych zasobów przy tym samym obciążeniu, które współdzielą. 2. Twoje rozwiązanie wymagałoby ręcznej interwencji w przypadku awarii węzła „zadania”. 3. Nadal nie gwarantuje, że zadanie zostało faktycznie wykonane pomyślnie, ponieważ węzeł „zadania” przestał działać, zanim zakończył przetwarzanie bieżącego wykonania, a nowy „moduł uruchamiania zadań” został utworzony po awarii pierwszego, nie wiedząc, czy skończył się czy nie.
Moshe Bixenshpaner,
1
po prostu narusza ideę środowisk klastrowych, nie może być żadnego rozwiązania z sugerowanym podejściem. Nie można replikować nawet serwerów profili, aby zapewnić dostępność, ponieważ spowoduje to dodatkowe koszty i niepotrzebne marnotrawstwo zasobów. Rozwiązanie sugerowane przez @Thanh jest znacznie czystsze niż to. Pomyśl o tym samym, co MUTEX. Każdy serwer, na którym działa skrypt, uzyska tymczasową blokadę w rozproszonej pamięci podręcznej, takiej jak redis, a następnie będzie kontynuował koncepcje tradycyjnego blokowania.
anuj pradhan
2

Do blokowania używam tabeli bazy danych. Tylko jedno zadanie na raz może wykonać wstawkę do tabeli. Drugi otrzyma wyjątek DuplicateKeyException. Logika wstawiania i usuwania jest obsługiwana przez aspekt wokół adnotacji @Scheduled. Używam Spring Boot 2.0

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);
RenRen
źródło
3
Myślisz, że to zadziała idealnie? Ponieważ jeśli jeden z węzłów ulegnie awarii po zablokowaniu, to inni nie dowiedzą się, dlaczego jest blokada (w Twoim przypadku pozycja wiersza odpowiadająca zadaniu w tabeli).
Badman,
2

dlock jest przeznaczony do uruchamiania zadań tylko raz przy użyciu indeksów bazy danych i ograniczeń. Możesz po prostu zrobić coś takiego jak poniżej.

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

Zobacz artykuł o używaniu go.

Will Hughes
źródło
4
Jeśli używasz dlock .Assume, używamy DB do utrzymania blokady. I jeden z węzłów w klastrze nieoczekiwanie przestał działać po zablokowaniu, co się stanie w tym scenariuszu? Czy będzie w stanie impasu?
Badman,
0

Aby to osiągnąć, można użyć wbudowanego programu planującego, takiego jak program planujący db . Ma trwałe wykonania i wykorzystuje prosty optymistyczny mechanizm blokujący, aby zagwarantować wykonanie przez pojedynczy węzeł.

Przykładowy kod, jak można osiągnąć przypadek użycia:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();
Gustav Karlsson
źródło
-2

Kontekst Spring nie jest klastrowany, więc zarządzanie zadaniem w aplikacji rozproszonej jest trochę trudne i musisz użyć systemów obsługujących jgroup, aby zsynchronizować stan i pozwolić zadaniu przyjąć priorytet do wykonania akcji. Lub możesz użyć kontekstu ejb do zarządzania klastrowaną usługą ha singleton, taką jak środowisko jboss ha https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Lub możesz użyć klastrowanej pamięci podręcznej i zasobu blokady dostępu między usługą a pierwszą usługą weź blokadę, aby wykonać akcję lub zaimplementować własną jgroup, aby komunikować się z usługą i wykonywać akcję jeden węzeł

Abdulghaffar Al-Labadi
źródło