티스토리 뷰

반응형

이번에 배치 실패 시 알림을 보내는 기능을 구현 하면서 배치에서 제공하는 리스너에 대해 정리한 내용을 공유하려고 합니다

 

스프링 배치 프레임 워크

다음 아키텍처는 스프링 배치의 프레임 워크의 구성 요소를 보여 줍니다

스프링 배치의 주요 구성요소 및 전체 프로세스 흐름
 

스프링 배치 작업은 오랫동안 실행될 수 있으며 진행 정보를 제공하는 것이 중요합니다. 진행 중인 작업, 실패한 작업 및 완료된 작업 등 모든 정보는 배치 이벤트 리스너를 사용해서 수집 및 가공할 수 있습니다.

 
Process flow from Step to Tasklet
 

서비스 운영 중 배치의 실패 여부를 알기 위해 확인을 위한 알림이 필요했습니다. 배치는 여러 개의 이벤트 리스너를 제공하며 그중 Job , Step 을 보조해주는 JobExectionListener 와 StepExecutionListener 가 성공과 실패에 관계없이 이벤트 결과값을 반환해주며 JobExecutionListener 를 사용해 특정 실행 Job 이 실패 시 Slack 으로 배치 실패 알림을 보내도록 하였습니다.

 

스프링 배치 리스너의 종류의 정리하고 간단한 예시를 만들어 보았습니다.

 

1. JobExecutionListener

JobExecutionListener 는 Job 단계 전과 후에 정보를 제공합니다

  • Job 의 성공여부와 상관없이 호출됩니다
  • @BeforeJob, @ AfterJob
@Slf4j
public class CustomJobListener implements JobExecutionListener {

  @Override
  public void beforeJob(JobExecution jobExecution) {
    log.info("--beforeJob");

  }
  @Override
  public void afterJob(JobExecution jobExecution) {
    log.info("--afterJob");
    if (jobExecution.getStatus() == BatchStatus.FAILED) {
      String jobName = jobExecution.getJobInstance().getJobName();
      log.info(jobName,":잡이 실패했습니다~");
    }
  }
}
@Bean
  public Job job(){
    return jobBuilderFactory.get("job")
        .start(step01())
        .next(step02())
        .listener(new CustomJobListener())
        .build();
  }

 

2. StepExecutionListener

StepExecutionListener 는 Step 단계 전과 후에 정보를 제공합니다

  • Step 의 성공여부와 상관없이 호출됩니다
  • @BeforeStep , @AfterStep
@Slf4j
@Component
public class CustomStepExecutionListener implements StepExecutionListener {

  @Override
  public void beforeStep(StepExecution stepExecution) {
    log.info("--beforeStep");
  }

  @Override
  public ExitStatus afterStep(StepExecution stepExecution) {
    log.info("--afterStep");
    String stepName = stepExecution.getStepName();
    log.info("--stepName:",stepName);
    ExitStatus exitStatus = stepExecution.getExitStatus();
    return exitStatus;
  }
}
@Bean
  public Step step01() {
    return stepBuilderFactory.get("step01")
        .<String, String>chunk(10)
        .reader(listItemReader())
        .processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
        .writer((ItemWriter<String>) items -> {
          System.out.println("items =" + items);
        })
        .listener(new CustomStepExecutionListener())
        .build();

  }

  @Bean
  public Step step02() {
    return stepBuilderFactory.get("step02")
        .tasklet((contribution, chunkContext) -> {
          return RepeatStatus.FINISHED;
        })
        .listener(new CustomStepExecutionListener())
        .build();
  }

 

3. ChunkListener

ChunkListener chunck 를 처리하기 전이나 후 오류 발생시 호출됩니다

  • @BeforeChunk , @AfterChunk, @AfterChunkError
@Slf4j
public class CustomChunkListener implements ChunkListener {

  @Override
  public void beforeChunk(ChunkContext context) {
    // read 가 호출되기 전 단계 입니다
    log.info("--beforeChunk");
  }

  @Override
  public void afterChunk(ChunkContext context) {
    // 에러가 발생되면 호출되지 않습니다
    log.info("--afterChunk");

  }
  @Override
  public void afterChunkError(ChunkContext context) {
    //에러가 발생되면 호출됩니다
    log.info("--afterChunkError");
  }
}
@Bean
  public Step step01() {
    return stepBuilderFactory.get("step01")
        .<String, String>chunk(10)
        .reader(listItemReader())
        .processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
        .writer((ItemWriter<String>) items -> {
          System.out.println("items =" + items);
        })
        .listener(new CustomChunkListener())
        .build();
  }

 

