Hive Cost Based Optimization

This post has been republished via RSS; it originally appeared at: New blog articles in Microsoft Tech Community.

Query optimizations in a relational query engine can be broadly classified as logical query optimizations and physical query optimizations. Logical query optimizations generally refer to query optimizations that can be derived based on relational algebra independent (Calcite) of the physical layer in which query is executed. Physical query optimizations are query optimizations that are cognizant of physical layer primitives. For Hive, the physical layer implies Map-Reduce and Tez primitives.

 

Currently logical query optimizations in Hive can be broadly categorized as follows:

  • Projection Pruning
  • Deducing Transitive Predicates
  • Predicate Push down
  • Merging of Select-Select, Filter-Filter in to single operator
  • Multi-way Join
  • Query Rewrite to accommodate for Join skew on some column values

 

Physical optimizations in Hive can be broadly classified as follows:

  • Partition Pruning
  • Scan pruning based on partitions and bucketing
  • Scan pruning if query is based on sampling
  • Apply Group By on the map side in some cases
  • Perform Join on the Mapper
  • Optimize Union so that union can be performed on map side only
  • Decide which table to stream last, based on user hint, in a multi way join
  • Remove unnecessary reduce sink operators
  • For queries with limit clause, reduce the number of files that needs to be scanned for the table.
  • For queries with Limit clause, restrict the output coming from mapper by restricting what Reduce Sink operator generates.
  • Reduce the number of Tez jobs needed for answering user submitted SQL query
  • Avoid Map-Reduce jobs in case of simple fetch query
  • For simple fetch queries with aggregation, perform aggregation without Map-Reduce tasks
  • Rewrite Group By Query to use index table instead of original table
  • Use Index scans when predicates above table scan is equality predicates and columns in predicate have indexes on it.

 

Cost Based Optimization:

 

History:

 

Calcite is an open source, Apache Licensed, query planning and execution framework. Relational algebra is at the heart of Calcite. Every query is represented as a tree of relational operators. You can translate from SQL to relational algebra, or you can build the tree directly.

 

Planner rules transform expression trees using mathematical identities that preserve semantics. For example, it is valid to push a filter into an input of an inner join if the filter does not reference columns from the other input.

 

Calcite optimizes queries by repeatedly applying planner rules to a relational expression. A cost model guides the process, and the planner engine generates an alternative expression that has the same semantics as the original but a lower cost.

 

The planning process is extensible. You can add your own relational operators, planner rules, cost model, and statistics.

 

Sindhu_Subhas_0-1638884315077.jpeg

 

Enabling the CBO:

 

The CBO is enabled by default in Hive 0.14 and later. If you need to enable it manually, set the following property in hive-site.xml:

SET hive.cbo.enable=true;

 

For the physical optimizer, set the following properties in hive-site.xml to generate statistics:

SET hive.stats.fetch.column.stats=true;

SET hive.stats.fetch.partition.stats=true;

 

Hive Query PlanningHive Query PlanningCBO FlowCBO Flow

 

Gathering Statistics--Critical to the CBO:

 

The CBO requires both table-level and column-level statistics to generate efficient execution plans by examining the tables and conditions specified in the query, ultimately cutting down on query execution time and reducing resource utilization.

 

Table-level statistics:

Table-level statistics should always be collected. Make sure the following property is set as follows in hive-site.xml to collect table-level statistics:

SET hive.stats.autogather=true;

- This is applicable for newly created tables and/or partitions (that are populated through the INSERT OVERWRITE command), statistics are automatically computed by default. 
- This would add to the execution time of the queries along with Reducers tasks being created.

 

If you have an existing table that does not have statistics collected, you can collect them by running the following query:

 

ANALYZE TABLE <table_name> COMPUTE STATISTICS;

 

Column-level statistics (critical):

Column-level statistics are expensive to compute and are not yet automated. The recommended process to use for Hive 0.14 and later is to compute column statistics for all of your existing tables using the following command:

 

ANALYZE TABLE <table_name> COMPUTE STATISTICS for COLUMNS;

 

As new partitions are added to the table, if the table is partitioned on "col1" and the new partition has the key "x," then you must also use the following command:

 

ANALYZE TABLE <table_name> partition (coll="x") COMPUTE STATISTICS for COLUMNS;

 

Please note: Analyze statements are recommended to run when the cluster is idle (no production load). This query runs are heavy and would acquire most resources on YARN and locks on the Hive tables.

 

Quick Overview:

Description

Stored in

Collected by

Since

Number of partition the dataset consists of

Fictional metastore property: numPartitions

computed during displaying the properties of a partitioned table

Hive 2.3

Number of files the dataset consists of

Metastore table property: numFiles

Automatically during Metastore operations

 

Total size of the dataset as its seen at the filesystem level

Metastore table property: totalSize

 

Uncompressed size of the dataset

Metastore table property: rawDataSize

Computed, these are the basic statistics. Calculated automatically when hive.stats.autogather is enabled.
Can be collected manually by: ANALYZE TABLE ... COMPUTE STATISTICS

Hive 0.8

Number of rows the dataset consist of

Metastore table property: numRows

 

