Over the last decade, the internet has delivered a huge volume of stock market data into the hands of individual investors who have basic technology skills. As we've seen with the rise of individual traders on Robinhood and other brokerage platforms over the last year, investing has become truly democratized. Now individual investors can even create their own personal market data warehouses on cloud-based database platforms at very little cost. In this article, I will show you how to set up a basic Extract, Transform and Load (ETL) process to pull end-of-day stock market exchange data into the Relational Database Service (RDS) provided by Amazon Web Services (AWS), using the Python scripting language and the MySQL database.
The advantages of using AWS for this purpose include the ability to access the raw data from any internet-connected device, run your own custom queries and build your own custom calculations, indices and data visualizations. In this article, we will discuss how to create an initial database of historical market data with automatic daily updates.
At a high level, we will create four Python ETL scripts to feed data into two primary MySQL tables:
I’m assuming the reader has some previous experience working with Python and SQL. If you need a refresher on these languages, I recommend either Udemy or Coursera. Both sites offer excellent tutorials for free or at a minimal cost.
For this project, our primary data source will be Intrinio, a financial market data aggregator based in St. Petersburg, FL. In past gigs, I have been both a supplier and customer of Intrinio and found them to be very reliable and responsive. They're good people. I could have used the Yahoo Finance Python API for free, but I want a stable, reliable source that I know will be in business for the long term and that provides support if anything doesn't work as expected. For individual investors, Intrinio offers data packages, accessible through REST APIs, starting at $200 per month.
The tools used in this project will include Python 3.9, MySQL, AWS RDS (free tier) and AWS S3 to store the data, Jupyter Notebook to assemble the Python scripts, and MySQL Workbench or DBeaver (free edition) to run SQL statements directly. Once the scripts are created, we will use Prefect.io to set them up on automatic update schedules. All of these tools have free versions which are plenty powerful enough for this project.
Initially, we will set these processes up on a personal computer. You will need a powerful one with plenty of RAM because we will be working with some large datasets, containing over 40 million rows of data. A high bandwidth internet connection is also necessary to reduce the time it takes to move this data from source to machine and then to AWS. I happen to be using a Macintosh desktop with 64 gigs of RAM, which gets the job done. I have a 500 Mbps fiber optic internet connection, which is very helpful, but a regular 100 Mbps cable connection should work as well.
There are about a dozen Python libraries that I use to accomplish the various tasks in this process. All of them are free, or have free versions available. Some simply require a Python import statement in your Jupyter Notebook. A few require installation, usually with pip ("pip install <library name>" in Jupyter). I have posted a list of them, along with links to documentation and installation instructions, in a text file on my GitHub account.
So now, on with the show. The first order of business is to set up your Amazon Web Services account and your own RDS instance and S3 bucket. Instead of going through that process in detail here, I'm going to refer you to a few YouTube videos:
First, for security purposes, we want to create a credentials file in a secure place on your hard drive that contains all your passwords and API keys. There are many ways to do this, but I think the easiest way is to create a CSV or JSON file. Here’s an example of a credentials file I created in JSON, which is really just a dictionary of key:value pairs saved as a *.json file.
{"file_path": <my file path>,
"intrinio_key": <my Intrinio API key>,
"aws_access_key": <my AWS access key>,
"aws_secret_key": <my AWS secret key>,
"rds_host": <my RDS host URL>,
"rds_user": <my RDS user name>,
"rds_password": <my RDS password>,
"rds_database": <my RDS database name>,
"rds_charset": <my RDS characterset code>}
You will open this file at the beginning of each Python script using a json.load command, then map it to variables as shown here:
https://gist.github.com/bnsheehy/00e2e439882e10b515f6bfb8332be65e
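In plain terms, that mapping can look like this minimal sketch. The file path is a placeholder, and the keys match the JSON example above:

import json

with open("/path/to/credentials.json") as f:  # use your own secure file location
    creds = json.load(f)

intrinio_key = creds["intrinio_key"]
aws_access_key = creds["aws_access_key"]
aws_secret_key = creds["aws_secret_key"]
rds_host = creds["rds_host"]
rds_user = creds["rds_user"]
rds_password = creds["rds_password"]
rds_database = creds["rds_database"]
rds_charset = creds["rds_charset"]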
Next, we want to create the Python scripts that will run the ETL process. I’m going to describe these scripts in general terms here, with a few sample code snippets. The full code files are available in my Github repository here:
https://github.com/bnsheehy/Stock-Market_Data_ETL
The first script we want to create will pull historical end-of-day market data for US stocks, ETFs and ADRs from Intrinio, clean it up a bit, assemble it into a pandas DataFrame, archive it on AWS S3, then push it into an RDS MySQL table.
Price History Table - The modules are set up to execute the following steps:
There are two ways to do this kind of iteration. You can use a regular Python for loop, which will take a couple of hours to work through thousands of instruments, or you can use Python's Concurrent Futures tool (ThreadPoolExecutor), which distributes the work over several "worker" threads, hitting the API several times at once and cutting the query time by about one third.
One caveat, however: some data vendors impose rate limits on their APIs, which cap the number of times you can query them per second, minute, hour, etc. If that happens, you can either negotiate an upgrade to the rate limit or add a small "sleep" call ("time.sleep(<decimal fraction of second>)") in your code to stay under the limit. It just means that your script might take a little longer to finish.
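Here is a minimal sketch of what that concurrent, throttled fetch can look like, assuming figi_list is your list of instruments and get_price_history() is a hypothetical wrapper around the Intrinio price history call:

import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_with_throttle(figi):
    time.sleep(0.2)  # small pause on each call to stay under the vendor's rate limit
    return get_price_history(figi)  # hypothetical wrapper around the Intrinio API call

results = []
with ThreadPoolExecutor(max_workers=8) as executor:
    futures = [executor.submit(fetch_with_throttle, figi) for figi in figi_list]
    for future in as_completed(futures):
        results.append(future.result())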
For mapping the right data to the right instrument, I found it more reliable to use the Bloomberg Financial Instrument Global Identifier (FIGI) codes rather than trading symbols or “tickers”. As stocks get delisted and relisted, their tickers can be reused over time, which causes no end of grief for market data archivers. FIGI codes are uniquely mapped to each company and each of its trading instruments, they are never reused, and they are available to use free of charge. Intrinio maintains FIGI codes for each instrument they track in their data system.
AWS provides a library, Boto3, that facilitates moving files in and out of their S3 storage system. Amazon's Boto3 documentation is extensive but hard to follow. For a quick-start tutorial, I found RealPython.com to be more helpful.
For those who don't know about the parquet file format, it is designed to feed into "column-oriented" databases such as Amazon's Redshift. Column-oriented databases like Redshift are built to handle very large datasets, consisting of many columns and billions of rows. Without going into all the details, their primary advantage is that they let you run queries much faster, which facilitates more advanced machine learning algorithms. So down the road, if you want to start running complicated algorithms against this data, saving it in parquet format will allow you to easily import it into Redshift or other "Big Data" platforms for more creative analyses. Here's a notebook that you might want to check out to compare the performance of the most popular file storage formats - CSV, parquet and HDF5.
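As a rough sketch of the archiving step, here is how you might write the assembled DataFrame to parquet and push it to S3 with Boto3. The df_price_history name, bucket name and object key are placeholders, the credentials come from the JSON file above, and writing parquet requires the pyarrow (or fastparquet) library:

import boto3
import pandas as pd

# Write the assembled price history DataFrame to a local parquet file.
df_price_history.to_parquet("price_history.parquet", index=False)

# Upload the file to a (placeholder) S3 bucket using the keys from the credentials file.
s3 = boto3.client(
    "s3",
    aws_access_key_id=aws_access_key,
    aws_secret_access_key=aws_secret_key,
)
s3.upload_file("price_history.parquet", "my-market-data-bucket", "price_history/price_history.parquet")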
Since you may only need to do this one time, you could just save the price history data to a large CSV file and use a couple of SQL statements to create the table and upload the data manually. These commands would do the trick in MySQL Workbench or DBeaver:
Create the Price History table:
https://gist.github.com/bnsheehy/1dad68b5e88ac92c1ef52f53a44c916a
Load the data:
https://gist.github.com/bnsheehy/e142066481fa0d0ed240b01712604629
If you have a fat fiber optic internet pipe like I do, this operation could take as little as 20 minutes. Or if you have a more typical cable pipe with slower upload speeds, it could take several hours. In total, you'd be loading about 12 GB of data.
Once the data is loaded, you want to index several of the fields that you will probably be using in queries more often. I suggest that you index the ticker, figi, date, last_update_date, last_corporate_action_date, and key_id fields. Indexing will dramatically speed up queries that filter results using these data fields. The key_id field is a concatenated combination of date, ticker and FIGI code. You could use these SQL commands to create the indexes:
Index the fields:
https://gist.github.com/bnsheehy/e1e3ed23425d7d58b3033f6069c89d57
Finally, I suggest you also create a full backup price history table, again just in case something gets screwed up and you need a clean version. These SQL commands would do that trick:
Create backup table:
https://gist.github.com/bnsheehy/1d9409a10c83167f33c6b29ac6375fb4
Now once you're done with this table, I would suggest you run this process and replace the price history table entirely about once a quarter. Daily updates are great and pretty reliable, but Murphy's Law says some details will be missed. In that case, you might want to run the whole thing from a Python script rather than rely on manual SQL commands. For that purpose, I included a module in the Notebook that uses the SQLAlchemy library to execute the SQL commands above straight from the Jupyter Notebook interface. It is slower, but more automated.
Again, that Notebook is located at https://github.com/bnsheehy/Stock-Market_Data_ETL.
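The general pattern looks something like this minimal sketch, which builds a SQLAlchemy engine from the credentials loaded earlier and runs a statement against RDS. The index statement shown is just a placeholder for the actual SQL in the gists above:

from sqlalchemy import create_engine, text

# Build the connection string from the values in the credentials file.
engine = create_engine(
    f"mysql+pymysql://{rds_user}:{rds_password}@{rds_host}/{rds_database}?charset={rds_charset}"
)

# Run a statement inside a transaction; swap in the CREATE TABLE / LOAD / INDEX SQL as needed.
with engine.begin() as conn:
    conn.execute(text("CREATE INDEX idx_ticker ON price_history (ticker);"))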
The next step is to create two scripts, one to update the market data for each new trading day and the other to adjust the data for stock splits and dividends.
Unlike other kinds of data, historical stock prices must constantly be readjusted each time a company issues a stock split or pays a dividend. Otherwise, if you plot the historical data on a time-series graph, you will see sudden ups and downs in the chart that do not reflect actual price movements.
Here's an example of what the data can look like if you don't adjust it for splits and dividends:
The red line in the chart above is the historical unadjusted stock price for AAPL from Jan 2019 forward. The yellow line is the adjusted closing price. As we know, AAPL executed a 4-for-1 stock split on Aug 28, 2020. The unadjusted price went from about $500 per share to about $125 per share in one day. But we know that AAPL's stock did not lose 75% of its value that day. All the company did was increase the nominal shares outstanding while lowering the price per share proportionately. The market cap (total value) of the stock stayed the same. But if we did not back-adjust our historical share price records for AAPL accordingly, we would see a big drop in the share price on Aug 28 that never actually happened.
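As a toy illustration of the back-adjustment (not the production code in the repo), dividing every pre-split close by the split ratio is what keeps the series continuous:

import pandas as pd

# Two unadjusted closes straddling a hypothetical 4-for-1 split.
prices = pd.DataFrame({
    "date": pd.to_datetime(["2020-08-27", "2020-08-28"]),
    "close": [500.0, 125.0],
})
split_ratio = 4.0
split_date = pd.Timestamp("2020-08-28")

# Divide every close before the split date by the split ratio; leave later closes alone.
prices["adj_close"] = prices["close"].where(
    prices["date"] >= split_date, prices["close"] / split_ratio
)
print(prices)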
Stock Price Updates - The modules in this script are set up to execute the following steps:
The max number of records pulled per query is 10,000, and this endpoint will actually return 20K+ records. So I have written the code with a while loop that runs additional queries using the next_page parameter as needed ("while next_page != None:"), just to make sure we get all the data. Then we filter the returned records down to just the equity, ETF and ADR instruments we actually want.
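In rough outline, the pagination logic looks like this sketch using the requests library. The endpoint path and the "securities" response key are illustrative, so check the Intrinio docs for the exact names:

import requests

url = "https://api-v2.intrinio.com/securities"  # illustrative endpoint path
params = {"page_size": 10000, "api_key": intrinio_key}
records = []
next_page = ""  # empty string forces the first request; None ends the loop

while next_page is not None:
    if next_page:
        params["next_page"] = next_page
    response = requests.get(url, params=params).json()
    records.extend(response.get("securities", []))  # response key is an assumption
    next_page = response.get("next_page")  # None when there are no more pages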
Intrinio does a pretty good job of filtering a lot of that crap out, but no data vendor is perfect. ;-) Capital markets data is inherently messy, so be prepared to code workarounds on your side when you find data errors from time to time.
The full code is in a Jupyter Notebook in my Github repository at https://github.com/bnsheehy/Stock-Market_Data_ETL.
So as of this writing, I ran the Stock Price Update script yesterday, 12/05/2021. Here are the results:
Note that I waited nine days between updates, and four of those were weekend days. So the updated data covered five trading days. With about 8,000+ active tickers, the total number of new rows added should be more than 40,000. In fact, the actual number of new rows is 42,774 as shown in the Jupyter Notebook screenshot above. So let’s check to see if that number is reflected in the database by running the following SQL commands in our MySQL IDE:
Once the price history data is updated, we need to find out which instruments have issued stock splits or dividends since the last corporate actions update and grab the newly adjusted price history data for those instruments.
Corporate Actions Updates - The modules in this script are set up to execute the following steps:
By the way, each time we run a query to fetch data for a list of instruments, we are using the FIGI codes as the primary identifier, not the trading tickers, as mentioned earlier. Trading symbols are not always unique to each instrument, whereas FIGI codes are unique.
Since the list of adjusted instruments can be fairly large, the dataset returned by this query can be bigger than the AWS Free Tier RDS instance will comfortably handle in a single request, so I included a for loop that breaks the query up into smaller chunks to avoid a query time-out problem.
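The chunking itself can be as simple as this sketch, where adjusted_figi_list is the list of affected instruments and fetch_adjusted_history() is a hypothetical helper that pulls the re-adjusted history for one batch:

chunk_size = 100  # tune this to stay well under the query size that causes time-outs

adjusted_frames = []
for i in range(0, len(adjusted_figi_list), chunk_size):
    batch = adjusted_figi_list[i:i + chunk_size]
    adjusted_frames.append(fetch_adjusted_history(batch))  # hypothetical helper function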
The table update itself happens in two steps. First, delete the old data, which we have already saved separately in an S3 bucket. Second, insert the newly adjusted data into the price history table. We can use the PyMySQL library for this job since it seems to handle larger data loads a little better than the MySQL Connector library does.
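Here's a minimal sketch of that delete-then-insert step with PyMySQL. The column list and the row_tuples and adjusted_figi_list variables are placeholders standing in for the full price_history schema and the data prepared earlier:

import pymysql

conn = pymysql.connect(host=rds_host, user=rds_user, password=rds_password,
                       database=rds_database, charset=rds_charset)
try:
    with conn.cursor() as cur:
        # Remove the stale, pre-adjustment rows for the affected instruments.
        cur.executemany("DELETE FROM price_history WHERE figi = %s",
                        [(figi,) for figi in adjusted_figi_list])
        # Insert the freshly adjusted rows (row_tuples is a list of value tuples).
        cur.executemany(
            "INSERT INTO price_history (key_id, date, ticker, figi, close) VALUES (%s, %s, %s, %s, %s)",
            row_tuples,
        )
    conn.commit()
finally:
    conn.close()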
Once again, the full code for this script is in a Jupyter Notebook in my Github repository at https://github.com/bnsheehy/Stock-Market_Data_ETL.
The last step in this project is to create and update what I call a Security Information table. It will contain metadata for all the stocks, ADRs and ETFs that we want to track, including which ones are actively traded or delisted and the sectors and industries that each company belongs to. We will want to use this information for sorting and screening as we create investing strategies from this data later on.
Before you start on the Python script, you want to create the Security Info table in RDS with the columns already defined. You can do that by running this SQL command in MySQL Workbench or DBeaver:
Create the Security Info table:
https://gist.github.com/bnsheehy/2c7cb11dd891c3d6452d32b00b187d7e
You actually want to run this SQL command twice, to create two tables - security_info and security_info_old. I’ll explain why below.
Security Info Table - The modules in this Python script are set up to execute the following steps:
This is the one instance where we key on ticker rather than FIGI code. The objective is to maintain a Security Info table where each record is flagged as active_status = 0 for dead or 1 for active. At any given time, we want the trading tickers to be unique to each actively traded security since that’s what we will be searching for in trading indicators we develop later on.
We still want to keep data for the newly deceased tickers though because we will want to use that data in strategy back-testing routines to minimize look-ahead bias. So we will set the active_status column to 0 for the dead tickers and add them back to the active ticker data before we update the table.
Also in this module, we make sure all the date columns are converted to the DateTime data type; every date field needs to be in that format for the SQL upload to work smoothly.
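Here's a rough sketch of those two steps, assuming df_active holds the securities currently returned by the API and df_old holds the existing security_info table (the date column names are illustrative):

import pandas as pd

# Tickers in the old table that no longer come back from the API are treated as dead.
dead_mask = ~df_old["ticker"].isin(df_active["ticker"])
df_dead = df_old[dead_mask].copy()
df_dead["active_status"] = 0

# Everything currently returned by the API is flagged as active, then the two sets are combined.
df_active["active_status"] = 1
df_security_info = pd.concat([df_active, df_dead], ignore_index=True)

# Convert every date column to a true datetime so MySQL accepts it on upload.
for col in ["first_trade_date", "last_trade_date"]:  # illustrative column names
    df_security_info[col] = pd.to_datetime(df_security_info[col], errors="coerce")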
I recommend running this Securities Info routine once a week to capture new IPOs and flag dead tickers on a regular basis.
We have now created our ETL processes to harvest stock market data from Intrinio into our very own stock market data warehouse in the AWS cloud. But we're not quite done yet. We still need to automate the update processes from start to finish.
For that job, we will use a new data pipeline automation application called Prefect. Prefect works well with Python and offers a free tier for small jobs like this one. Prefect works by analyzing the deliverables generated by each Python function in our scripts and the dependencies between them. Then it creates what’s called a Directed Acyclic Graph (DAG) model which determines the order in which our modules need to be executed. Then Prefect adds a flexible scheduling service that lets us determine when and how often each process should run. I won’t get deep into the Prefect instructions here, and would instead encourage you to browse through Prefect’s tutorial docs when you get a chance.
With Prefect, the key is to think about which variables and objects are passed from one function to the next in the scripts. Then make sure those items are included in the "return" statement at the end of the function where they are created, and in the parameter list of the functions where they will be used. Also, it is a good idea to declare all variables and objects as "global" in the functions where they are created. That makes troubleshooting easier if and when errors pop up.
For example, in this function the df_price_update_total DataFrame is passed in from the previous function, and a new DataFrame, df_shares_out, is created and returned for use in the next function. In addition, both the new DataFrame and an interim NumPy array, shares_out_lists_combined, are declared as global variables.
Prefect should then recognize the objects passed into and returned by each function to determine the workflow task order. But just to make sure, you can directly control the task order by inserting "upstream_tasks = [<preceding task>]" and specifying each task dependency in this parameter. For an example of how to do this, see the Triggers section of the Prefect docs.
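Put together, a flow can look like this stripped-down sketch using the Prefect 1.x-style API that was current when this was written. The task bodies are placeholders for the real modules, and the daily IntervalSchedule is just one scheduling option:

from datetime import timedelta
import pandas as pd
from prefect import task, Flow
from prefect.schedules import IntervalSchedule

@task
def pull_price_updates():
    # Placeholder for the real Intrinio pull; returns a DataFrame of new price rows.
    return pd.DataFrame()

@task
def load_to_rds(df_price_update_total):
    # Placeholder for the real RDS upload step.
    print(f"would load {len(df_price_update_total)} rows")

schedule = IntervalSchedule(interval=timedelta(days=1))

with Flow("daily-price-update", schedule=schedule) as flow:
    prices = pull_price_updates()
    # The data dependency already orders these tasks; upstream_tasks just makes it explicit.
    load_to_rds(prices, upstream_tasks=[prices])

# Runs locally on the schedule; for production you would register the flow with Prefect Cloud or Server.
flow.run()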
If the Prefect jobs run successfully, you should see results that look like these Jupyter Notebook screenshots:
Stock Price Update:
Corporate Actions Update: