A look at the service sector, according to the Quarterly Services Survey from the US Census Bureau.
import pandas as pd
import altair as alt
import numpy as np
import re
from joblib import Memory
import datetime
# Reference date and reporting period for "how current can the data be"
today_date = datetime.date.today()
currentYear = today_date.year
# Calendar quarter (1-4) derived directly from the month
currentQuarter = (today_date.month - 1) // 3 + 1
# joblib disk cache for fetched spreadsheets
memory = Memory('data/', verbose=0)
# https://www.census.gov/services/qss/historic_data.html
def urlFor(y, q):
    """Build the Census QSS workbook URL for year *y*, quarter *q*.

    The file-naming convention evolved over time:
      * pre-2019:            all_YYYYqN.xls  (lowercase q)
      * 2019 through 2020Q1: all_YYYYQN.xls  (uppercase Q)
      * 2020Q2 onward:       all_YYYYQN.xlsx
    """
    if y < 2019:
        return 'http://www2.census.gov/services/qss/{0}/all_{0}q{1}.xls'.format(y, q)
    # BUGFIX: the original condition was "y < 2020 or (y < 2020 and q == 1)",
    # whose second clause was dead code; per the naming history above the
    # intended cutoff is 2020 Q1 inclusive -- TODO confirm against the
    # Census historic-data listing.
    if y < 2020 or (y == 2020 and q == 1):
        return 'http://www2.census.gov/services/qss/{0}/all_{0}Q{1}.xls'.format(y, q)
    return 'https://www2.census.gov/services/qss/{0}/all_{0}Q{1}.xlsx'.format(y, q)
def shouldInclude(y, q):
    """True when the (y, q) report should already be published."""
    cutoff = max([1, currentQuarter - 2])
    return (y < currentYear) or ((y == currentYear) and (q < cutoff))
# [1:] -> omit Q1 2004, :-1 -> omit QN 20XX
urls = []
for y in range(2004, currentYear + 1):
    for q in range(1, 5):
        if shouldInclude(y, q):
            urls.append((urlFor(y, q), y, q))
urls = urls[1:]
# TODO - try to automate this
# Manually append the most recent reporting period
#urls = urls + [('https://www.census.gov/services/qss/qss-current.xls', 2020, currentQuarter - 1)]
from time import sleep
from urllib.error import HTTPError
isCurrent = lambda y, q: (y == currentYear) and (q == (currentQuarter - 1))
# Depending on the year and quarter, the most useful sheet name
# evolves from table1 -> table1B -> table1b -> tableA1
def sheetNameFor(y, q):
    """Return the sheet name that holds the revenue table for (y, q).

    Prefix history: 'table1' through 2008Q3, 'table1B' 2008Q4-2016Q3,
    'table1b' from 2016Q4, and 'tableA1' for the current workbook.
    """
    if isCurrent(y, q):
        return 'tableA1-{0}Q{1}'.format(y, q)
    # NOTE: the original had "elif y == 2008 and q != 4" with a nested
    # "if q != 4" whose else-branch was unreachable; 2008Q4 already fell
    # through to the table1B case below, so this flattening is equivalent.
    if y <= 2007 or (y == 2008 and q != 4):
        return 'table1-{0}Q{1}'.format(y, q)
    if y < 2016 or (y == 2016 and q != 4):
        return 'table1B-{0}Q{1}'.format(y, q)
    # 2016Q4 onward switches to a lowercase 'b'
    return 'table1b-{0}Q{1}'.format(y, q)
#@memory.cache
def tryFetch(u, retries=5):
    """Download one (url, year, quarter) tuple's workbook.

    Sleeps one second before each attempt, retries on HTTPError, and
    returns (year, quarter, DataFrame) on success or None after the
    retries are exhausted / on any other failure.
    """
    remaining = retries + 1
    while remaining > 0:
        remaining -= 1
        sleep(1)
        try:
            return (u[1], u[2], pd.read_excel(u[0], sheet_name=sheetNameFor(u[1], u[2])))
        except HTTPError:
            continue
        except Exception:
            # best effort: a malformed/missing sheet is simply skipped
            return None
    return None
