Processing TAQ Consolidated Data
March 15, 2024
For a recent revision, I have had to dig into TAQ data. This is relatively new to me, but I needed to compute some statistics from the consolidated trade data. I did not want to use SAS because I do not have a PC running Windows, and SAS for Linux is very painful. So here is a method for getting annoying statistics from annoyingly large data.
The goal was to estimate the volatility of order imbalance on the U.S. stock market. Order imbalance is defined as the difference between the volume bought and the volume sold, scaled by the total volume exchanged:1
- OIBNUM: the number of buyer-initiated trades less the number of seller-initiated trades on day t.
- OIBSH: the buyer-initiated shares purchased less the seller-initiated shares sold on day t.
- OIBDOL: the buyer-initiated dollars paid less the seller-initiated dollars received on day t.
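In ratio form, which is what the code at the end of this post computes for shares and for trade counts, the share-based version for a stock on day $t$ is roughly:

$$ \mathrm{OIB}^{shr}_t = \frac{\text{buyer-initiated shares}_t - \text{seller-initiated shares}_t}{\text{total shares traded}_t} $$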
Processing everything at once is too onerous on memory, so the goal of this guide is to show how to parallelize the work by handling the data stock by stock.
Data #
For this I worked with the Trade and Quote (TAQ) data. Specifically, I downloaded consolidated trades data which is available from 1993 to 2014 directly from WRDS.2
I downloaded the data year by year, which took a little while given the size of each individual year (the gzipped extract for 2008 is above 26Gb and expands to 300Gb). Once downloaded, I reuploaded all the data to an S3 bucket where I have easy access to it.
The data includes a symbol for the stock id (there is a match table to permno on WRDS), a price, and a size.
Preprocessing #
I will process one year of data at a time — technically, the algorithm for estimating whether a trade is a buy or sell could suffer from this, but I think the error is minimal here.
The preprocessing happens using a standard shell (bash or zsh).
Defining variables #
It is important to define a few variables for the shell:
- The relevant year:
DATEY=2006
- The number of processors available:
nprocs=128
- The memory available (for example 512 if you have 512G of RAM):
mem_alloc=512
- A directory with lots of space (~1Tb) where you can expand and work with the data:
TAQ_HOME="/big_space_disk/"
Download and expand the data #
First I download the data from the S3 bucket using s5cmd as follows:3
input_file="TAQ_name_$DATEY"
s5cmd cp s3://my-bucket/TAQ/$input_file.csv.gz $TAQ_HOME/
Then I expand the data; I take advantage of multicore decompression using pigz.
First I peek at the data to check that it has the correct columns and that the year is correct:
pigz -cd $TAQ_HOME/$input_file.csv.gz | head -n 5
SYMBOL,DATE,TIME,PRICE,SIZE,G127,CORR,COND,EX
A,2006-01-03,8:00:07,33.29,30000,0,0,U,T
A,2006-01-03,8:08:01,33.29,8300,0,0,T,T
A,2006-01-03,8:17:28,33.29,8600,0,0,T,T
A,2006-01-03,9:30:22,33.4,96200,40,0,,N
Once I have confirmed that I am looking at the right thing I expand the whole file:
pigz -cd $TAQ_HOME/$input_file.csv.gz > $TAQ_HOME/taq_select.csv
For efficiency (untested), I actually used the binaries from xsv4 to select only the columns from the file that I needed.
Simply replace the previous command with:
pigz -cd $TAQ_HOME/$input_file.csv.gz | \
xsv select "SYMBOL,DATE,TIME,PRICE,SIZE" > $TAQ_HOME/taq_select.csv
Note that this is fairly slow and can take upwards of 20 minutes even reading the file from the ram disk.
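If xsv is not available on your system, plain cut should also do the job here, since the columns I keep happen to be the first five of the file; this is an untested sketch and assumes no quoted fields containing commas:

pigz -cd $TAQ_HOME/$input_file.csv.gz | \
  cut -d, -f1-5 > $TAQ_HOME/taq_select.csv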
Sort the data #
In theory the data coming out of WRDS are already sorted by “SYMBOL”.
But the strategy relies heavily on this step being accurate, and it is always a good thing to learn how to use the very fancy unix sort command.
We want to split the data in chunks, one chunk for each symbol. It is a lot easier to do once the data is ordered by symbol!
First we pipe the data without the header, then we use sort on the first column and allow for parallel processing.
The one thing to watch for is memory usage (this will eat pretty much all of your memory and get your job killed if you don’t watch it), so we define an upper bound.
Given the memory allocation defined above, we restrict sort to using only 80% of it:
mem_for_sort=$(echo "${mem_alloc%G} * 0.8 / 1" | bc) # bc is a command-line calculator; dividing by 1 truncates the result to an integer
/usr/bin/time tail -n +2 $TAQ_HOME/taq_select.csv | \
sort -t, -k1,1 --buffer-size="${mem_for_sort}G" --parallel=$nprocs --temporary-directory=$TAQ_HOME > $TAQ_HOME/taq_sorted.csv
This can take a while. I have waited close to one hour for the largest files (128 cores, 490Gb of memory allocated).
Note: you can clean up the unsorted file to save a little bit of space at this stage: rm $TAQ_HOME/taq_select.csv
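If you want to double check that the file really is sorted by symbol before splitting it, GNU sort can verify order without re-sorting; this optional sanity check prints nothing extra when everything is in order:

sort -t, -k1,1 --check $TAQ_HOME/taq_sorted.csv && echo "sorted by SYMBOL"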
Split the data #
As the last preprocessing step, we split the data into chunks: one chunk for each symbol/stock.
Given the sorted nature of the data, this is a straightforward (I only spent a day figuring it out using ChatGPT) application of awk.
First we make some room for the chunks by giving them their own directory:
TAQ_CHUNKS="$TAQ_HOME/chunks/"
mkdir -p $TAQ_CHUNKS
Then we process the whole thing using awk
cat $TAQ_HOME/taq_sorted.csv | \
awk -v chunkDir="$TAQ_CHUNKS" -F, '
{
if (last != $1) {
if (last != "") close(chunkDir "/chunk_" last ".csv");
last = $1;
}
print > (chunkDir "/chunk_" $1 ".csv");
}'
The script reads the whole file line by line and checks the first column (the “SYMBOL” column). If it is equal to the first column of the previous line, the line is appended to the current chunk file; if not, the previous chunk is closed and a new file is started. The created files are named based on the first column.
This process is sequential and can be quite slow (close to one hour for the largest file).
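As an optional sanity check, the chunk line counts should add up exactly to the line count of the sorted file (remember the header was already stripped):

ls $TAQ_CHUNKS | wc -l               # number of chunks, close to 10,000 symbols
cat $TAQ_CHUNKS/chunk_*.csv | wc -l  # total rows across chunks...
wc -l < $TAQ_HOME/taq_sorted.csv     # ...should equal the rows in the sorted file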
Processing stock-specific statistics in Julia #
We have close to 10,000 files in $TAQ_CHUNKS ready to be read one by one and processed.
For this we are going to do some standard data processing in Julia. I compute OIB here, but this would work for anything else computed at the stock level.
If you have access to multiple cores, it makes sense to process this in parallel. It is fairly easy to implement; you still need to be careful not to trigger segfaults, though.
Preamble #
My preamble is pretty standard and only uses basic Julia DataFrames stuff.
using CSV
using DataFrames, DataFramesMeta
using Dates, PeriodicalDates
using Pipe: @pipe
import ShiftedArrays: lag
using Statistics
import StatsBase: std
@info Threads.nthreads() # useful for parallelism later on
# To ingest the command line parameter I pass to the script
datey = ARGS[1]
TAQ_CHUNKS = ARGS[2] # TAQ_CHUNKS="/scratch.global/eloualic/taq/chunks"
We are going to process each symbol one by one and store the statistics in a table.
file_list=readdir(TAQ_CHUNKS);
n_files = length(file_list)
df_oib_vol_array = Vector{Union{DataFrame, Nothing}}(nothing, n_files);
Threads.@threads for i_f = 1:n_files
    file_in = file_list[i_f]
    # read the file for one stock
    df_taq_symbol = read_file("$TAQ_CHUNKS/$file_in")
    # create the trade sign flag
    df_taq_symbol = create_trade_sign!(df_taq_symbol)
    # create the order imbalance statistic
    df_oib = create_oib(df_taq_symbol)
    # get the volatility of the order imbalance
    df_oib_vol_array[i_f] = create_oib_vol(df_oib)
end
The most important step is how we read the file.
Since this is happening inside a parallel loop, we need to make sure each CSV.read call itself runs on a single thread (hence the ntasks=1 below).
function read_file(file_in::AbstractString; verbose=false)
    # the chunk files have no header, so we name the columns ourselves
    df_taq_symbol = CSV.read(file_in, DataFrame, header=false, ntasks=1)
    rename!(df_taq_symbol, [:symbol, :date, :time, :price, :size])
    return df_taq_symbol
end
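Before launching the full loop, it can be worth eyeballing a single chunk; for example, for ticker A from the extract above (the chunk names follow the awk naming convention):

df_check = read_file("$TAQ_CHUNKS/chunk_A.csv")
first(df_check, 5) # peek at the first few trades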
Next we follow Chordia et al. to estimate whether a trade is a buy or a sell order. Basically, we compare the current price to previous prices (up to a lag of 5): a trade at a price above the previous price is signed as a buy (1), a trade below it as a sell (-1), and ties are resolved by looking one more lag back. This is the slowest step when working with the whole dataset at once.
function create_trade_sign!(df_taq_symbol::DataFrame)
@transform!(groupby(df_taq_symbol, :symbol),
:l1_price=lag(:price), :l2_price=lag(:price, 2),
:l3_price=lag(:price, 3), :l4_price=lag(:price, 4),
:l5_price=lag(:price, 5));
@rtransform! df_taq_symbol @passmissing :trd_sgn =
:price > :l1_price ? 1 :
:price < :l1_price ? -1 :
:price > :l2_price ? 1 :
:price < :l2_price ? -1 :
:price > :l3_price ? 1 :
:price < :l3_price ? -1 :
:price > :l4_price ? 1 :
:price < :l4_price ? -1 :
:price > :l5_price ? 1 :
:price < :l5_price ? -1 :
missing
return df_taq_symbol
end
The other two functions are fairly straightforward and not particularly interesting:
function create_oib(df) #; to::TimerOutput=to)
symbol_var=df[1,:symbol]
dropmissing!(df, :trd_sgn)
nrow(df)==0 && return DataFrame(symbol=symbol_var, date=missing) # some do not have valid signed trades
df_oib = @combine(groupby(df, [:date,:trd_sgn]),
:oib_shr_sign=sum(:size), :oib_num_sign=length(:size)) |>
(d -> @transform!(groupby(d, :date),
:oib_shr_tot=sum(:oib_shr_sign), :oib_num_tot=sum(:oib_num_sign)) )
@transform!(groupby(df_oib, :date),
:oib_shr_ratio=sum(:trd_sgn .* :oib_shr_sign) ./:oib_shr_tot,
:oib_num_ratio=sum(:trd_sgn .* :oib_num_sign) ./:oib_num_tot,
:symbol=symbol_var)
return df_oib
end
function create_oib_vol(df) #; to::TimerOutput=to)
symbol_var=df[1,:symbol]
dropmissing!(df, :date)
nrow(df)==0 && return DataFrame(symbol=symbol_var,
datem=missing, oib_shr_vol=missing, oib_num_vol=missing)
df_oib_vol =
unique(@rselect(df, :date, :datem=MonthlyDate(:date), :symbol, :oib_shr_ratio, :oib_num_ratio)) |>
(d -> @combine(groupby(d, [:symbol, :datem]),
:oib_shr_vol=std(:oib_shr_ratio), :oib_num_vol=std(:oib_num_ratio)) )
return df_oib_vol
end
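The loop above leaves one small DataFrame per symbol in df_oib_vol_array, so the last step is to stack them and write the result out. The exact write-out is not shown here, but a minimal sketch would be (the output filename is just a placeholder):

# keep the chunks that produced a result, then stack them into one table
results = DataFrame[df for df in df_oib_vol_array if !isnothing(df)]
df_oib_vol = reduce(vcat, results; cols=:union)
CSV.write("oib_vol_$(datey).csv", df_oib_vol) # placeholder output name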
I don’t run the code interactively (see the arguments passed above). The command I run reads:
$ julia -t $nprocs import_taq_chunks.jl $DATEY $TAQ_CHUNKS &> import_taq_chunks.log.jl
SLURM Specifics #
If you are running the job on a cluster with the SLURM scheduler, I have attached the commands I used to make my life easier.
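For reference, a minimal sbatch script along these lines should work; the directives below are a sketch based on the resources mentioned above, not the exact script I used:

#!/bin/bash
#SBATCH --job-name=taq_oib          # placeholder job name
#SBATCH --cpus-per-task=128         # matches nprocs above
#SBATCH --mem=490G                  # in line with the memory used for the largest files
#SBATCH --time=24:00:00             # adjust to your queue limits

DATEY=2006
nprocs=$SLURM_CPUS_PER_TASK
TAQ_HOME="/big_space_disk/"
TAQ_CHUNKS="$TAQ_HOME/chunks/"

# run the julia step on the pre-split chunks
julia -t $nprocs import_taq_chunks.jl $DATEY $TAQ_CHUNKS &> import_taq_chunks.log.jl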
1. Chordia, Roll, and Subrahmanyam (2002), "Order imbalance, liquidity, and market returns," Journal of Financial Economics, 65. ↩︎
2. I could not find the relevant postgres database for the product, so I processed everything through web queries. The product is taq_common, the library is taq, and the file is ct. ↩︎
3. s5cmd is faster than s3cmd, but in the grand scheme of things here this is not going to matter very much. ↩︎
4. I tried installing the more recent qsv but could not compile it properly on the Minnesota Supercomputing Institute (MSI) cluster. ↩︎