4. ItemReadListener

ItemReadListener read 메서드 호출전, 후, 오류시 호출됩니다

  • @BeforeRead, @AfterRead, @OnReadError
@Slf4j
public class CustomItemReadListener implements ItemReadListener {

  @Override
  public void beforeRead() throws Exception {
    log.info("--beforeRead");
  }

  @Override
  public void afterRead(Object item) throws Exception {
    log.info("--afterRead");
  }

  @Override
  public void onReadError(Exception ex) throws Exception {
    log.info("--onReadError");

  }
}
@Bean
  public Step step01() {
    return stepBuilderFactory.get("step01")
        .<String, String>chunk(10)
        .reader(listItemReader())
        .processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
        .writer((ItemWriter<String>) items -> {
          System.out.println("items =" + items);
        })
        .listener(new CustomItemReadListener())
        .build();
  }

 

5. ItemProcessListener

ItemProcessListener는 process메소드 호출전 후 오류 발생시 호출됩니다

  • @BeforeProcess, @AfterProcess, @OnProcessError
@Slf4j
public class CustomItemProcessListener implements ItemProcessListener<Integer, String> {

  @Override
  public void beforeProcess(Integer item) {
    log.info("--beforeProcess");
  }

  @Override
  public void afterProcess(Integer item, String result) {
    log.info("--afterProcess");
  }

  @Override
  public void onProcessError(Integer item, Exception e) {
    log.info("--onProcessError");
  }
}
@Bean
  public Step step01() {
    return stepBuilderFactory.get("step01")
        .<String, String>chunk(10)
        .reader(listItemReader())
        .processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
        .writer((ItemWriter<String>) items -> {
          System.out.println("items =" + items);
        })
        .listener(new CustomItemProcessListener())
        .build();
  }

 

6. ItemWriteListener

ItemWriterListener는 writer 메서드 호출전 후 오류 발생시 호출됩니다

  • @BeforeWrite, @AfterWrite, @OnWriteError
@Slf4j
public class CustomItemWriterListner implements ItemWriteListener {

  @Override
  public void beforeWrite(List items) {
    log.info("--beforeWrite");
  }

  @Override
  public void afterWrite(List items) {
    log.info("--afterWrite");
  }

  @Override
  public void onWriteError(Exception exception, List items) {
    log.info("--onWriteError");
  }
}
@Bean
  public Step step01() {
    return stepBuilderFactory.get("step01")
        .<String, String>chunk(10)
        .reader(listItemReader())
        .processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
        .writer((ItemWriter<String>) items -> {
          System.out.println("items =" + items);
        })
        .listener(new CustomItemWriterListner())
        .build();
  }

 

7. SkipListener

SkipListener read write process 중 skip이 발생한 경우 호출됩니다

  • @OnSkipRead, @OnSkipInWrite, @OnSkipInPorcess
@Slf4j
public class CustomSkipListener implements SkipListener {
  @Override
  public void onSkipInRead(Throwable t) {
    log.info("--onSkipInRead");
  }

  @Override
  public void onSkipInWrite(Object item, Throwable t) {
    log.info("--onSkipInWrite");
  }

  @Override
  public void onSkipInProcess(Object item, Throwable t) {
    log.info("--onSkipInProcess");
  }
}
@Bean
public Step step01() {
  return stepBuilderFactory.get("step01")
      .<String,String >chunk(10)
      .reader(listItemReader())
      .processor((Function<? super String, ? extends String>) firstName -> firstName +"직방")
      .writer((ItemWriter<String>) items -> {
        System.out.println("items =" + items);
      })
      .faultTolerant()
      .skip(SQLException.class)
      .listener(new CustomSkipListener())
      .build();
}

 

8. RetryListener

RetryListener retry 전 후 오류 시 호출 되며 open() 반환값에 따라 재시도 를 합니다

@Slf4j
public class CustomRetryListener implements RetryListener {
  @Override
  public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
    log.info("--open");
    return true;
  }

  @Override
  public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    log.info("--close");
  }

  @Override
  public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
    log.info("--onError");
  }
}
@Bean
public Step step01() {
  return stepBuilderFactory.get("step01")
      .<String,String >chunk(10)
      .reader(listItemReader())
      .processor((Function<? super String, ? extends String>) firstName -> firstName +"직방")
      .writer((ItemWriter<String>) items -> {
        System.out.println("items =" + items);
      })
      .faultTolerant()
      .retry(SQLException.class)
      .listener(new CustomRetryListener())
      .build();
}

 

반응형
댓글
공지사항