zl程序教程

您现在的位置是:首页 >  工具

当前栏目

DBMS_PARALLEL_EXECUTE包的应用

应用 execute Parallel DBMS
2023-09-14 09:14:48 时间
  • 前言叙述:

1 把数据集分割成小的块
2 在每一个块上以并行的方式应用update语句,在每个块执行完成后,提交!

此更新技术有如下好处:
1 在执行update的时候,仅仅锁住一个shunk而非锁住整个表!
2 因为每个chunk 执行完毕就提交,所以当update操作失败后,之前变更的并不会回滚!
3 减小回滚空间的使用
4 提高性能

dbms_parallel_execute一般分为3个步骤:

创建一个TASK;然后创建CHUNK把数据进行分批;最后是执行这个TASK。注意,该包是Oracle 11g 以后才有的。

DBMS_PARALLEL_EXECUTE 使用三种方法来将一个表的数据分割成chunk
CREATE_CHUNKS_BY_NUMBER_COL : 通过指定的字段来切割表
CREATE_CHUNKS_BY_ROWID : 通过ROWID来切割表
CREATE_CHUNKS_BY_SQL : 通过用户提供的sql语句来切割表

  • DBMS_PARALLEL_EXECUTE语法:来自官方文档

CREATE_TASK Procedure

Syntax:
DBMS_PARALLEL_EXECUTE.CREATE_TASK (
   task_name        IN   VARCHAR2,
   comment          IN   VARCHAR2 DEFAULT NULL);

CREATE_CHUNKS_BY_NUMBER_COL Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_NUMBER_COL (
   task_name       IN  VARCHAR2,
   table_owner     IN  VARCHAR2,
   table_name      IN  VARCHAR2,
   table_column    IN  VARCHAR2,
   chunk_size      IN  NUMBER);


注意:ORACLE并不是那么智能的,所以如果进行CHUNK的列中如果有异常数据导致某一两个值特别大,那么就会因此产生很多很多空的CHUNK,
导致整个CHUNK过程需要很长时间。而且,接下来的RUN_TASK的时候,传入的START_ID和END_ID将是这里CHUNK使用的值,
那么如果这个值上没有索引或者索引的可选择性不高,那整个执行过程就是噩梦。


CREATE_CHUNKS_BY_ROWID Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_ROWID (
   task_name       IN  VARCHAR2,
   table_owner     IN  VARCHAR2,
   table_name      IN  VARCHAR2,
   by_row          IN  BOOLEAN,
   chunk_size      IN  NUMBER);

BY_ROW:分CHUNK的类型。如果为TRUE,则后面的CHUNK_SIZE表示是行;如果是FALSE,则后面的CHUNK_SIZE表示的是BLOCK。
CHUNK_SIZE:CHUNK大小。如果BY_ROW为TRUE,表示多少行分为一个CHUNK;如果BY_ROW为FALSE,则表示多少块分为一个CHUNK。

注意:这里的TABLE_OWNER和TABLE_NAME是只能用大写的,不知道是否是BUG,小写的时候会报ORA-29491错误。

CREATE_CHUNKS_BY_SQL Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL (
   task_name       IN  VARCHAR2,
   sql_statement   IN  CLOB,
   by_rowid        IN  BOOLEAN);

如果只需要对表中的部分数据,或者基于某个查询的结果集进行并行操作,那根据SQL结果进行CHUNK就派上用场了

DROP_TASK Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.DROP_TASK (
   task_name       IN VARCHAR2);

RESUME_TASK Procedures

Syntax:

DBMS_PARALLEL_EXECUTE.RESUME_TASK (
   task_name                           IN  VARCHAR2,
   sql_stmt                               IN  CLOB,   
   language_flag                      IN  NUMBER,
   edition                                IN  VARCHAR2  DEFAULT NULL,
   apply_crossedition_trigger    IN  VARCHAR2  DEFAULT NULL,
   fire_apply_trigger                 IN  BOOLEAN   DEFAULT TRUE,
   parallel_level                       IN  NUMBER    DEFAULT 0,
   job_class                             IN  VARCHAR2  DEFAULT 'DEFAULT_JOB_CLASS',
   force                                  IN  BOOLEAN   DEFAULT FALSE);

DBMS_PARALLEL_EXECUTE.RESUME_TASK (
   task_name                     IN  VARCHAR2,
   force                             IN  BOOLEAN   DEFAULT FALSE);

