# Source code for finds.readers.bea

"""Wrapper over BEA web api and data source files

- Bureau of Economic Analysis: Input-Output Use Tables

BEA released initial results of the comprehensive update of the
National Economic Accounts (NEAs), which include the National Income
and Product Accounts (NIPAs) and the Industry Economic Accounts
(IEAs), on September 28, 2023.

Copyright 2022, Terence Lim

MIT License

"""
from typing import Dict, List, Any
import io
import json
import re
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from pandas.api import types
from sqlalchemy import Integer, String, Column
from pandas.api import types
from finds.database.redisdb import RedisDB
from finds.readers.readers import requests_get
_VERBOSE = 1

[docs]class BEA: """Class and methods to retrieve BEA web api and IOUse datasets Args: rdb: Redis connection instance to cache data from BEA web site userid: Register with BEA to get key for their web api Attributes: short_desc: Reference Series maps BEA industry to custom descriptions params: Some common parameter sets for BEA web api Examples: >>> bea = BEA() >>> df = bea.get(**BEA.items['industries']) >>> sql.load_dataframe('gdpindustry', df, index_label=None, if_exists='replace') >>> df = bea.get(**BEA.items['ioUse'], year=2018) """ _xls_url = 'https://apps.bea.gov/industry/xls/io-annual/' # custom short labels BEA industries short_desc = Series({'11': 'Agriculture', '111CA': 'Farms', '113FF': 'Forestry,fishing', '21': 'Mining', '211': 'Oil, gas', '212': 'Mining', '213': 'Support mining', '22': 'Utilities', '23': 'Construction', '31G': 'Manufacturing', '33DG': 'Durable goods', '321': 'Wood', '327': 'Nonmetallic', '331': 'Metals', '332': 'Fabricated metal', '333': 'Machinery', '334': 'Computer', '335': 'Electrical', '3361MV': 'Motor vehicles', '3364OT': 'Transport equip', '337': 'Furniture', '339': 'Manufacturing', '31ND': 'Nondurable goods', '311FT': 'Food', '313TT': 'Textile', '315AL': 'Apparel', '322': 'Paper', '323': 'Printing', '324': 'Petroleum, coal', '325': 'Chemical', '326': 'Plastics, rubber', '42': 'Wholesale', '44RT': 'Retail', '441': 'Motor dealers', '445': 'Food stores', '452': 'General stores', '4A0': 'Other retail', '48' : 'Transportation', '48TW': 'Transport, warehouse', '481': 'Air transport', '482': 'Rail transport', '483': 'Water transport', '484': 'Truck transport', '485': 'Transit ground', '486': 'Pipeline transport', '487OS': 'Other transport', '493': 'Warehousing, storage', '51': 'Information', '511': 'Publishing', '512': 'Motion picture', '513': 'Broadcasting', '514': 'Data processing', '52': 'Finance,insurance', '521CI': 'Banks', '523': 'Securities', '524': 'Insurance', '525': 'Funds, trusts', '53': 'Real estate', '531': 'Real estate', 
'HS': 'Housing', 'ORE': 'Other real estate', '532RL': 'Rental', '54': 'Professional services', '5411': 'Legal services', '5415': 'Computer services', '5412OP': 'Misc services', '55': 'Management', '56': 'Administrative and waste management', '561': 'Administrative', '562': 'Waste management', '6': 'Educational services', '61': 'Educational', '62': 'Healthcare', '621': 'Ambulatory', '622HO': 'Hospitals, nursing', '622': 'Hospitals', '623': 'Nursing', '624': 'Social', '7': 'Arts, entertain, rec, accommodation, food svcs', '71': 'Arts, entertain, rec', '711AS': 'Performing arts', '713': 'Recreation', '72': 'Accommodation. food svcs', '721': 'Accommodation', '722': 'Food services', '81': 'Other services', 'G': 'Government', 'GF': 'Federal', 'GFG': 'General government', 'GFGD': 'Defense', 'GFGN': 'Nondefense', 'GFE': 'Federal enterprises', 'GSL': 'State local', 'GSLG': 'State local general', 'GSLE': 'State local enterprises'}, name='label') # group BEA IO-Use table coarser by vintage year _bea_vintages = { 9999: {'531': ['HS','ORE']}, 1963: {'48': ['481','482','483','484','485','486','487OS'], '51': ['511','512','513','514'], '52': ['521CI','523','524','525'], '54': ['5411','5415','5412OP'], '56': ['561','562'], '62': ['621','622','623','624','622HO'], '71': ['711AS','713']}, 1997: {'44RT': ['441', '445', '452', '4A0'], 'GFG': ['GFGD', 'GFGN'], '622HO': ['622','623']}} # parameters for favorite BEA web api params = {'gdpindustry': {'datasetname': 'GDPbyIndustry', 'index': 'key', 'parametername': 'Industry'}, 'ioUse': {'datasetname': 'inputoutput', 'tableid': 259}} def __init__(self, rdb: RedisDB | None, userid: str, verbose: int =_VERBOSE): """Open connection to BEA web api""" self.rdb = rdb self.userid = userid self.verbose = verbose
[docs] @staticmethod def sectoring(year: int, source: str = ""): """Returns BEA sector definitions, based on naics, from xls on BEA website Args: year: Year of historical BEA scheme, in {1997, 1964, 1947} source: Source url or local file Notes: - https://www.bea.gov/industry/input-output-accounts-data - https://apps.bea.gov/industry/xls/io-annual/ - IOUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx - Use_SUT_Framework_2007_2012_DET.xlsx - Replace "HS" "ORE" with "531" """ if not source: filename = { # there are 3 historical schmes 1997: 'Use_SUT_Framework_2007_2012_DET.xlsx', 1963: 'IoUse_Before_Redefinitions_PRO_1963-1996_Summary.xlsx', 1947: 'IoUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx'} if year not in filename: raise Exception('bea year not in ' + str(list(filename.keys()))) source = BEA._xls_url + filename[year] # get the xls and parse the NAICS Codes sheet x = pd.ExcelFile(source) df = x.parse('NAICS Codes') # extract main groups and labels from columns 0,1 beg = np.where(df.iloc[:, 0].astype(str).str[0].str.isdigit())[0] labels = {str(df.iloc[i,0]) : df.iloc[i,1] for i in beg} # Kludge: some labels missing, so fill from custom abbreviations labels.update({k: v for k,v in BEA.short_desc.items() if k not in labels}) # Now extract beg and end row of groups and labels from columns 1,2 beg, = np.where(df.iloc[:, 1].astype(str).str[0].str.isdigit() | df.iloc[:, 1].astype(str).isin(["HS", "ORE"])) end = np.append(beg[1:], len(df.index)) # for each group, parse naics code ranges from col 6, store in {results} # e.g. 
'7223-4, 722514-5' --> [722300, 722514] # also, parse leading digits of summary name if at least len 2 (pad 0's) result = [] for imin, imax in zip(beg, end): code_title = [(str(col6).split('-')[0], col4) for col6, col4 in zip(df.iloc[imin:imax, 6], df.iloc[imin:imax, 4]) if str(col6)[0].isdigit()] # '491' is postal industry naics_title = [[(re.sub("[^0-9]", "", code).ljust(6, '0'), title) for code in codes.split(',')] for codes, title in code_title if codes not in ['491']] new_codes = [[int(code), title] for codes in naics_title for code, title in codes] # include summary code by right-padding up to six 0s name = str(df.iloc[imin, 1]) m = re.match('\d+', name) if m and len(m.group()) >= 2: new_codes.append([int(m.group().ljust(6, '0')), df.iloc[imin, 2]]) new_df = DataFrame.from_records(new_codes, columns=['code', 'title'])\ .sort_values(['code']) new_df['name'] = name new_df['description'] = df.iloc[imin, 2] result.append(new_df.drop_duplicates()) result = pd.concat(result, axis=0, ignore_index=True) # replace earlier vintage years with coarser industries for vintage, codes in BEA._bea_vintages.items(): if year < vintage: for k,v in codes.items(): result.loc[result['name'].isin(v), ['name','description']] = [k, labels[k]] # clean-up data frame and its index for return return result.drop_duplicates(['code'])\ .set_index('code')\ .sort_index()\ .rename_axis(columns=str(year))
[docs] def get(self, datasetname: str = "", parametername: str = "", cache_mode: str = "rw", **kwargs) -> DataFrame: """Execute common BEA web api calls Args: datasetname: Name of dataset to retrieve, e.g. 'ioUse' parametername: Parameter to retrieve, e.g. 'TableID' cache_mode: 'r' to try read from cache first, 'w' to write to cache kwargs: Additional parameters, such as tableid or year Examples: >>> datasetname='ioUse' >>> tableid=259 >>> year = 2017 >>> parametername = 'TableID' >>> bea.get() >>> bea.get(datasetname) >>> bea.get(datasetname, parametername=parametername) >>> bea.get('ioUse') >>> df = bea.get(datasetname='ioUse', tableid=259, year=2018) >>> df = bea.get(datasetname='GDPbyIndustry', parametername='Industry') """ url = 'https://apps.bea.gov/api/data?&UserID=' + self.userid if not datasetname: url += '&method=GETDATASETLIST' else: url += '&datasetname=' + datasetname if parametername: url += '&method=GetParameterValues' url += '&parametername=' + parametername else: if len(kwargs) == 0: url += '&method=GetParameterList' else: url += '&method=GetData' for k,v in kwargs.items(): if isinstance(v, list): v = ",".join(v) url += "&" + str(k) + "=" + str(v) if self.verbose: print(url, str(kwargs)) if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(url): if self.verbose: print('(BEA get rdb)', url) return self.rdb.load(url) response = requests_get(url) f = io.BytesIO(response.content) data = json.loads(f.read().decode('utf-8')) try: if not datasetname: df = DataFrame(data['BEAAPI']['Results']['Dataset']) elif parametername: df = DataFrame(data['BEAAPI']['Results']['ParamValue']) elif len(kwargs) == 0: df = DataFrame(data['BEAAPI']['Results']['Parameter']) else: df = DataFrame(data['BEAAPI']['Results'][0]['Data']) except: print('***', datasetname, parametername, '***', data) raise Exception df.columns = df.columns.map(str.lower).map(str.rstrip) if 'index' in kwargs: df = df.set_index(kwargs['index']) if 'w' in cache_mode and self.rdb: 
self.rdb.dump(url, df) return df
[docs] def read_ioUse_xls(self, year: int, cache_mode: str = "rw", source: str | None = None) -> DataFrame: """Helper to load a year's ioUSE table from vintage xls on website Args: year: year of IoUse to fetch cache_mode: 'r' to try read from cache first, 'w' to write to cache source: url or filename to read from """ filename = { # early years' ioUse tables 1963: 'IoUse_Before_Redefinitions_PRO_1963-1996_Summary.xlsx', 1947: 'IoUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx'} url = (source or BEA._xls_url) url += ('' if url.endswith('/') else '/') url += filename[1963 if year >= 1963 else 1947] if self.verbose: print(url) if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(url): if self.verbose: print('(BEA get rdb)', url) return self.rdb.load(url) x = pd.ExcelFile(url) # x.sheet_names if self.verbose: print(x.sheet_names) df = x.parse(str(year)) # parse the sheet for the desired year # seek cells with "Code" in top left, and startswith "T0" at corners top, = np.where(df.iloc[:, 0].astype(str).str.startswith('Code')) right, = np.where(df.iloc[top[0],:].astype(str).str.startswith('T0')) bottom, = np.where(df.iloc[:, 0].astype(str).str.startswith('T0')) # stack all data columns into {result} result = [] for col in range(2, right[0]): out = DataFrame(data=list(df.iloc[top[0]+1:bottom[0], 0]), columns=['rowcode']) out['datavalue'] = list(pd.to_numeric( df.iloc[top[0]+1:bottom[0], col], errors='coerce')) out['colcode'] = str(df.iloc[top[0], col]) result.append(out[out['datavalue'].notna()]) result = pd.concat(result, axis=0, ignore_index=True, sort=True) if 'w' in cache_mode and self.rdb: self.rdb.dump(url, result) # save to redis return result
[docs] def read_ioUse(self, year: int, vintage: int = 0, cache_mode: str = "rw") -> DataFrame: """Load ioUse table from BEA web api (or xls if early vintage) Args: year: Year of IO-Use to load vintage: Year of sectoring; allows different eras to be compared cache_mode: 'r' to try read from cache first, 'w' to write to cache Returns: DataFrame in stacked form, flows amounts in 'datavalue' column, and columns 'rowcode', 'colcode' label maker and user industry Notes: - rows, column codes that start with ('T','U','V','Other') are dropped - 'F': final_use, - 'G': govt, - 'T': total&tax, - 'U': used, - 'V':value_added, - 'O':imports - generally, should drop = ('F','T','U','V','Other') Examples: >>> bea=BEA() >>> data = bea.read_ioUse(1996) >>> sql.load_dataframe('ioUse', data, index_label=None, if_exists='replace') >>> data = sql.select('ioUse', where={'year': 2017, 'tableid' : 259}) >>> df = data.pivot(index='rowcode', columns='colcode', values='datavalue') """ if year >= 1997: # after 1997, via web apis; before, in xls spreadsheets df = self.get(**self.params['ioUse'], year=year, cache_mode="rw") df.columns = df.columns.map(str.lower) df = df[['colcode','rowcode','datavalue']] df['datavalue'] = pd.to_numeric(df['datavalue'], errors='coerce') df = df[df['datavalue'] > 0] else: df = self.read_ioUse_xls(year) # merge industries for vintage year, using historical sectoring scheme if not vintage: vintage = year # no vintage specified, so use same year for key, codes in self._bea_vintages.items(): # step thru vintages if vintage < key: # if required vintage predates scheme year for k,v in codes.items(): if self.verbose: print(k, '<--', v) oldRows = df[df['rowcode'].isin(v)].drop(columns='rowcode') keep = [c for c in oldRows.columns if c != 'datavalue'] newRows = oldRows.groupby(by=keep, as_index=False).sum() newRows['rowcode'] = k if len(newRows): df = pd.concat([df, newRows], ignore_index=True, sort=True) oldCols = df[df['colcode'].isin(v)].drop(columns='colcode') keep = [c 
for c in oldCols.columns if c != 'datavalue'] newCols = oldCols.groupby(by=keep, as_index=False).sum() newCols['colcode'] = k if len(newCols): df = pd.concat([df, newCols], ignore_index=True, axis = 0, sort=True) df = df[~df['colcode'].isin(v)] df = df[~df['rowcode'].isin(v)] keep = ('F','G') drop = ('T','U','V','Other') df = df[~df['colcode'].str.startswith(drop) & ~df['rowcode'].str.startswith(drop)] return df.rename_axis(index=str(vintage), columns=str(year))
if __name__ == "__main__":
    # Demonstration script: needs local credentials plus live SQL and Redis
    # connections, and reads from the BEA website when data is not cached.
    from secret import credentials, paths
    from finds.database import SQL, RedisDB

    downloads = paths['scratch']
    sql = SQL(**credentials['sql'])
    rdb = RedisDB(**credentials['redis'])
    bea = BEA(rdb, **credentials['bea'])

    # Get sectoring table
    df = BEA.sectoring(1997)

    # Read GDPbyIndustry descriptions
    df = bea.get(datasetname='GDPbyIndustry', parametername='Industry')
    print(df)
    df = bea.get(datasetname='GDPbyIndustry', parametername='Industry',
                 index='key')
    print(df)

    # Read ioUse table, and regroup by vintage year scheme
    #
    # Feb 1, 2024: Make tables, use tables, and import matrices, Annual, 2017-2022
    #   => tables between 1998 and 2016 inclusive are not available
    #
    # cache_mode must be bound before the loop below uses it
    cache_mode = 'r'    # read-only to verify in rdb
    # cache_mode = 'w'  # write-only to reload from BEA website and save to rdb
    years = list(range(2022, 1946, -1))
    ioUses = {}
    for vintage in [1997, 1963, 1947]:
        for year in [y for y in years if y >= vintage]:
            df = bea.read_ioUse(year, vintage=vintage, cache_mode=cache_mode)
            ioUses[(vintage, year)] = df
            print(f"Vintage {vintage}, Year {year}: {len(df)} records")

    # get desc: code -> long description
    desc = bea.get(**BEA.params['gdpindustry'], cache_mode='r')['desc']

    # show desc codes without short_desc
    missing = set(desc.index).difference(BEA.short_desc.index)
    print(desc[list(missing)].rename('Missing short desc')
          .to_frame().to_string())

    # build labels: short label -> long description
    # (falls back to the short label when a code has no long description)
    labels = {BEA.short_desc[c]: (desc[c] if c in desc.index
                                  else BEA.short_desc[c])
              for c in BEA.short_desc.index}

    # show vintage sectoring scheme
    BEA.sectoring(1947)