dfs = [tryFetch(u, retries=5) for u in urls]
# Append the most recently completed quarter from the "current" workbook.
# When we are in Q1 the latest completed quarter belongs to the previous year.
if currentQuarter == 0:
    # Unreachable (quarters are 1-4); kept for parity with the original flow.
    NOOP = 1
elif currentQuarter == 1:
    try:
        # https://www.census.gov/services/qss/qss-current.xlsx
        q = 4
        current = pd.read_excel('https://www.census.gov/services/qss/qss-current.xlsx',
                                sheet_name=sheetNameFor(currentYear - 1, q), engine='openpyxl')
        dfs.append((currentYear - 1, q, current))
    except Exception:
        # BUGFIX: the fallback read was unguarded and the append happened
        # unconditionally; now both attempts are best-effort, matching the
        # non-Q1 branch below.
        q = 3
        try:
            current = pd.read_excel('https://www.census.gov/services/qss/qss-current.xlsx',
                                    sheet_name=sheetNameFor(currentYear - 1, q), engine='openpyxl')
            dfs.append((currentYear - 1, q, current))
        except Exception:
            pass
else:
    try:
        # https://www.census.gov/services/qss/qss-current.xlsx
        q = currentQuarter - 1
        current = pd.read_excel('https://www.census.gov/services/qss/qss-current.xlsx',
                                sheet_name=sheetNameFor(currentYear, q), engine='openpyxl')
        # BUGFIX: the original only appended inside the fallback except-branch,
        # so a successful first read was fetched but silently dropped.
        dfs.append((currentYear, q, current))
    except Exception:
        q = currentQuarter - 2
        try:
            current = pd.read_excel('https://www.census.gov/services/qss/qss-current.xlsx',
                                    sheet_name=sheetNameFor(currentYear, q), engine='openpyxl')
            dfs.append((currentYear, q, current))
        except Exception:
            pass
# Depending on the year and quarter, we need to skip differing numbers of "header" rows
def indexFor(y, q):
    """Rows to skip before data starts: 6 through 2006Q1, 4 afterwards."""
    uses_old_layout = (y < 2006) or (y == 2006 and q < 2)
    return 6 if uses_old_layout else 4
revenues = [(d[0], d[1], d[2].iloc[indexFor(d[0], d[1]):, 0:8]) for d in dfs if d is not None]
#%%time
# To make the various NAICS codes align, we have to cleanup certain codes and mark aggregate categories accordingly
# Top-level NAICS sector codes that act as aggregate categories
_TOP_LEVEL_SECTORS = {'11', '21', '22', '23', '31-33', '42', '44-45', '48-49', '51', '52', '53',
                      '54', '55', '56', '61', '62', '71', '72', '81', '92'}
def remapCategory(code):
    """Normalize a NAICS code string.

    * codes containing 'pt' collapse to their digits plus a 'pt' suffix
    * top-level sector codes get a 'c' suffix marking them as aggregates
    * everything else passes through unchanged
    """
    if 'pt' in code:
        # raw string fixes the invalid "\d" escape in the original literal
        return re.sub(r'\D+', '', code) + 'pt'
    if code in _TOP_LEVEL_SECTORS:
        return code + 'c'
    return code
cleanIt = lambda s: re.sub('(,$|,\sand.*$)', '', re.sub('\s?\(.*$', '', re.sub('\s*\d+$', '', re.sub('\s+', ' ', s.replace('…', '').replace('.', '').strip()))))
# Depending on the year and quarter we need to provide consistent column headers looking back in time from current
def getPrevious(y, q):
    """Five quarter labels, newest first, starting at (y, q) and walking back.

    Returns None for q outside 1-4, matching the original elif chain.
    """
    if q not in (1, 2, 3, 4):
        return None
    labels = []
    yy, qq = y, q
    for _ in range(5):
        labels.append('{0}Q{1}'.format(yy, qq))
        if qq == 1:
            yy, qq = yy - 1, 4
        else:
            qq -= 1
    return labels