RUN_TASK Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.RUN_TASK (
   task_name                     IN  VARCHAR2,
   sql_stmt                         IN  CLOB,
   language_flag                 IN  NUMBER,
   edition                           IN  VARCHAR2  DEFAULT NULL,
   apply_crossedition_trigger    IN  VARCHAR2  DEFAULT NULL,
   fire_apply_trigger            IN  BOOLEAN   DEFAULT TRUE,
   parallel_level                   IN  NUMBER    DEFAULT 0,
   job_class                        IN  VARCHAR2  DEFAULT 'DEFAULT_JOB_CLASS');

TASK_STATUS Procedure

Syntax:

DBMS_PARALLEL_EXECUTE.TASK_STATUS (
   task_name       IN VARCHAR2);

STOP_TASK Procedure  中途终止任务
Syntax:

DBMS_PARALLEL_EXECUTE.STOP_TASK (
   task_name       IN VARCHAR2);

  • 常用监控视图:

 这时可以通过视图来查看任务的创建情况:
select task_name,chunk_type,status from dba_parallel_execute_tasks;

分好后的CHUNK可以使用视图查看:
SQL> select chunk_id,task_name,status,start_rowid,end_rowid from dba_parallel_execute_chunks where rownum<=2;

------------------------------------------------------------------------------------实例操作-------------------------------------------------------------------------

  • 前期准备:

使用上述功能的用户必须拥有CREATE JOB 权限,执行DBMS_SQL的权限,因为CHUNK_BY_SQL, RUN_TASK, 和RESUME_TASK.

建表:
 create table  zhangtabid(id number,ms varchar(50),num_col number) ;

 INSERT /*+ APPEND */
 INTO zhangtabid
   SELECT level id,
          'Description for ' || level ms,
          CASE
            WHEN MOD(level, 5) = 0 THEN
             10
            WHEN MOD(level, 3) = 0 THEN
             20
            ELSE
             30
          END num_col
     FROM dual
   CONNECT BY level <= 500000;
 
commit;

分组统计数据量:
SELECT num_col, COUNT(*) FROM zhangtabid GROUP BY num_col ORDER BY num_col;

  • 1. 使用 CREATE_CHUNKS_BY_ROWID

BEGIN
DBMS_PARALLEL_EXECUTE.DROP_TASK ('test_task');
END;
/

注意事项:最好命令行执行,删除时  如果任务不存在,会提示ORA-29498错误 忽略即可。

DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  -- Create the TASK
  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);

  -- Chunk the table by the ROWID
  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => l_task,
                                               table_owner => 'SJJC_BZ',
                                               table_name  => 'ZHANGTABID',
                                               by_row      => TRUE,
                                               chunk_size  => 10000);
  -- DML to be execute in parallel
  l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ ZHANGTABID t SET t.num_col = t.num_col + 10 WHERE rowid BETWEEN :start_id AND :end_id';
  -- Run the task
  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is error, RESUME it for at most 2 times.
  l_try    := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;
  -- Done with processing; drop the task
  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

 

SELECT num_col, COUNT(*) FROM ZHANGTABID GROUP BY num_col ORDER BY num_col;

  • 2. 使用 CREATE_CHUNKS_BY_NUMBER_COL


CREATE OR REPLACE PROCEDURE process_update (p_start_id IN NUMBER, p_end_id IN NUMBER) AS
      BEGIN
        UPDATE /*+ ROWID (dda) */ ZHANGTABID t 
          SET t.num_col = t.num_col + 10
          WHERE id BETWEEN p_start_id AND p_end_id;
      END;
    /


DECLARE
  l_task     VARCHAR2(30) := 'test_task';
  l_sql_stmt VARCHAR2(32767);
  l_try      NUMBER;
  l_status   NUMBER;
