티스토리 뷰

Server

Job & Step 병렬 처리 하기

니용 2022. 1. 5. 22:50
반응형

이번 글에서는 스프링 배치의 Job을 실행하면서 Step을 순서대로 실행시키는 것이 아닌 병렬로 처리하는 방법을 간단히 적어보고자 합니다. 

저는 Step1, Step2, Step3 은 동시에 실행되고 Step4는 앞의 스텝이 모두 끝난 뒤 실행하고 싶습니다. 한 번 코드로 작성해보겠습니다. 

@Configuration
@RequiredArgsConstructor
public class ParallelBatch {
    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    
    private static final String JOB_NAME = "ParallelJob";
    private static final String STEP1_NAME = "Step1";
    private static final String STEP2_NAME = "Step2";
    private static final String STEP3_NAME = "Step3";
    private static final String STEP4_NAME = "Step4";
    
    @Bean(JOB_NAME)
    public Job ParallelJob() {
    
        Flow flow1 = new FlowBuilder<Flow>("flow1")
          .start(step1()) // Step1 을 가진 플로우를 생성해줍니다.
          .build();
          
        Flow flow2 = new FlowBuilder<Flow>("flow2")
          .start(step2()) // Step2 을 가진 플로우를 생성해줍니다.
          .build();
          
        Flow flow3 = new FlowBuilder<Flow>("flow3")
          .start(step3()) // Step3 을 가진 플로우를 생성해줍니다.
          .build();
          
        Flow doItParallelSteps = new FlowBuilder<Flow>("doItParallelSteps")
          .split(new SimpleAsyncTaskExecutor())
          .add(flow1, flow2, flow3) // 동시에 실행될 flow 들을 넣어줍니다.
          .build();
          
        return jobBuilderFactory.get(JOB_NAME)
          .start(doItParallelSteps)
          .next(step4())
          .build();
     
    }
    
    
    @Bean(STEP1_NAME)
    public Step step1() {
      return stepBuilderFactory.get(STEP1_NAME)
          .tasklet((contribution, chunk) -> {
              for(int i=0; i<1000; i++) {
                  System.out.println("Step1 " + i + " executed!!");
              }
              
          })
          .build();
    }
    
    @Bean(STEP2_NAME)
    public Step step2() {
      return stepBuilderFactory.get(STEP2_NAME)
          .tasklet((contribution, chunk) -> {
              for(int i=1000; i<3000; i++) {
                  System.out.println("Step2 " + i + " executed!!");
              }
              
          })
          .build();
    }
    
    @Bean(STEP3_NAME)
    public Step step3() {
      return stepBuilderFactory.get(STEP3_NAME)
          .tasklet((contribution, chunk) -> {
              for(int i=3000; i<5000; i++) {
                  System.out.println("Step3 " + i + " executed!!");
              }
              
          })
          .build();
    }
    
    @Bean(STEP4_NAME)
    public Step step4() {
      return stepBuilderFactory.get(STEP4_NAME)
          .tasklet((contribution, chunk) -> {
              for(int i=5000; i<7000; i++) {
                  System.out.println("Step4 " + i + " executed!!");
              }
              
          })
          .build();
    }
}

 

위처럼 코드를 작성하면 요구사항에 맞춰 Step1, Step2, Step3 가 모두 실행되고 끝난 뒤에 Step4 가 시작되는 구조로 실행할 수 있습니다. 그럼 Flow 객체는 무슨 의미일까요? 

스프링 배치는 Job 아래에 여러 개의 Step을 구성한 형태로 존재하게 됩니다. 이 Step 들이 중구난방으로 나열이 되어 있을 때 정렬을 해주는 것이 Flow의 역할이라고 생각하면 쉽습니다. 쉽게 말해 순서를 정해주어 다른 Step 들이 어떤 조건에 실행이 되면 되는 것인지 정의해주는 인터페이스 입니다. 

관련 내용은 공식 문서에 보다 자세히 나와 있습니다.

https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#scalabilityParallelSteps

 

Scaling and Parallel Processing

As long as the application logic that needs to be parallelized can be split into distinct responsibilities and assigned to individual steps, then it can be parallelized in a single process. Parallel Step execution is easy to configure and use. For example,

docs.spring.io

 

위의 문서에서는 Flow 와 TaskExecutor 를 각각 Bean 선언하여 정의하였습니다만, 저는 Step 을 Bean 선언하여 배치를 작성하였기에 이와 같이 작성해 보았습니다 :) 

@Bean
public Job job() {
    return jobBuilderFactory.get("job")
        .start(splitFlow())
        .next(step4())
        .build()        //builds FlowJobBuilder instance
        .build();       //builds Job instance
}

@Bean
public Flow splitFlow() {
    return new FlowBuilder<SimpleFlow>("splitFlow")
        .split(taskExecutor())
        .add(flow1(), flow2())
        .build();
}

@Bean
public Flow flow1() {
    return new FlowBuilder<SimpleFlow>("flow1")
        .start(step1())
        .next(step2())
        .build();
}

@Bean
public Flow flow2() {
    return new FlowBuilder<SimpleFlow>("flow2")
        .start(step3())
        .build();
}

@Bean
public TaskExecutor taskExecutor() {
    return new SimpleAsyncTaskExecutor("spring_batch");
}

 

위 내용들은 모두 Spring Batch 4 버전 이상에서만 실행이 가능하고, 하위 버전에서는 특정 클래스를 추가하여야 사용이 가능하다고 합니다. 왜냐하면 이 내용들은 모두 멀티쓰레딩에 근거하여 진행을 하기 때문입니다. 즉, Step1 부터 3까지 호출되는 데에 각각의 쓰레드가 1,2,3번이 모두 들어가서 수행을 하고 나온 뒤 먼저 끝난 쓰레드가 가장 마지막에 끝난 쓰레드를 기다리는 형태입니다.

@Bean public Job job() { return jobBuilderFactory.get("job") .start(splitFlow()) .next(step4()) .build() //builds FlowJobBuilder instance .build(); //builds Job instance } @Bean public Flow splitFlow() { return new FlowBuilder<SimpleFlow>("splitFlow") .split(taskExecutor()) .add(flow1(), flow2()) .build(); } @Bean public Flow flow1() { return new FlowBuilder<SimpleFlow>("flow1") .start(step1()) .next(step2()) .build(); } @Bean public Flow flow2() { return new FlowBuilder<SimpleFlow>("flow2") .start(step3()) .build(); } @Bean public TaskExecutor taskExecutor() { return new SimpleAsyncTaskExecutor("spring_batch"); }

반응형
댓글
공지사항