# Reshape each report: consistent column labels, cleaned codes/labels, then
# transpose so NAICS codes become columns and reporting periods become rows.
crevs = []
for y, q, df in revenues:
    tmp = df.copy()
    try:
        # "Current" report has different columns than historical ones
        if isCurrent(y, q):
            tmp = tmp.iloc[:, :5]
            tmp.columns = ['Code', 'Business'] + getPrevious(y, q)[:-2]
        else:
            tmp.columns = ['Code', 'Business', '{0}Q{1}YTD'.format(y, q)] + getPrevious(y, q)
        # Tag every row with the report it came from
        tmp['Source'] = tmp.Code.map(lambda c: "{0}Q{1}".format(y, q))
        # Drop rows missing a business label or a code.
        # (The original filtered Business twice -- pd.notna is an alias of
        # pd.notnull -- so one check per column is sufficient.)
        tmp = tmp[pd.notnull(tmp.Business)]
        tmp = tmp[pd.notnull(tmp.Code)]
        # Exclude records reporting "previous definitions"
        tmp = tmp[tmp.Code.map(lambda c: '*' not in str(c))]
        tmp['Code'] = tmp.Code.map(lambda s: remapCategory(str(s)))
        tmp['Business'] = tmp['Business'].map(lambda s: cleanIt(s) if isinstance(s, str) else s)
        crevs.append(tmp.set_index('Code').T)
    except ValueError as e:
        # Typically a column-count mismatch for an unexpected sheet layout
        print("Failed", e)
        print(y, q)
# Combine the revised dataframes into one
catrevs = pd.concat(crevs, sort=False)
# Use the first valid identifier as the new column header
codeLut = {}
for c in catrevs.columns:
    # first non-null Business label seen for this code across all reports
    codeLut[c] = catrevs[c].loc['Business'].dropna().iloc[0]
# Apply categories markers to each code for aggregation
catrevs.columns = list(map(lambda v: codeLut.get(v) + ' ' + v, catrevs.columns))
# Keep only quarterly rows: drop the YTD rows plus the 'Source' and
# 'Business' metadata rows introduced above
catrevs = catrevs[catrevs.index.map(lambda v: v[-3:] != 'YTD' and v not in ['Source', 'Business'])]
# 2003 back-data appears in early reports but is incomplete; exclude it
catrevs = catrevs[~catrevs.index.isin(['2003Q2', '2003Q3'])]
catrevs = catrevs.apply(pd.to_numeric, errors='coerce')
# Shift a parsed quarter-start date to the last day of that quarter
shiftToQuarterEnd = lambda d: d - pd.tseries.offsets.DateOffset(days=1) + pd.tseries.offsets.QuarterEnd()
catrevs['dt'] = catrevs.index.map(pd.to_datetime).map(shiftToQuarterEnd)
revfinal = catrevs.set_index('dt')
#revfinal.tail(15)
# print out the categories
# aggregate categories were suffixed with 'c' by remapCategory
cats = [c for c in revfinal.columns if 'c' in c[-1]]
#cats
# Imports for reading FRED API
from os import environ
from io import BytesIO
from zipfile import ZipFile
from urllib.request import urlopen
# Load the FRED API key: prefer a local key file, fall back to the
# FRED_API_KEY environment variable (CI).
try:
    # for local execution; `with` fixes the leaked file handle from
    # the original bare open(...).read()
    with open("/Users/kyledunn/fredApiKey.txt", "r") as keyFile:
        apiKeyFromFile = keyFile.read().strip()
except FileNotFoundError:
    apiKeyFromFile = None
