티스토리 뷰
이번에 배치 실패 시 알림을 보내는 기능을 구현 하면서 배치에서 제공하는 리스너에 대해 정리한 내용을 공유하려고 합니다
스프링 배치 프레임 워크
다음 아키텍처는 스프링 배치의 프레임 워크의 구성 요소를 보여 줍니다
스프링 배치 작업은 오랫동안 실행될 수 있으며 진행 정보를 제공하는 것이 중요합니다. 진행 중인 작업, 실패한 작업 및 완료된 작업 등 모든 정보는 배치 이벤트 리스너를 사용해서 수집 및 가공할 수 있습니다.
서비스 운영 중 배치의 실패 여부를 알기 위해 확인을 위한 알림이 필요했습니다. 배치는 여러 개의 이벤트 리스너를 제공하며 그중 Job , Step 을 보조해주는 JobExectionListener 와 StepExecutionListener 가 성공과 실패에 관계없이 이벤트 결과값을 반환해주며 JobExecutionListener 를 사용해 특정 실행 Job 이 실패 시 Slack 으로 배치 실패 알림을 보내도록 하였습니다.
스프링 배치 리스너의 종류의 정리하고 간단한 예시를 만들어 보았습니다.
1. JobExecutionListener
JobExecutionListener 는 Job 단계 전과 후에 정보를 제공합니다
- Job 의 성공여부와 상관없이 호출됩니다
- @BeforeJob, @ AfterJob
@Slf4j
public class CustomJobListener implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
log.info("--beforeJob");
}
@Override
public void afterJob(JobExecution jobExecution) {
log.info("--afterJob");
if (jobExecution.getStatus() == BatchStatus.FAILED) {
String jobName = jobExecution.getJobInstance().getJobName();
log.info(jobName,":잡이 실패했습니다~");
}
}
}
@Bean
public Job job(){
return jobBuilderFactory.get("job")
.start(step01())
.next(step02())
.listener(new CustomJobListener())
.build();
}
2. StepExecutionListener
StepExecutionListener 는 Step 단계 전과 후에 정보를 제공합니다
- Step 의 성공여부와 상관없이 호출됩니다
- @BeforeStep , @AfterStep
@Slf4j
@Component
public class CustomStepExecutionListener implements StepExecutionListener {
@Override
public void beforeStep(StepExecution stepExecution) {
log.info("--beforeStep");
}
@Override
public ExitStatus afterStep(StepExecution stepExecution) {
log.info("--afterStep");
String stepName = stepExecution.getStepName();
log.info("--stepName:",stepName);
ExitStatus exitStatus = stepExecution.getExitStatus();
return exitStatus;
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String, String>chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.listener(new CustomStepExecutionListener())
.build();
}
@Bean
public Step step02() {
return stepBuilderFactory.get("step02")
.tasklet((contribution, chunkContext) -> {
return RepeatStatus.FINISHED;
})
.listener(new CustomStepExecutionListener())
.build();
}
3. ChunkListener
ChunkListener chunck 를 처리하기 전이나 후 오류 발생시 호출됩니다
- @BeforeChunk , @AfterChunk, @AfterChunkError
@Slf4j
public class CustomChunkListener implements ChunkListener {
@Override
public void beforeChunk(ChunkContext context) {
// read 가 호출되기 전 단계 입니다
log.info("--beforeChunk");
}
@Override
public void afterChunk(ChunkContext context) {
// 에러가 발생되면 호출되지 않습니다
log.info("--afterChunk");
}
@Override
public void afterChunkError(ChunkContext context) {
//에러가 발생되면 호출됩니다
log.info("--afterChunkError");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String, String>chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.listener(new CustomChunkListener())
.build();
}
4. ItemReadListener
ItemReadListener read 메서드 호출전, 후, 오류시 호출됩니다
- @BeforeRead, @AfterRead, @OnReadError
@Slf4j
public class CustomItemReadListener implements ItemReadListener {
@Override
public void beforeRead() throws Exception {
log.info("--beforeRead");
}
@Override
public void afterRead(Object item) throws Exception {
log.info("--afterRead");
}
@Override
public void onReadError(Exception ex) throws Exception {
log.info("--onReadError");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String, String>chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.listener(new CustomItemReadListener())
.build();
}
5. ItemProcessListener
ItemProcessListener는 process메소드 호출전 후 오류 발생시 호출됩니다
- @BeforeProcess, @AfterProcess, @OnProcessError
@Slf4j
public class CustomItemProcessListener implements ItemProcessListener<Integer, String> {
@Override
public void beforeProcess(Integer item) {
log.info("--beforeProcess");
}
@Override
public void afterProcess(Integer item, String result) {
log.info("--afterProcess");
}
@Override
public void onProcessError(Integer item, Exception e) {
log.info("--onProcessError");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String, String>chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.listener(new CustomItemProcessListener())
.build();
}
6. ItemWriteListener
ItemWriterListener는 writer 메서드 호출전 후 오류 발생시 호출됩니다
- @BeforeWrite, @AfterWrite, @OnWriteError
@Slf4j
public class CustomItemWriterListner implements ItemWriteListener {
@Override
public void beforeWrite(List items) {
log.info("--beforeWrite");
}
@Override
public void afterWrite(List items) {
log.info("--afterWrite");
}
@Override
public void onWriteError(Exception exception, List items) {
log.info("--onWriteError");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String, String>chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName + "직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.listener(new CustomItemWriterListner())
.build();
}
7. SkipListener
SkipListener read write process 중 skip이 발생한 경우 호출됩니다
- @OnSkipRead, @OnSkipInWrite, @OnSkipInPorcess
@Slf4j
public class CustomSkipListener implements SkipListener {
@Override
public void onSkipInRead(Throwable t) {
log.info("--onSkipInRead");
}
@Override
public void onSkipInWrite(Object item, Throwable t) {
log.info("--onSkipInWrite");
}
@Override
public void onSkipInProcess(Object item, Throwable t) {
log.info("--onSkipInProcess");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String,String >chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName +"직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.faultTolerant()
.skip(SQLException.class)
.listener(new CustomSkipListener())
.build();
}
8. RetryListener
RetryListener retry 전 후 오류 시 호출 되며 open() 반환값에 따라 재시도 를 합니다
@Slf4j
public class CustomRetryListener implements RetryListener {
@Override
public <T, E extends Throwable> boolean open(RetryContext context, RetryCallback<T, E> callback) {
log.info("--open");
return true;
}
@Override
public <T, E extends Throwable> void close(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("--close");
}
@Override
public <T, E extends Throwable> void onError(RetryContext context, RetryCallback<T, E> callback, Throwable throwable) {
log.info("--onError");
}
}
@Bean
public Step step01() {
return stepBuilderFactory.get("step01")
.<String,String >chunk(10)
.reader(listItemReader())
.processor((Function<? super String, ? extends String>) firstName -> firstName +"직방")
.writer((ItemWriter<String>) items -> {
System.out.println("items =" + items);
})
.faultTolerant()
.retry(SQLException.class)
.listener(new CustomRetryListener())
.build();
}
'Server' 카테고리의 다른 글
DataGrip 에서 SSH 터널링으로 DB 접근하기 (2) | 2021.12.22 |
---|---|
Ubuntu PPA 저장소 관리 (0) | 2021.12.20 |
[Spring] Jackson을 통한 LocalDateTime 매핑 시 deserialize 에러 해결 (0) | 2021.12.16 |
log4j 보안 취약점 동작원리 및 jenkins 서버 확인 방법 (1) | 2021.12.14 |
M1 맥미니의 Mysql Config (0) | 2021.12.13 |
Redis Stream 을 사용해보려 합니다. (0) | 2021.12.05 |