I need an ETL (Extract, Transform, and Load) application for a project I am working on. Since the E and T parts are pretty simple, I didn't want to go out and adopt one of the many open source ETL frameworks. I'm always looking for a challenge, so I also thought it would be a good exercise to test my SQL knowledge by designing the SQL queries for the Load part myself. Think of it as a poor man's Business Intelligence system that I'm designing.
The Load part of an ETL system can be difficult to design. You must consider:
- the ability to scale well with the number of records
- performance within processing-time constraints
- not putting the database in a bad state during huge database operations
- maintaining data integrity by staying in sync with any corrections in the data source
For a while I have been operating without any sort of database-driven solution, using massive CSV files created from the data source queries instead. CSV files were practical for prototyping, but a better-designed, better-performing long-term solution is needed. Also, those CSV files have grown so big, and take up so much memory when I slurp them into my Perl scripts, that I'm running up against the server's RAM limits. Not good at all!
The Load process gets complicated because the individual records I pull in from the data source may have been changed or deleted since the last extract, so I need to check my extract against the currently stored data and UPDATE (or remove) existing records accordingly. Corrections to the data can happen at any point in the past, and I need to catch them and make sure the stored data reflects them. Since our data is driven by dates, this means running big queries over long spans of time. I contemplated a DELETE of all the data, followed by a fresh query and an INSERT of everything, but then I might as well just use CSV files. We can do better!
Given this, the operations that need to be performed on the extract retrieved from the data source are:
- Delete stored records that no longer exist in the extract
- Update stored records that also exist in the extract
- Insert records from the extract that do not yet exist in the stored data
It is important to have all of these operations in place to guarantee the stored data is correct.
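To make the three operations concrete, here is a small in-memory model of the set logic in Perl. It is only an illustration of what the SQL will have to do, not a replacement for it (the whole point of the design is to avoid this comparison in Perl). The hash layout, mapping a record's unique key to an array reference of its values, and the sub name plan_load are hypothetical.

```perl
use strict;
use warnings;

# Hypothetical model: $stored and $extract are hash refs mapping a record's
# unique key to an array ref of its values. Returns the keys for each operation.
sub plan_load {
    my ($stored, $extract) = @_;
    my %plan = (delete => [], update => [], insert => []);

    # Stored records that are missing from the extract must be deleted.
    for my $key (sort keys %$stored) {
        push @{ $plan{delete} }, $key unless exists $extract->{$key};
    }

    # Every extracted record either overwrites a stored one or is new.
    for my $key (sort keys %$extract) {
        my $op = exists $stored->{$key} ? 'update' : 'insert';
        push @{ $plan{$op} }, $key;
    }
    return \%plan;
}

my %stored  = (a => [1, 2], b => [3, 4], c => [5, 6]);
my %extract = (b => [3, 40], c => [5, 6], d => [7, 8]);
my $plan    = plan_load(\%stored, \%extract);
printf "%s: %s\n", $_, join(',', @{ $plan->{$_} }) for qw(delete update insert);
```

This prints delete: a, update: b,c and insert: d. Record c counts as an update even though its values did not change, which matches how a key-matching UPDATE touches every record whose key is present in both sets.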
To accomplish the Load part, I thought it best to use a two-table architecture: a staging table and a main database table. I decided on this because I didn't want to use Perl to query the stored data and compare it to the data extracted from the data source. I wanted to keep the Perl script relatively simple and do as much work as possible in smarter, set-based SQL queries, rather than have lots of individual INSERT/UPDATE/DELETE statements hitting the server from my Perl script. So getting the data into the staging table is the first step, and running SQL queries against the staging table to modify the main table is the next.
The data from the extract will be manipulated (transformed) in the Perl script, then loaded into the staging table, which has exactly the same structure as the main table.
To make the extracts from the data source take less time, I also wanted to be able to pull only certain time ranges and do the Load incrementally. This works because my stored data is organized by time.
But since nothing can ever be simple, I should also mention that each record's unique identifier is made up of two columns, one of which is a measurement of time.
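Breaking a long period into smaller windows for incremental loads can be sketched in a few lines of Perl. This is a minimal sketch that assumes the time value is an integer where adjacent periods have adjacent values (for example a day number); the sub name time_windows is hypothetical. Each window would then be extracted, staged, and loaded on its own.

```perl
use strict;
use warnings;

# Split the inclusive range [$start, $end] into consecutive windows of at
# most $size units each. Assumes an integer time value where adjacent
# periods are adjacent numbers (a hypothetical encoding).
sub time_windows {
    my ($start, $end, $size) = @_;
    die "window size must be positive\n" unless $size > 0;
    my @windows;
    for (my $from = $start; $from <= $end; $from += $size) {
        my $to = $from + $size - 1;
        $to = $end if $to > $end;    # the last window may be shorter
        push @windows, [$from, $to];
    }
    return @windows;
}

# Example: reload periods 1..10 in windows of 4.
print join(' ', map { "[$_->[0],$_->[1]]" } time_windows(1, 10, 4)), "\n";
```

The example prints [1,4] [5,8] [9,10]. Smaller windows shorten each extract, at the cost of running the Load more times.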
So the data is actually laid out like this:
- keya - Integer
- keyb - Integer
- valuea - Integer
- valueb - Integer
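Because the identifier spans two columns, the Perl side needs a composite key whenever it handles extracted rows. A minimal sketch, assuming rows arrive as hash references with the four columns above: it joins keya and keyb into one string key and reports any key seen twice. If main_stage enforces the same uniqueness as main (an assumption, since the staging table is only described as an exact copy), such a duplicate would make the staging load fail, so catching it early can help. The sub name index_by_key is hypothetical.

```perl
use strict;
use warnings;

# Index extracted rows (hash refs with keya, keyb, valuea, valueb) by their
# composite key and collect any key that occurs more than once.
# The sub name and the row format are hypothetical.
sub index_by_key {
    my @rows = @_;
    my (%index, @duplicates);
    for my $row (@rows) {
        # ':' cannot occur inside an integer, so the joined key is unambiguous.
        my $key = join ':', $row->{keya}, $row->{keyb};
        push @duplicates, $key if exists $index{$key};
        $index{$key} = $row;    # a later duplicate replaces the earlier row
    }
    return (\%index, \@duplicates);
}

my @extract = (
    { keya => 1, keyb => 100, valuea => 5, valueb => 7 },
    { keya => 1, keyb => 101, valuea => 6, valueb => 8 },
    { keya => 1, keyb => 100, valuea => 9, valueb => 9 },
);
my ($index, $dupes) = index_by_key(@extract);
print scalar(keys %$index), " unique keys; duplicates: @$dupes\n";
```

This prints "2 unique keys; duplicates: 1:100".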
To start spewing out some SQL code: once the data is loaded into the staging table (main_stage), this is the query that UPDATEs the stored data (in table main) with the staging data for records that exist in both:
UPDATE main AS m INNER JOIN main_stage AS s USING (keya,keyb) SET m.valuea=s.valuea, m.valueb=s.valueb
This updates only those records in main whose keys also appear in main_stage.
Next, I want to INSERT into main the records from main_stage whose keys do not appear in main. Here is the SQL for that:
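If the tables ever gain more value columns, the SET list has to grow with them. One way to avoid hand-editing it is to generate the statement from column lists. In this sketch the sub name build_update_sql is hypothetical, and for the four columns above it reproduces the UPDATE query exactly. The table and column names are interpolated straight into the SQL, so they should only ever come from the script itself, never from external input.

```perl
use strict;
use warnings;

# Build the multi-table UPDATE from a list of key columns and a list of
# value columns. The sub name and argument layout are hypothetical.
sub build_update_sql {
    my ($main, $stage, $keys, $values) = @_;
    my $using = join ',', @$keys;
    my $set   = join ', ', map { "m.$_=s.$_" } @$values;
    return "UPDATE $main AS m INNER JOIN $stage AS s USING ($using) SET $set";
}

print build_update_sql('main', 'main_stage', [qw(keya keyb)], [qw(valuea valueb)]), "\n";
```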
INSERT INTO main SELECT * FROM main_stage AS s WHERE NOT EXISTS (SELECT * FROM main AS m WHERE m.keya = s.keya AND m.keyb = s.keyb)
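SELECT * relies on main and main_stage declaring their columns in the same order, which holds as long as the staging table stays an exact copy of main. A variant with an explicit column list removes that dependency. This is a sketch only: build_insert_sql is a hypothetical helper, and its output is equivalent to the INSERT query above.

```perl
use strict;
use warnings;

# Build INSERT ... SELECT ... WHERE NOT EXISTS with an explicit column list,
# matching rows on every key column. Sub name and arguments are hypothetical.
sub build_insert_sql {
    my ($main, $stage, $keys, $values) = @_;
    my @cols  = (@$keys, @$values);
    my $list  = join ',', @cols;
    my $pick  = join ',', map { "s.$_" } @cols;
    my $match = join ' AND ', map { "m.$_ = s.$_" } @$keys;
    return "INSERT INTO $main ($list) SELECT $pick FROM $stage AS s "
         . "WHERE NOT EXISTS (SELECT * FROM $main AS m WHERE $match)";
}

print build_insert_sql('main', 'main_stage', [qw(keya keyb)], [qw(valuea valueb)]), "\n";
```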
That handles updating existing records and inserting new ones, so the staging table can now be emptied:
TRUNCATE TABLE main_stage
You may have noticed that I haven't yet handled deleting records in my stored data (table main) that don't exist in the staged data. Because I will often be loading the staging table by date range, I wanted to cover this separately, even though it needs to happen before you TRUNCATE the staging table.
Here is the SQL, where <criteria> is the same condition you used to select the date range loaded into the staging table. Restricting the DELETE to that range is essential: without it, every stored record outside the range would also be deleted, simply because it was never staged.
DELETE FROM main WHERE <criteria> AND NOT EXISTS (SELECT * FROM main_stage AS s WHERE main.keya = s.keya AND main.keyb = s.keyb)
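To see why the <criteria> restriction matters, here is an in-memory model of which stored records the DELETE removes. Purely for illustration it assumes keyb is the time column and that the criteria is an inclusive range on it; rows_to_delete is a hypothetical name, and the SQL remains the real mechanism.

```perl
use strict;
use warnings;

# Composite key of a row, as "keya:keyb".
sub key_of { my ($row) = @_; return "$row->{keya}:$row->{keyb}" }

# Keys of stored rows that the range-scoped DELETE would remove: rows inside
# [$from, $to] whose composite key is absent from the staging data.
# Treating keyb as the time column is an assumption for illustration only.
sub rows_to_delete {
    my ($stored, $staged, $from, $to) = @_;
    my %in_stage = map { (key_of($_) => 1) } @$staged;
    return map  { key_of($_) }
           grep { $_->{keyb} >= $from && $_->{keyb} <= $to }
           grep { !$in_stage{ key_of($_) } } @$stored;
}

my @stored = (
    { keya => 1, keyb => 100 }, { keya => 1, keyb => 101 },
    { keya => 1, keyb => 200 }, { keya => 2, keyb => 150 },
);
my @staged = ({ keya => 1, keyb => 101 });    # extract for window 100..199

print "scoped:   @{[ rows_to_delete(\@stored, \@staged, 100, 199) ]}\n";
print "unscoped: @{[ rows_to_delete(\@stored, \@staged, 0, 1e9) ]}\n";
```

With the window 100..199 only 1:100 and 2:150 are removed. With an effectively unbounded window, 1:200 is removed too, even though it was merely outside this extract.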
Putting all of the SQL in one block, here is the sequence:
DELETE FROM main WHERE <criteria> AND NOT EXISTS (SELECT * FROM main_stage AS s WHERE main.keya = s.keya AND main.keyb = s.keyb)
UPDATE main AS m INNER JOIN main_stage AS s USING (keya,keyb) SET m.valuea=s.valuea, m.valueb=s.valueb
INSERT INTO main SELECT * FROM main_stage AS s WHERE NOT EXISTS (SELECT * FROM main AS m WHERE m.keya = s.keya AND m.keyb = s.keyb)
TRUNCATE TABLE main_stage
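To meet the requirement that nothing leaves the database tables in an intermediate, incorrect state, it is best to run the statements above as a transaction, so that the operations take effect only if they all succeed. Two MySQL details matter here: the tables must use a transactional storage engine such as InnoDB (MyISAM tables do not support transactions), and TRUNCATE TABLE causes an implicit commit, so inside the transaction the staging table should be emptied with DELETE FROM main_stage instead. Here is an example of Perl code that runs these queries with transactions enabled.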
use strict;
use warnings;
use DBI;

my $dsn = "dbi:mysql:dbtest:localhost:3306";
my $oDB = DBI->connect($dsn, "myusername", "mypassword")
    or die "Unable to connect, error: $DBI::errstr\n";

# now process the staging table, using a transaction
# (main and main_stage must use a transactional engine such as InnoDB)
my $success = 1;

# turn autocommit off so the statements below form one transaction
$oDB->{'AutoCommit'} = 0;
if ($oDB->{'AutoCommit'}) {
    die "AutoCommit disable failed\n";
}

# delete records in main (within <criteria>) that no longer exist in the staging table
my $main_delete = "DELETE FROM main WHERE <criteria> AND NOT EXISTS (SELECT * FROM main_stage AS s WHERE main.keya = s.keya AND main.keyb = s.keyb)";
my $main_delete_handle = $oDB->prepare($main_delete);
$success &&= $main_delete_handle->execute();

# update records in main whose keys also appear in the staging table
my $main_update = "UPDATE main AS m INNER JOIN main_stage AS s USING (keya,keyb) SET m.valuea=s.valuea, m.valueb=s.valueb";
my $main_update_handle = $oDB->prepare($main_update);
$success &&= $main_update_handle->execute();

# insert staging records whose keys are not yet in main
# (the NOT EXISTS filter skips the records that were just updated)
my $main_insert = "INSERT INTO main SELECT * FROM main_stage AS s WHERE NOT EXISTS (SELECT * FROM main AS m WHERE m.keya = s.keya AND m.keyb = s.keyb)";
my $main_insert_handle = $oDB->prepare($main_insert);
$success &&= $main_insert_handle->execute();

# empty the staging table; DELETE is used instead of TRUNCATE TABLE because
# TRUNCATE causes an implicit commit in MySQL and would end the transaction early
my $stage_clear = "DELETE FROM main_stage";
my $stage_clear_handle = $oDB->prepare($stage_clear);
$success &&= $stage_clear_handle->execute();

# commit only if every statement succeeded, otherwise roll everything back
# and report the failure instead of continuing silently
if ($success) {
    $oDB->commit or die "Couldn't commit transaction: " . $oDB->errstr . "\n";
}
else {
    $oDB->rollback or die "Couldn't roll back transaction: " . $oDB->errstr . "\n";
    die "Load failed, transaction rolled back\n";
}

# turn autocommit back on
$oDB->{'AutoCommit'} = 1;
if (!$oDB->{'AutoCommit'}) {
    die "AutoCommit enable failed\n";
}

$oDB->disconnect;