# for CI
apiKey = environ.get("FRED_API_KEY", apiKeyFromFile)
def getSeries(series="", apiKey=apiKey, description=None):
    """Fetch a FRED series into a DataFrame indexed by observation date.

    The observations endpoint with file_type=txt returns a zip archive;
    the second member holds the tab-separated observations table.
    Columns: the series values (named *description*, defaulting to the
    series id) plus realtime start/end vintages.
    """
    # Construct URL template, fetching as much data as possible
    fetchCommand = "https://api.stlouisfed.org/fred/series/observations?series_id={s}&realtime_end=9999-12-31&api_key={k}&file_type=txt"
    # Call the FRED API; context managers close the response and archive
    # (the original leaked both handles)
    with urlopen(fetchCommand.format(s=series, k=apiKey)) as resp:
        payload = resp.read()
    if description is None:
        description = series
    # Read and extract the data from the Zipfile response
    with ZipFile(BytesIO(payload)) as archive:
        filesInZip = archive.namelist()
        with archive.open(filesInZip[1]) as data:
            # Make a well-formed dataframe
            df = pd.read_csv(data, sep="\t", header=None, skiprows=1,
                             names=["date", description, "rt_start", "rt_end"], na_values=".")
    df['date'] = pd.to_datetime(df.date)
    return df.set_index("date")
# Melt and re-aggregate to combine preliminary and final revisions (via average)
tmp = revfinal[cats].reset_index().melt(id_vars=['dt']).copy()
categories_agg = tmp.groupby(['dt', 'variable']).mean().reset_index()
# Line chart of quarterly revenues per NAICS aggregate (notebook cell output)
alt.Chart(categories_agg).mark_line().encode(
    alt.X('dt', axis=alt.Axis(title=None)),
    alt.Y('value:Q', axis=alt.Axis(title='Quarterly Revenues [Million USD]')),
    alt.Color('variable:N', title="NAICS Group"),
    tooltip=[alt.Tooltip('dt:T', format="%b %Y"), alt.Tooltip('variable:N'), alt.Tooltip('value:Q', format="$,d")]
).properties(
    title="Quarterly Revenues by NAICS category (Census Quarterly Services Survey)",
    background="white",
    width=750,
    height=450
)
# Same data rendered as stacked bars (notebook cell output)
alt.Chart(categories_agg).mark_bar().encode(
    alt.X('dt:T', axis=alt.Axis(title=None)),
    alt.Y('value:Q', axis=alt.Axis(title='Quarterly Revenues [Million USD]')),
    alt.Color('variable:N', title="NAICS Group"),
    tooltip=[alt.Tooltip('dt:T', format="%b %Y"), alt.Tooltip('variable:N'), alt.Tooltip('value:Q', format="$,d")]
).properties(
    title="Quarterly Revenues by NAICS category (Census Quarterly Services Survey)",
    background="white",
    width=750,
    height=450
)
# TODO:
# - Correct these values for inflation
# - Try to correlate tax revenues per sector with each sectors' contribution to GDP
# CPI (urban wage earners) and PPI (all commodities) series from FRED
df_cpi = getSeries("CWUR0000SA0", apiKey)
df_ppi = getSeries("PPIACO", apiKey)
def adjustValueWithDate(val, d):
    """Deflate *val* by the PPI level (index/100) for the month after quarter-end *d*.

    NOTE(review): assumes d + 1 day (the first of the next month) exists in
    df_ppi's index -- get_loc raises KeyError otherwise; confirm.
    """
    date = d + pd.tseries.offsets.DateOffset(days=1)
    valOrVals = (val / (df_ppi.iloc[df_ppi.index.get_loc(date)]['PPIACO'] / 100.))
    # get_loc may match several rows (FRED realtime vintages share a date);
    # take the scalar directly, otherwise presumably the latest revision
    return valOrVals if isinstance(valOrVals, np.float64) else valOrVals[-1]
# Rebuild the aggregate table with an added PPI-adjusted value column
adj_categories_agg = pd.DataFrame.from_records([(dt, variable, value, adjustValueWithDate(value, dt))
                                                for ix, dt, variable, value in categories_agg.itertuples()])