Column level statistics

Metastore; TAB_COL_STATS table

Computed, Calculated automatically when hive.stats.column.autogather is enabled.
Can be collected manually by: ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS

 

 

Validate the CBO with explain plans:

 

Hive provides an EXPLAIN command that shows the execution plan for a query.

Example:

 

EXPLAIN select sum(hash(key)), sum(hash(value)) from src_orc_merge_test_part where ds='2012-01-03' and ts='2012-01-03+14:46:31'

 

 

Output:

 

Plan optimized by CBO. Vertex dependency in root stage Reducer 2 <- Map 1 (SIMPLE_EDGE) Stage-0 Fetch Operator limit:-1 Stage-1 Reducer 2 File Output Operator [FS_8] compressed:false Statistics:Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE table:{"serde:":"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe","input format:":"org.apache.hadoop.mapred.TextInputFormat","output format:":"org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"} Group By Operator [GBY_6] | aggregations:["sum(VALUE._col0)","sum(VALUE._col1)"] | outputColumnNames:["_col0","_col1"] | Statistics:Num rows: 1 Data size: 16 Basic stats: COMPLETE Column stats: COMPLETE …

 

 

Upcoming Hive 4.1 feature wrt Hive explain and CBO clause:

The CBO clause outputs the plan generated by Calcite optimizer. It can optionally include information about the cost of the plan using Calcite default cost model and cost model used for join reordering.

Available from: Hive release 4.0.0 (HIVE-17503 / HIVE-21184).

 

EXPLAIN [FORMATTED] CBO [COST|JOINCOST]

 

COST option prints the plan and cost calculated using Calcite default cost model.

JOINCOST option prints the plan and cost calculated using the cost model used for join reordering.

 

Sample output:

 

EXPLAIN CBO COST WITH customer_total_return AS (SELECT sr_customer_sk AS ctr_customer_sk, sr_store_sk AS ctr_store_sk, SUM(SR_FEE) AS ctr_total_return FROM store_returns, date_dim WHERE sr_returned_date_sk = d_date_sk AND d_year =2000 GROUP BY sr_customer_sk, sr_store_sk) SELECT c_customer_id FROM customer_total_return ctr1, store, customer WHERE ctr1.ctr_total_return > (SELECT AVG(ctr_total_return)*1.2 FROM customer_total_return ctr2 WHERE ctr1.ctr_store_sk = ctr2.ctr_store_sk) AND s_store_sk = ctr1.ctr_store_sk AND s_state = 'NM' AND ctr1.ctr_customer_sk = c_customer_sk ORDER BY c_customer_id LIMIT 100

 

It will produce a similar plan, but the cost for each operator will be embedded next to the operator descriptors:

CBO PLAN:

 

