Building off our Simple Examples Series, we wanted to take five minutes and show you how to recognize the power of partitioning.
For a more detailed article on partitioning, Cloudera had a nice blog write-up, including some pointers.
The Cardinality of the Column
One of the pointers that should resonate is the cardinality of the column, which is another way of saying how many directories/files you are going to partition across. With the way HDFS works, each file’s meta-data is loaded into memory on the Name Node. So if you end up with too many files in HDFS, you end up with memory capacity problems. This is also called the “Small Files Problem” in Hadoop.
In this example, we’re going to end up with about 350 directories with one small file in them. In the real world, the file sizes are specifically something that would not follow a best practice. Ideally, we want them equal or above your minimum block size (e.g. 128MB).
The first thing we need to do is create a partitioned table. This should be almost just like creating the ORC table in our previous example, but with the PARTITIONED BY command. What we’re saying here is that we want all the rows in a day, separated out in a separate directory and file(s).
So when you issue Hive, it doesn’t have to scan an entire data set.
CREATE TABLE temps_orc_partition_date (statecode STRING, countrycode STRING, sitenum STRING, paramcode STRING, poc STRING, latitude STRING, longitude STRING, datum STRING, param STRING, timelocal STRING, dategmt STRING, timegmt STRING, degrees double, uom STRING, mdl STRING, uncert STRING, qual STRING, method STRING, methodname STRING, state STRING, county STRING, dateoflastchange STRING) PARTITIONED BY (datelocal STRING) STORED AS ORC;
Next, we need to override some default settings. “Dynamic” partitioning is disabled by default to help avoid cutting your fingers off. See the pointer in Cloudera’s blog above on “Cardinality” for why. In our world, we’re pretending we know what we’re doing so we’re going to override the values.
The “pernode” setting defaults to 100. This is being done on a very small cluster so I had to update this.
SET hive.exec.dynamic.partition = true; SET hive.exec.dynamic.partition.mode = nonstrict; SET hive.exec.max.dynamic.partitions.pernode = 400;
Now, let’s load some data. We are inserting data from the temps_txt table that we loaded in the previous examples. The big difference here is that we are PARTITION’ed on datelocal, which is a date represented as a string. E.g. “2014-01-01”. The one thing to note here is that see that we moved the “datelocal” column to being last in the SELECT.
For dynamic partitioning to work in Hive, this is a requirement.
INSERT INTO TABLE temps_orc_partition_date PARTITION (datelocal) SELECT statecode, countrycode, sitenum, paramcode, poc, latitude, longitude, datum, param, timelocal, dategmt, timegmt, degrees, uom, mdl, uncert, qual, method, methodname, state, county, dateoflastchange, datelocal
Lastly, let’s see some results. In my very small cluster, the partition query was taking half the time of the non-partitioned query. Hopefully, this simple example helps show why partitioning matters
SELECT * FROM temps_orc_partition_date WHERE degrees = 25.0 AND datelocal = '"2014-01-01"' AND county = '"Campbell"' AND state = '"Wyoming"'; SELECT * FROM temps_orc WHERE degrees = 25.0 AND datelocal = '"2014-01-01"' AND county = '"Campbell"' AND state = '"Wyoming"'