adj_categories_agg.columns = ['dt', 'variable', 'value', 'value_adj']
# PPI-adjusted version of the revenue line chart (notebook cell output)
# NOTE(review): unlike the other charts this one omits background="white" --
# possibly unintentional; confirm before changing
alt.Chart(adj_categories_agg).mark_line().encode(
    alt.X('dt', axis=alt.Axis(title=None)),
    alt.Y('value_adj:Q', axis=alt.Axis(title='Quarterly Revenues [Million 1982-1984 USD]')),
    alt.Color('variable:N', title="NAICS Group"),
    tooltip=[alt.Tooltip('dt:T', format="%b %Y"), alt.Tooltip('variable:N'), alt.Tooltip('value:Q', format="$,d")]
).properties(
    title="Quarterly Revenues by NAICS category (Census Quarterly Services Survey, PPI-adjusted)",
    width=750,
    height=450
)
%%capture
def getEmploymentSeries(seriesId="CEU5500000001", name="financial activities"):
    """Fetch a BLS employment series by scraping the data.bls.gov HTML table.

    Returns a DataFrame indexed by month-start Date with one column *name*.
    The default series is financial activities (finance and insurance).
    """
    # BUGFIX: to_year was hard-coded to 2020, silently truncating any newer
    # data; derive it from today's date instead.
    url = 'https://data.bls.gov/timeseries/{0}?amp%222bdata_tool=XGtable&output_view=data&from_year=1990&to_year={1}'
    furl = url.format(seriesId, datetime.date.today().year)
    # The second HTML table on the page holds the year-by-month values
    df_employment = pd.read_html(furl)[1]
    df_employment.columns = ['Year'] + df_employment.columns.tolist()[1:]
    # Drop the trailing annual-summary column and melt months into rows
    df_emp = df_employment.iloc[:, :-1].melt(id_vars='Year')
    # Remove the footnote row BLS appends to the table
    df_emp = df_emp[df_emp.Year != 'P : preliminary']
    df_emp['Date'] = pd.to_datetime(df_emp['variable'] + " " + df_emp['Year'].apply(str), format="%b %Y")
    df_emp[name] = pd.to_numeric(df_emp['value'], errors='coerce')
    return df_emp[['Date', name]].sort_values('Date').set_index('Date').copy()
# BLS CES employment series ids (not seasonally adjusted) per supersector
industries = {
    'goods-producing': 'CEU0600000001',
    'mining and logging': 'CEU1000000001',
    'construction': 'CEU2000000001',
    'manufacturing': 'CEU3000000001',
    'retail trade': 'CEU4200000001',
    'transportation and warehousing': 'CEU4300000001',
    'information': 'CEU5000000001',
    'financial activities': 'CEU5500000001',
    'professional services': 'CEU6000000001',
    'education and health': 'CEU6500000001',
    'government': 'CEU9000000001'
}
# Fetch every series and join them side-by-side on the Date index
dfs_employment = [getEmploymentSeries(seriesId, label) for label, seriesId in industries.items()]
df_employment = pd.concat(dfs_employment, axis=1)
# QSS categories with no matching BLS employment series; filtered out of
# the revenue-per-employee computation below
exclude = [
    'Adminstrative and support and waste management 56c',
    'Arts, entertainment 71c',
    'Other services 81c',
    'Utilities 22c',
    'Real estate and rental and leasing 53c',
    'Educational services 61c'
]
# Map QSS category labels to the matching BLS employment column
_EMPLOYMENT_COLUMN = {
    'Information 51c': 'information',
    'Finance and insurance 52c': 'financial activities',
    'Health care and social assistance 62c': 'education and health',
    'Professional, scientific 54c': 'professional services',
    'Transportation and warehousing 48-49c': 'transportation and warehousing',
}
def computeDollarsPerCapita(val, variable, date):
    """Revenue per employee: divide *val* by the nearest-month employment level.

    Returns None for categories without an employment series, exactly as the
    original copy-pasted if/elif chain did implicitly.
    """
    column = _EMPLOYMENT_COLUMN.get(variable)
    if column is None:
        return None
    # nearest-month lookup; NOTE(review): get_loc(..., method='nearest') was
    # removed in pandas 2.0 -- confirm the pinned pandas version supports it
    row = df_employment.iloc[df_employment.index.get_loc(pd.to_datetime(date), method='nearest')]
    return val / row[column]