HiveSortLimit(sort0=[$0], dir0=[ASC], fetch=[100]): rowcount = 100.0, cumulative cost = {2.395588892021712E26 rows, 1.197794434438787E26 cpu, 0.0 io}, id = 1683 HiveProject(c_customer_id=[$1]): rowcount = 1.1977944344387866E26, cumulative cost = {2.395588892021712E26 rows, 1.197794434438787E26 cpu, 0.0 io}, id = 1681 HiveJoin(condition=[AND(=($3, $7), >($4, $6))], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 1.1977944344387866E26, cumulative cost = {1.1977944575829254E26 rows, 4.160211553874922E10 cpu, 0.0 io}, id = 1679 HiveJoin(condition=[=($2, $0)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 2.3144135067474273E18, cumulative cost = {2.3144137967122499E18 rows, 1.921860676139634E10 cpu, 0.0 io}, id = 1663 HiveProject(c_customer_sk=[$0], c_customer_id=[$1]): rowcount = 7.2E7, cumulative cost = {2.24E8 rows, 3.04000001E8 cpu, 0.0 io}, id = 1640 HiveFilter(condition=[IS NOT NULL($0)]): rowcount = 7.2E7, cumulative cost = {1.52E8 rows, 1.60000001E8 cpu, 0.0 io}, id = 1638 HiveTableScan(table=[[default, customer]], table:alias=[customer]): rowcount = 8.0E7, cumulative cost = {8.0E7 rows, 8.0000001E7 cpu, 0.0 io}, id = 1055 HiveJoin(condition=[=($3, $1)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 2.1429754692105807E11, cumulative cost = {2.897408225471977E11 rows, 1.891460676039634E10 cpu, 0.0 io}, id = 1661 HiveProject(sr_customer_sk=[$0], sr_store_sk=[$1], $f2=[$2]): rowcount = 6.210443022113779E9, cumulative cost = {7.544327346205959E10 rows, 1.891460312135634E10 cpu, 0.0 io}, id = 1685 HiveAggregate(group=[{1, 2}], agg#0=[sum($3)]): rowcount = 6.210443022113779E9, cumulative cost = {6.92328304399458E10 rows, 2.8327405501500005E8 cpu, 0.0 io}, id = 1654 HiveJoin(condition=[=($0, $4)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 6.2104430221137794E10, cumulative cost = {6.2246082040067795E10 rows, 2.8327405501500005E8 cpu, 0.0 io}, id = 1652 HiveProject(sr_returned_date_sk=[$0], sr_customer_sk=[$3], sr_store_sk=[$7], sr_fee=[$14]): rowcount = 4.198394835000001E7, cumulative cost = {1.4155904670000002E8 rows, 2.8311809440000004E8 cpu, 0.0 io}, id = 1645 HiveFilter(condition=[AND(IS NOT NULL($0), IS NOT NULL($7), IS NOT NULL($3))]): rowcount = 4.198394835000001E7, cumulative cost = {9.957509835000001E7 rows, 1.15182301E8 cpu, 0.0 io}, id = 1643 HiveTableScan(table=[[default, store_returns]], table:alias=[store_returns]): rowcount = 5.759115E7, cumulative cost = {5.759115E7 rows, 5.7591151E7 cpu, 0.0 io}, id = 1040 HiveProject(d_date_sk=[$0]): rowcount = 9861.615, cumulative cost = {92772.23000000001 rows, 155960.615 cpu, 0.0 io}, id = 1650 HiveFilter(condition=[AND(=($6, 2000), IS NOT NULL($0))]): rowcount = 9861.615, cumulative cost = {82910.615 rows, 146099.0 cpu, 0.0 io}, id = 1648 HiveTableScan(table=[[default, date_dim]], table:alias=[date_dim]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}, id = 1043 HiveProject(s_store_sk=[$0]): rowcount = 230.04000000000002, cumulative cost = {2164.08 rows, 3639.04 cpu, 0.0 io}, id = 1659 HiveFilter(condition=[AND(=($24, _UTF-16LE'NM'), IS NOT NULL($0))]): rowcount = 230.04000000000002, cumulative cost = {1934.04 rows, 3409.0 cpu, 0.0 io}, id = 1657 HiveTableScan(table=[[default, store]], table:alias=[store]): rowcount = 1704.0, cumulative cost = {1704.0 rows, 1705.0 cpu, 0.0 io}, id = 1050 HiveProject(_o__c0=[*(/($1, $2), 1.2)], ctr_store_sk=[$0]): rowcount = 6.900492246793088E8, cumulative cost = {8.537206083312463E10 rows, 2.2383508777352882E10 cpu, 0.0 io}, id = 1677 HiveAggregate(group=[{1}], agg#0=[sum($2)], agg#1=[count($2)]): rowcount = 6.900492246793088E8, cumulative cost = {8.468201160844533E10 rows, 2.1003410327994267E10 cpu, 0.0 io}, id = 1675 HiveProject(sr_customer_sk=[$0], sr_store_sk=[$1], $f2=[$2]): rowcount = 6.900492246793088E9, cumulative cost = {8.381945007759619E10 rows, 2.1003410327994267E10 cpu, 0.0 io}, id = 1686 HiveAggregate(group=[{1, 2}], agg#0=[sum($3)]): rowcount = 6.900492246793088E9, cumulative cost = {7.69189578308031E10 rows, 3.01933587615E8 cpu, 0.0 io}, id = 1673 HiveJoin(condition=[=($0, $4)], joinType=[inner], algorithm=[none], cost=[not available]): rowcount = 6.900492246793088E10, cumulative cost = {6.915590405316087E10 rows, 3.01933587615E8 cpu, 0.0 io}, id = 1671 HiveProject(sr_returned_date_sk=[$0], sr_customer_sk=[$3], sr_store_sk=[$7], sr_fee=[$14]): rowcount = 4.66488315E7, cumulative cost = {1.50888813E8 rows, 3.01777627E8 cpu, 0.0 io}, id = 1667 HiveFilter(condition=[AND(IS NOT NULL($0), IS NOT NULL($7))]): rowcount = 4.66488315E7, cumulative cost = {1.042399815E8 rows, 1.15182301E8 cpu, 0.0 io}, id = 1665 HiveTableScan(table=[[default, store_returns]], table:alias=[store_returns]): rowcount = 5.759115E7, cumulative cost = {5.759115E7 rows, 5.7591151E7 cpu, 0.0 io}, id = 1040 HiveProject(d_date_sk=[$0]): rowcount = 9861.615, cumulative cost = {92772.23000000001 rows, 155960.615 cpu, 0.0 io}, id = 1650 HiveFilter(condition=[AND(=($6, 2000), IS NOT NULL($0))]): rowcount = 9861.615, cumulative cost = {82910.615 rows, 146099.0 cpu, 0.0 io}, id = 1648 HiveTableScan(table=[[default, date_dim]], table:alias=[date_dim]): rowcount = 73049.0, cumulative cost = {73049.0 rows, 73050.0 cpu, 0.0 io}, id = 1043

 

 

References:

https://cwiki.apache.org/confluence/display/Hive/Cost-based+optimization+in+Hive

https://cwiki.apache.org/confluence/display/hive/statsdev

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Explain#LanguageManualExplain-TheCBOClause 

REMEMBER: these articles are REPUBLISHED. Your best bet to get a reply is to follow the link at the top of the post to the ORIGINAL post! BUT you're more than welcome to start discussions here:

This site uses Akismet to reduce spam. Learn how your comment data is processed.