Use E-HPC to implement a "perfectly parallel" efficient batch processing solution

In high-performance computing scenarios, a user's business computation can often be divided into a large number of tasks. The processing logic of each task is the same, but the input files, parameter settings, and output files differ. Because the tasks share the same processing logic and do not depend on each other during execution, this workload pattern is classified in parallel computing as "embarrassingly parallel" (also known as perfectly parallel): the problem is divided into many parallel tasks, with little or no dependency or communication between them. Such problems go by another name, "batch processing", and they are the most "perfect" scenario in the field of high-performance computing. This article presents an array job solution based on Alibaba Cloud Elastic High Performance Computing (E-HPC): the job scheduling system integrated with E-HPC automatically assigns the user's batch tasks to an array job, which then executes with high concurrency on a cloud supercomputing cluster. At the same time, relying on the elasticity of the cloud, the cluster's computing resources can be expanded dynamically to control the completion time of the batch.

Background introduction

This section first introduces batch processing scenarios through an example, and then discusses high-performance computing clusters and array jobs.

Batch processing

In the field of high-performance computing, there are scenarios where large batches of computations can be processed at the same time. Consider the following freebayes use case as an example: every task runs the same freebayes application, but each task processes a different input file (--bam-list), uses a different region parameter (-r), and writes a different result file (--vcf). Because the total amount of work is huge, the tasks must execute concurrently to shorten the overall processing time.

Introduction to high-performance computing clusters and array jobs

A high-performance computing cluster connects a large number of computing nodes through a network for unified management and scheduling, providing a computing environment for large-scale application operation, including account management, scheduling management, file system, cluster monitoring and other modules.

Since the cluster contains a large number of computing nodes, it is usually used by multiple users. Each user can submit multiple jobs, and each job requires one or more computing nodes. The allocation of cluster resources is coordinated by scheduling management to avoid resource usage conflicts. Commonly used scheduling management software includes PBS, Slurm, SGE, LSF, etc.

An array job is a collection of jobs: a single submission command creates all the jobs in the set, and each job is distinguished by its own index value.

For example, to submit an array job with the PBS scheduler, create a script named qjob.sh with the following content:

#!/bin/bash

#PBS -N arrjob                 # job name
#PBS -l nodes=1:ppn=1          # each sub-job needs 1 node with 1 core
#PBS -J 1-3                    # sub-job index range: 1, 2, 3

echo $PBS_ARRAY_ID             # PBS_ARRAY_ID holds this sub-job's index

The qjob.sh script defines an array job containing 3 jobs. The job number range is specified by the -J option, here 1-3. When a specific job executes, it obtains its own number through the environment variable $PBS_ARRAY_ID. The job can be submitted with the following command:

qsub ./qjob.sh 

This creates three jobs at once; whether they execute immediately is decided by the scheduler, based on the cluster's idle resources and each job's resource requirements. If resources are abundant, all 3 jobs run at the same time.
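For illustration (assuming a PBS Pro style scheduler; the exact job-id format differs between clusters), submission returns the array job id, and qstat can list the sub-jobs:

$ qsub ./qjob.sh
1[].manager
$ qstat -t    # -t expands job arrays so each sub-job's status is visible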

Use array jobs to solve batch tasks

From the introductions above, array jobs are a natural fit for batch computing scenarios, but several questions must still be answered before they are convenient to use:

  1. How should batch tasks correspond to jobs? When the number of tasks is huge, should each task be its own job, or should one job contain multiple tasks?
  2. How should $PBS_ARRAY_ID be linked to different tasks, and can it map conveniently to the different parameters of each task?
  3. How can task execution be tracked? How can task logs be viewed conveniently? After individual tasks fail, how can they be quickly identified and re-executed after adjustment?

To this end, we present a batch processing solution based on array jobs, covering task-to-job assignment, batch task definition, and task running and tracking.

Batch task to job assignment

When the number of batch tasks is huge, assigning one job per task increases the load on the scheduler. Although the scheduler can display the running status of each job, an excessive number of jobs also makes that status inconvenient to inspect. In addition, when adjacent tasks execute on the same node and use the same files, the node's local cache can be reused.

Therefore, multiple tasks are packed into each job: if the number of tasks is Nt and the number of jobs is Nj, each job processes Nt/Nj tasks (integer division). If Nt is not evenly divisible by Nj, the first Nt%Nj jobs each process one more task. In the batch example above, if Nt/Nj = 2 with a remainder, the jobs with small numbers process 3 tasks each while the remaining jobs process 2 each.
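The following bash sketch makes this assignment rule concrete (the variable names, the example values of Nt and Nj, and the standalone default for PBS_ARRAY_ID are illustrative, not taken from ehpcarrayjob.py):

#!/bin/bash
# Sketch of the task-to-job assignment rule described above.
Nt=11                              # total number of tasks
Nj=5                               # total number of jobs
PBS_ARRAY_ID=${PBS_ARRAY_ID:-1}    # set by PBS inside a real sub-job
i=$((PBS_ARRAY_ID - 1))            # 0-based job index

base=$((Nt / Nj))                  # every job processes at least this many
rem=$((Nt % Nj))                   # the first 'rem' jobs process one extra

if [ "$i" -lt "$rem" ]; then
    count=$((base + 1))
    start=$((i * (base + 1)))
else
    count=$base
    start=$((rem * (base + 1) + (i - rem) * base))
fi

echo "job $PBS_ARRAY_ID handles tasks $start..$((start + count - 1))"

With Nt=11 and Nj=5, job 1 handles tasks 0-2 while jobs 2-5 handle two tasks each, matching the rule above.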

Batch task definition

From the batch example, we can see that only some parameters differ between tasks. Replacing these changing parts with variables yields a single processing script, stored in the file task.sh:

$ cat task.sh
#!/bin/bash
echo "process $bamlist and $chrvar"
# other shell commands, e.g. cd $bamlist
freebayes --bam-list $bamlist -r $chrvar --vcf /home/user/result/out-$bamlist.vcf
ret=$?   # capture the exit status of freebayes
# other shell commands
# ... ...

exit $ret   # exit 0 on success; a non-zero status marks the task as failed

Here, $bamlist represents the changing value of the --bam-list option (and, derived from it, the --vcf output name), and $chrvar represents the changing value of the -r option.

The concrete values are stored in a file named after the corresponding variable, one value per line. In this example there are two variables, so two files are needed: bamlist and chrvar.

$ cat bamlist
bam1_100
bam101_200
bam201_300
bam301_400
bam401_500
bam501_600
bam601_700
bam701_800
bam801_900
... ...
bam901_1000
bam1001_1100 
$ cat chrvar
chr01:1-1000
chr01:1001-2000
chr03:100-200
chr12:1000-2000
chr02:100-1100
chr03:1000-2000
chr05:1000-2000
chr08:1000-2000
chr08:3000-6000
... ...
chr01:8000-9000
chr06:1000-2000 
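Conceptually, executing task k means taking line k from each variable file, exporting the value under the file's name, and invoking task.sh. A minimal sketch of this binding (illustrative only; the real ehpcarrayjob.py introduced below also handles the task-to-job mapping and logging):

#!/bin/bash
# Bind line k of each variable file to a variable of the same name,
# then run the task script. The value of k is illustrative.
k=3
export bamlist=$(sed -n "${k}p" bamlist)
export chrvar=$(sed -n "${k}p" chrvar)
bash ./task.sh    # task.sh expands $bamlist and $chrvar from the environment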

Task running and tracking

After the batch tasks are defined, the task-to-job mapping and the parsing and assignment of the variable files still need to be implemented. For these general functions, E-HPC provides a Python script, ehpcarrayjob.py. With it, the array job script qjob.sh becomes:

$ cat qjob.sh
#!/bin/bash

#PBS -N bayes.job
#PBS -l nodes=1:ppn=1
#PBS -J 1-Nj

cd $PBS_O_WORKDIR   # change to the directory from which the job was submitted

python ehpcarrayjob.py -n Nj -e ./task.sh bamlist chrvar

The script is submitted to the cluster with the qsub command, and PBS schedules it for batch execution (Nj is the number of jobs; replace it with the actual value as required).
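For example (one simple approach, not an E-HPC requirement), the Nj placeholder can be filled in with sed before submission:

# Replace the Nj placeholder with a concrete job count, then submit.
Nj=8
sed "s/Nj/${Nj}/g" qjob.sh > qjob_run.sh
qsub ./qjob_run.sh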

$ python ehpcarrayjob.py -h
usage: ehpcarrayjob.py [-h] -n NJOBS -e EXECFILE argfiles [argfiles ...]

positional arguments:
  argfiles

optional arguments:
  -h, --help            show this help message and exit
  -n NJOBS, --njobs NJOBS
                        number of jobs
  -e EXECFILE, --execfile EXECFILE
                        job command file 

where:

-n specifies the number of jobs;

-e specifies the processing script for each task (a path is required);

argfiles: one or more positional arguments specifying the parameter files.

After submission, the array job is assigned a job id such as "1[].manager", and each sub-job has its own sub-job number in the range 1-Nj.

ehpcarrayjob.py generates a directory named after the job id (such as 1[].manager); in this directory, each sub-job writes a log file named "log.sub-job number" recording its execution.

When a task returns a non-zero status (failure), the values of its task variables are recorded in files named "fails.variable name.sub-job number" in the job id directory. After determining the cause of the failure and fixing the processing script, the failed tasks can be conveniently resubmitted.
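An illustrative sketch of that re-run flow (the fails.* names follow the convention just described; the .retry file names are assumptions):

# Gather the variable values of all failed tasks into new variable
# files, then resubmit the array job over just those tasks.
jobdir='1[].manager'                      # directory named after the job id
cat "$jobdir"/fails.bamlist.* > bamlist.retry
cat "$jobdir"/fails.chrvar.*  > chrvar.retry
# After pointing qjob.sh at the .retry files (and reducing Nj), resubmit:
qsub ./qjob.sh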

Summary

From the user's point of view, whenever a new batch computation arrives, besides dividing the work into tasks, the processing script traditionally has to be rewritten for each task, even when legacy scripts exist.

In addition, the following operational questions have to be faced:

How many resources does this computation need?
Where can these resources be found?
Can the tasks run, and how can the cause of an error be located?
Will any task be recomputed or missed?
Can the machines be obtained, and will they sit idle for long stretches?

The batch processing solution of Alibaba Cloud Elastic High Performance Computing (E-HPC) resolves these problems and lets users stay focused on their work.

With the E-HPC solution, users only need the following steps:

  1. Extract the changing values in the batch tasks and store each variable's values in a separate file whose name conforms to shell variable naming, such as bamlist and chrvar.
  2. Write a task processing script, replacing each changing value with a variable named after the corresponding file, as in task.sh.
  3. Write the array job script, specifying the resource requirements of each job and the total number of jobs, and call ehpcarrayjob.py to start the batch execution, as in qjob.sh.

Then submit the job with qsub and follow the current task progress, along with the list of problem tasks, in the "job id" directory. Whether the jobs run is determined by the cluster's resource status: if the cluster has enough nodes, all jobs can run at once; if resources are insufficient, a smaller number of jobs execute first.

At the same time, the E-HPC "cloud" supercomputing solution has the following advantages:

  1. It retains the native characteristics of an HPC cluster: users can log in to the cluster to compile and debug the processing logic of a single task, and monitor, analyze, and optimize application behavior through E-HPC's built-in application-level monitoring module.
  2. With E-HPC, a configured environment extends directly to newly added compute nodes, while low-specification login and control nodes preserve the configured environment over the long term.
  3. Based on the observed task throughput, the compute instance types on the cloud can be changed dynamically and computing resources expanded, adjusting the batch completion time to handle urgent workloads.

Original link: yq.aliyun.com/articles/69...