BEGIN
  -- Create the TASK
  DBMS_PARALLEL_EXECUTE.create_task(task_name => l_task);

  -- Chunk the table by the ROWID
  DBMS_PARALLEL_EXECUTE.create_chunks_by_rowid(task_name   => l_task,
                                               table_owner => 'SJJC_BZ',
                                               table_name  => 'ZHANGTABID',
                                               by_row      => TRUE,
                                               chunk_size  => 10000);
  -- DML to be execute in parallel
  l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ ZHANGTABID t SET t.num_col = t.num_col + 10 WHERE rowid BETWEEN :start_id AND :end_id';
  -- Run the task
  DBMS_PARALLEL_EXECUTE.run_task(task_name      => l_task,
                                 sql_stmt       => l_sql_stmt,
                                 language_flag  => DBMS_SQL.NATIVE,
                                 parallel_level => 10);

  -- If there is error, RESUME it for at most 2 times.
  l_try    := 0;
  l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  WHILE (l_try < 2 and l_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop
    l_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.resume_task(l_task);
    l_status := DBMS_PARALLEL_EXECUTE.task_status(l_task);
  END LOOP;
  -- Done with processing; drop the task
  DBMS_PARALLEL_EXECUTE.drop_task(l_task);
END;
/

查看任务执行情况:
 select task_name,chunk_type,status from dba_parallel_execute_tasks;

SELECT num_col, COUNT(*) FROM ZHANGTABID GROUP BY num_col ORDER BY num_col;

  • 3. 使用 CREATE_CHUNKS_BY_SQL.

BEGIN
DBMS_PARALLEL_EXECUTE.DROP_TASK ('test_task');
END;
/
说明:报不存在错误,忽略即可。


DECLARE
  l_chunk_sql VARCHAR2(1000);
  l_sql_stmt  VARCHAR2(1000);
  l_try       NUMBER;
  l_status    NUMBER;
BEGIN
  -- Create the TASK
  DBMS_PARALLEL_EXECUTE.CREATE_TASK('test_task');
  -- Chunk the table by NUM_COL
  l_chunk_sql := 'SELECT DISTINCT num_col, num_col FROM ZHANGTABID';
  DBMS_PARALLEL_EXECUTE.CREATE_CHUNKS_BY_SQL('test_task',
                                             l_chunk_sql,
                                             false);
  -- Execute the DML in parallel
  -- the WHERE clause contain a condition on num_col, which is the chunk
  -- column. In this case, grouping rows is by num_col.
  l_sql_stmt := 'UPDATE /*+ ROWID (dda) */ ZHANGTABID t SET t.num_col = t.num_col + 10 WHERE num_col BETWEEN :start_id AND :end_id';
  DBMS_PARALLEL_EXECUTE.RUN_TASK('test_task',
                                 l_sql_stmt,
                                 DBMS_SQL.NATIVE,
                                 parallel_level => 10);
  -- If there is error, RESUME it for at most 2 times.
  L_try    := 0;
  L_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS('test_task');
  WHILE (l_try < 2 and L_status != DBMS_PARALLEL_EXECUTE.FINISHED) Loop
    L_try := l_try + 1;
    DBMS_PARALLEL_EXECUTE.RESUME_TASK('test_task');
    L_status := DBMS_PARALLEL_EXECUTE.TASK_STATUS('test_task');
  END LOOP;
  -- Done with processing; drop the task
  DBMS_PARALLEL_EXECUTE.DROP_TASK('test_task');
end;
/


查看任务执行情况:
 select task_name,chunk_type,status from dba_parallel_execute_tasks;

SELECT num_col, COUNT(*) FROM ZHANGTABID GROUP BY num_col ORDER BY num_col;

测试过程中发现如果CHUNK很小,则导致分出来的CHUNK过多,那么CHUNK_ID是使用一个SEQUENCE叫 DBMS_PARALLEL_EXECUTE_SEQ$来生成的,
而这个SEQUENCE的默认CACHE值只有20,可以通过加大这里的CACHE值解决一部分性能问题。

  • 总结:

DMBS_PARALLEL_EXECUTE使用步骤基本是:创建任务、把数据进行CHUNK、执行任务三大步骤。

其中CHUNK中的BY_ROWID和BY_COL都比较容易理解,BY_SQL是不大容易理解也是最灵活的方式。其实BY_SQL就是执行一个查询,
但最重要的是查询返回的结果要是能够进行CHUNK的区间,这里怎么写这个SQL就是非常头疼的问题了
,这个CHUNK写好了,后面的问题就简单了。

除了对自己进行数据更新外,也可以进行数据的并行迁移(只要把UPDATE改成INSERT另一个表就好了)等等许多复杂的任务,
而且DBMS_PARALLEL_EXECUTE提供了set_chunk_status、 get_rowid_chunk等多种更灵活的方式来控制整个任务执行的过程,从而实现复杂任务的并行执行。
 

更多精彩内容,还会不断地奉上,有些内容是对其它博客的完善与汇总,请大家多提宝贵意见,定会改进。--【京斗码农奉上】


数据仓库大数据开发学习的心路历程

https://blog.csdn.net/weixin_42163563/article/details/81943758