Java zatrzyma usługę modułu wykonującego, gdy jedno z jego przypisanych zadań zawiedzie z jakiegokolwiek powodu

12

Potrzebuję usługi, która uruchomi kilka zadań jednocześnie i w odstępie 1 sekundy przez 1 minutę.

Jeśli jedno z zadań się nie powiedzie, chcę zatrzymać usługę i każde zadanie, które się z nią uruchomiło, z jakimś wskaźnikiem, że coś poszło nie tak, inaczej jeśli po minucie wszystko pójdzie dobrze, usługa zatrzyma się ze wskaźnikiem, że wszystko poszło dobrze.

Na przykład mam 2 funkcje:

Runnable task1 = ()->{
      int num = Math.rand(1,100);
      if (num < 5){
          throw new Exception("something went wrong with this task,terminate");
      }
}

Runnable task2 = ()->{
      int num = Math.rand(1,100)
      return num < 50;
}



ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
task1schedule = scheduledExecutorService.scheduleAtFixedRate(task1, 1, 60, TimeUnit.SECONDS);
task2schedule = scheduledExecutorService.scheduleAtFixedRate(task2, 1, 60, TimeUnit.SECONDS);

if (!task1schedule || !task2schedule) scheduledExecutorService.shutdown();

Jakieś pomysły na to, jak sobie z tym poradzić i uczynić rzeczy tak ogólnymi, jak to możliwe?

totothegreat
źródło
1
Kilka rzeczy oprócz samego pytania, Math.randnie jest wbudowanym interfejsem API. Implementacja Runnablemusi mieć void rundefinicję. Rodzaj task1/2schedulebędzie ScheduledFuture<?>w podanym kontekście. Przechodząc do właściwego pytania, w jaki sposób można go wykorzystać awaitTermination? Możesz to zrobić jak scheduledExecutorService.awaitTermination(1,TimeUnit.MINUTES);. Alternatywnie, a co ze sprawdzeniem, czy którekolwiek z zadań zostało anulowane przed jego normalnym zakończeniem if (task1schedule.isCancelled() || task2schedule.isCancelled()) scheduledExecutorService.shutdown();:?
Naman
2
Planowanie powtarzania zadań co minutę nie ma sensu , ale powiedzmy, że chcesz je zatrzymać „jeśli po minucie wszystko pójdzie dobrze”. Ponieważ w każdym przypadku zatrzymujesz moduł wykonujący, zaplanowanie zadania, które wyłącza moduł wykonawczy po minucie, jest banalne. Przyszłości już wskazują, czy coś poszło nie tak, czy nie. Nie powiedziałeś, jakiego innego rodzaju wskaźnika chcesz.
Holger

Odpowiedzi:

8

Chodzi o to, że zadania przekazują do wspólnego obiektu TaskCompleteEvent. Jeśli zgłoszą błąd, harmonogram zostanie zatrzymany, a wszystkie zadania zatrzymane.

Możesz sprawdzić wyniki każdej iteracji zadania na mapach „błędy” i „sukces”.

public class SchedulerTest {

    @Test
    public void scheduler() throws InterruptedException {
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
        TaskCompleteEvent taskCompleteEvent = new TaskCompleteEvent(scheduledExecutorService);
        Runnable task1 = () -> {
            int num = new Random().nextInt(100);
            if (num < 5) {
                taskCompleteEvent.message("task1-"+UUID.randomUUID().toString(), "Num "+num+" was obatined. Breaking all the executions.", true);
            }
        };
        Runnable task2 = () -> {
            int num = new Random().nextInt(100);
            taskCompleteEvent.message("task2-"+UUID.randomUUID().toString(), num < 50, false);
        };
        scheduledExecutorService.scheduleAtFixedRate(task1, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(task2, 0, 1, TimeUnit.SECONDS);
        scheduledExecutorService.awaitTermination(60, TimeUnit.SECONDS);
        System.out.println("Success: "+taskCompleteEvent.getSuccess());
        System.out.println("Errors: "+taskCompleteEvent.getErrors());
        System.out.println("Went well?: "+taskCompleteEvent.getErrors().isEmpty());
    }

    public static class TaskCompleteEvent {

        private final ScheduledExecutorService scheduledExecutorService;
        private final Map<String, Object> errors = new LinkedHashMap<>();
        private final Map<String, Object> success = new LinkedHashMap<>();

        public TaskCompleteEvent(ScheduledExecutorService scheduledExecutorService) {
            this.scheduledExecutorService = scheduledExecutorService;
        }

        public synchronized void message(String id, Object response, boolean error) {
            if (error) {
                errors.put(id, response);
                scheduledExecutorService.shutdown();
            } else {
                success.put(id, response);
            }
        }

        public synchronized Map<String, Object> getErrors() {
            return errors;
        }

        public synchronized Map<String, Object> getSuccess() {
            return success;
        }

    }

}
ravenskater
źródło
2

Musisz tylko dodać dodatkowe zadanie, którego zadaniem jest monitorowanie wszystkich pozostałych uruchomionych zadań - a gdy którekolwiek z monitorowanych zadań nie powiedzie się, muszą ustawić semafor (flagę), który zabójca może sprawdzić.