# Revenue per employee for categories that have an employment series
revenue_per_employee = pd.DataFrame.from_records([(dt, variable, value, computeDollarsPerCapita(value_adj, variable, dt))
                                                  for ix, dt, variable, value, value_adj in adj_categories_agg[~adj_categories_agg.variable.isin(exclude)].itertuples()])
revenue_per_employee.columns = ['dt', 'variable', 'value', 'value_adj']
c = alt.Chart(revenue_per_employee).mark_line().encode(
    alt.X('dt', axis=alt.Axis(title=None)),
    alt.Y('value_adj:Q', axis=alt.Axis(title='Quarterly Revenues per Employee [Million USD]')),
    alt.Color('variable:N', title='NAICS Group')
).properties(
    title="Quarterly Revenues per Employee by NAICS category (Census Quarterly Services Survey, PPI-adjusted)",
    background="white",
    width=750,
    height=450
)
c.display()
# Ridgeline layout constants: per-row height and vertical overlap factor
step = 65
overlap = 3
def doRidgeLineFor(df, x, y, row, title='Quarterly Revenue Growth By Sector [YoY % Change]'):
    """Faceted ridgeline area chart: one overlapping row per *row* value.

    df  -- long-form dataframe
    x   -- temporal field name
    y   -- quantitative field name
    row -- nominal field used for both faceting and fill color
    """
    return alt.Chart(df, height=step).transform_joinaggregate(
        mean_val=f'mean({x})', groupby=[row]
    ).mark_area(
        interpolate='monotone',
        fillOpacity=0.8,
        stroke='lightgray',
        strokeWidth=0.5
    ).encode(
        alt.X(f'{x}:T', title=''),
        alt.Y(
            f'{y}:Q',
            # negative upper bound lets each row spill above its neighbors
            scale=alt.Scale(range=[step, -step * overlap]),
            axis=None
        ),
        alt.Fill(
            f'{row}:N',
            title='Sector',
            #legend=None,
            scale=alt.Scale(scheme='accent')
        ),
        tooltip=[alt.Tooltip(f'{x}:T'), alt.Tooltip(f'{row}:N'), alt.Tooltip(f'{y}:Q', format=',.02f')]
    ).facet(
        row=alt.Row(
            f'{row}:N',
            title=None,
            # white labels effectively hide the per-facet headers
            header=alt.Header(labelColor='white') #, labelAnchor='end')
        )
    ).properties(
        title=title,
        bounds='flush'
    ).configure_facet(
        spacing=0
    ).configure_view(
        stroke=None,
        width=650
    ).configure_title(
        anchor='middle'
    )
# Year-over-year % change per category (pct_change(4) compares to the same
# quarter one year earlier)
# NOTE(review): DataFrame.applymap is deprecated in pandas 2.1+ in favor of
# DataFrame.map -- confirm the pinned pandas version
categories_yoy = adj_categories_agg.copy().set_index('dt')[['variable', 'value_adj']].pivot(columns='variable')\
    .applymap(float).pct_change(4).applymap(lambda v: v * 100).reset_index().melt(id_vars='dt')
c = doRidgeLineFor(categories_yoy[['dt', 'variable', 'value']].dropna(), 'dt', 'value', 'variable')
c.save('services.png')
c.display()
# Revenue-per-employee YoY growth; unlike revenue_per_employee above this
# does NOT filter by `exclude`, so computeDollarsPerCapita yields None for
# unmapped categories -- NOTE(review): verify these become NaN downstream
r_p_e = pd.DataFrame.from_records([(dt, variable, value, computeDollarsPerCapita(value_adj, variable, dt))
                                   for ix, dt, variable, value, value_adj in adj_categories_agg.itertuples()])
r_p_e.columns = ['dt', 'variable', 'value', 'value_adj']
r_p_e_yoy = r_p_e.set_index('dt')[['variable', 'value_adj']].pivot(columns='variable')\
    .applymap(float).pct_change(4).applymap(lambda v: v * 100).reset_index().melt(id_vars='dt')
doRidgeLineFor(r_p_e_yoy[['dt', 'variable', 'value']], 'dt', 'value', 'variable', title='Quarterly Revenue Growth (per employee) By Sector [YoY % Change]')