    ScheduledExecutorService executor = (ScheduledExecutorService) Executors.newScheduledThreadPool(2);

    // INSTANTIATE THE REMOTE-FILE-MONITOR:
    RemoteFileMonitor monitor = new RemoteFileMonitor(remotesource, localtarget);

    // THIS TimerTask PERIODICALLY TRIGGERS THE RemoteFileMonitor: 
    TimerTask remote = new TimerTask() {

        // RUN FORREST... RUN !
        public void run() {

            try { 

                kae.trace("TimerTask::run() --> Calling RemoteFileMonitor.check()");
                monitor.check();

            } catch (Exception ex) {

                // NULL TRAP: ALLOWS US TO CONTINUE AND RETRY:

            }

        }

    };

    // THIS TimerTask PERIODICALLY TRIES TO KILL THE REMOTE-FILE-MONITOR:
    TimerTask assassin = new TimerTask() {

        // WHERE DO BAD FOLKS GO WHEN THEY DIE ? 
        private final LocalDateTime death = LocalDateTime.now().plus(ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);

        // RUN FORREST... RUN !
        public void run() {

            // IS THERE LIFE AFTER DEATH ???
            if (LocalDateTime.now().isAfter(death)) {

                // THEY GO TO A LAKE OF FIRE AND FRY:
                kae.error(ReturnCode.MONITOR_POLLING_CYCLE_EXCEEDED);                   

            }

        }

    };

    // SCHEDULE THE PERIODIC EXECUTION OF THE RemoteFileMonitor: (remote --> run() monitor --> check())
    executor.scheduleAtFixedRate(remote, delay, interval, TimeUnit.MINUTES);

    // SCHEDULE PERIODIC ASSASSINATION ATTEMPTS AGAINST THE RemoteFileMonitor: (assassin --> run() --> after death --> die())
    executor.scheduleAtFixedRate(assassin, delay, 60L, TimeUnit.SECONDS);

    // LOOP UNTIL THE MONITOR COMPLETES:
    do {

        try {

            // I THINK I NEED A NAP:
            Thread.sleep(interval * 10);                

        } catch (InterruptedException e) {

            // FAIL && THEN cleanexit();
            kae.error(ReturnCode.MONITORING_ERROR, "Monitoring of the XXXXXX-Ingestion site was interrupted");

        }

        // NOTE: THE MONITOR IS SET TO 'FINISHED' WHEN THE DONE-File IS DELIVERED AND RETRIEVED:
    } while (monitor.isNotFinished());

    // SHUTDOWN THE MONITOR TASK:
    executor.shutdown();
Greg Patnude
źródło
2
Klasa TimerTaskjest całkowicie niezwiązana z ScheduledExecutorService; po prostu się zdarza Runnable. Co więcej, nie ma sensu planować okresowego zadania, aby sprawdzić, czy osiągnięto określony czas ( ConfigurationOptions.getPollingCycleTime()). Masz ScheduledExecutorService, więc możesz powiedzieć mu, aby zaplanował zadanie we właściwym czasie.
Holger
Implementacja w przykładzie, którego użyłem, polegała na zabiciu wykonania zadania po pewnym czasie, jeśli zadanie nie zostało ukończone. Przypadek użycia to: Jeśli serwer zdalny nie upuścił pliku w ciągu 2 godzin - zabij zadanie. o to prosił PO.
Greg Patnude
Czy przeczytałeś i zrozumiałeś mój komentarz? To nie ma znaczenia, co kod robi, to używa zniechęcony klasę bez powodu, po prostu wymienić TimerTasksię Runnablei masz problem został rozwiązany, nie zmieniając co kod robi. Co więcej, po prostu użyj, executor.schedule(assassin, ConfigurationOptions.getPollingCycleTime(), ChronoUnit.MINUTES);a uruchomi się raz w pożądanym czasie, dlatego if(LocalDateTime.now().isAfter(death))kontrola jest przestarzała. Ponownie, nie zmienia to, co robi kod, poza tym, że robi to znacznie prostsze i bardziej wydajne.
Holger