"""Wrapper over BEA web api and data source files
- Bureau of Economic Analysis: Input-Output Use Tables
BEA released initial results of the comprehensive update of the
National Economic Accounts (NEAs), which include the National Income
and Product Accounts (NIPAs) and the Industry Economic Accounts
(IEAs), on September 28, 2023.
Copyright 2022, Terence Lim
MIT License
"""
from typing import Dict, List, Any
import io
import json
import re
import numpy as np
import pandas as pd
from pandas import DataFrame, Series
from pandas.api import types
from sqlalchemy import Integer, String, Column
from finds.database.redisdb import RedisDB
from finds.readers.readers import requests_get
_VERBOSE = 1
class BEA:
    """Class and methods to retrieve BEA web api and IOUse datasets
    Args:
        rdb: Redis connection instance to cache data from BEA web site
        userid: Register with BEA to get key for their web api
    Attributes:
        short_desc: Series mapping BEA industry codes to custom short labels
        params: Common parameter sets for BEA web api calls
        
    Examples:
    >>> bea = BEA(rdb, userid)
    >>> df = bea.get(**BEA.params['gdpindustry'])
    >>> sql.load_dataframe('gdpindustry', df, index_label=None, if_exists='replace')
    >>> df = bea.get(**BEA.params['ioUse'], year=2018)
    """
    _xls_url = 'https://apps.bea.gov/industry/xls/io-annual/'    
    # custom short labels BEA industries
    short_desc = Series({'11': 'Agriculture',
                         '111CA': 'Farms',
                         '113FF': 'Forestry, fishing',
                         '21': 'Mining',
                         '211': 'Oil, gas',
                         '212': 'Mining',
                         '213': 'Support mining',
                         '22': 'Utilities',
                         '23': 'Construction',
                         '31G': 'Manufacturing',
                         '33DG': 'Durable goods',
                         '321': 'Wood',
                         '327': 'Nonmetallic',
                         '331': 'Metals',
                         '332': 'Fabricated metal',
                         '333': 'Machinery',
                         '334': 'Computer',
                         '335': 'Electrical',
                         '3361MV': 'Motor vehicles',
                         '3364OT': 'Transport equip',
                         '337': 'Furniture',
                         '339': 'Manufacturing',
                         '31ND': 'Nondurable goods',
                         '311FT': 'Food',
                         '313TT': 'Textile',
                         '315AL': 'Apparel',
                         '322': 'Paper',
                         '323': 'Printing',
                         '324': 'Petroleum, coal',
                         '325': 'Chemical',
                         '326': 'Plastics, rubber',
                         '42': 'Wholesale',
                         '44RT': 'Retail',
                         '441': 'Motor dealers',
                         '445': 'Food stores',
                         '452': 'General stores',
                         '4A0': 'Other retail',
                         '48' : 'Transportation',
                         '48TW': 'Transport, warehouse',
                         '481': 'Air transport',
                         '482': 'Rail transport',
                         '483': 'Water transport',
                         '484': 'Truck transport',
                         '485': 'Transit ground',
                         '486': 'Pipeline transport',
                         '487OS': 'Other transport',
                         '493': 'Warehousing, storage',
                         '51': 'Information',
                         '511': 'Publishing',
                         '512': 'Motion picture',
                         '513': 'Broadcasting',
                         '514': 'Data processing',
                         '52': 'Finance, insurance',
                         '521CI': 'Banks',
                         '523': 'Securities',
                         '524': 'Insurance',
                         '525': 'Funds, trusts',
                         '53': 'Real estate',
                         '531': 'Real estate',
                         'HS': 'Housing',
                         'ORE': 'Other real estate',
                         '532RL': 'Rental',
                         '54': 'Professional services',
                         '5411': 'Legal services',
                         '5415': 'Computer services',
                         '5412OP': 'Misc services',
                         '55': 'Management',
                         '56': 'Administrative and waste management',
                         '561': 'Administrative',
                         '562': 'Waste management',
                         '6': 'Educational services',
                         '61': 'Educational',
                         '62': 'Healthcare',
                         '621': 'Ambulatory',
                         '622HO': 'Hospitals, nursing',
                         '622': 'Hospitals',
                         '623': 'Nursing',
                         '624': 'Social',
                         '7': 'Arts, entertain, rec, accommodation, food svcs',
                         '71': 'Arts, entertain, rec',
                         '711AS': 'Performing arts',
                         '713': 'Recreation',
                         '72': 'Accommodation, food svcs',
                         '721': 'Accommodation',
                         '722': 'Food services',
                         '81': 'Other services',
                         'G': 'Government',
                         'GF': 'Federal',
                         'GFG': 'General government',
                         'GFGD': 'Defense',
                         'GFGN': 'Nondefense',
                         'GFE': 'Federal enterprises',
                         'GSL': 'State local',
                         'GSLG': 'State local general',
                         'GSLE': 'State local enterprises'},
                        name='label')
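    # e.g., BEA.short_desc['334'] gives the short label 'Computer'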
    
    # rollups to coarsen BEA IO-Use industries for earlier vintage years
    _bea_vintages = {
        9999: {'531': ['HS','ORE']},   
        1963: {'48': ['481','482','483','484','485','486','487OS'],
               '51': ['511','512','513','514'],
               '52': ['521CI','523','524','525'],
               '54': ['5411','5415','5412OP'],
               '56': ['561','562'],
               '62': ['621','622','623','624','622HO'],
               '71': ['711AS','713']},
        1997: {'44RT': ['441', '445', '452', '4A0'],
               'GFG': ['GFGD', 'GFGN'],
               '622HO': ['622','623']}}
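    # e.g., for a vintage before 1997, detail retail codes '441', '445', '452',
    # '4A0' are rolled back up into their parent summary code '44RT'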
    
    # parameter sets for common BEA web api calls
    params = {'gdpindustry': {'datasetname': 'GDPbyIndustry', 
                              'index': 'key',
                              'parametername': 'Industry'},
              'ioUse': {'datasetname': 'inputoutput',
                        'tableid': 259}}
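    # e.g., bea.get(**BEA.params['ioUse'], year=2021) retrieves that year's IO-Use table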
    def __init__(self, rdb: RedisDB | None, userid: str, verbose: int = _VERBOSE):
        """Open connection to BEA web api"""
        self.rdb = rdb
        self.userid = userid
        self.verbose = verbose
    @staticmethod
    def sectoring(year: int, source: str = ""):
        """Returns BEA sector definitions, based on naics, from xls on BEA website
        Args:
            year: Year of historical BEA scheme, in {1997, 1963, 1947}
            source: Source url or local file
        Notes:
        - https://www.bea.gov/industry/input-output-accounts-data
        - https://apps.bea.gov/industry/xls/io-annual/
          - IOUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx
          - Use_SUT_Framework_2007_2012_DET.xlsx 
        - Replace "HS" "ORE" with "531"
        """
        
        if not source:
            filename = {   # there are 3 historical schemes
                1997: 'Use_SUT_Framework_2007_2012_DET.xlsx',   
                1963: 'IoUse_Before_Redefinitions_PRO_1963-1996_Summary.xlsx',
                1947: 'IoUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx'}
            if year not in filename:
                raise Exception('bea year not in ' + str(list(filename.keys())))
            source = BEA._xls_url + filename[year]
        # get the xls and parse the NAICS Codes sheet
        x = pd.ExcelFile(source)
        df = x.parse('NAICS Codes')
        # extract main groups and labels from columns 0,1
        beg = np.where(df.iloc[:, 0].astype(str).str[0].str.isdigit())[0]
        labels = {str(df.iloc[i,0]) : df.iloc[i,1] for i in beg}
        # Kludge: some labels missing, so fill from custom abbreviations
        labels.update({k: v for k,v in BEA.short_desc.items()
                       if k not in labels})
        # Now extract beg and end row of groups and labels from columns 1,2
        beg, = np.where(df.iloc[:, 1].astype(str).str[0].str.isdigit()
                        | df.iloc[:, 1].astype(str).isin(["HS", "ORE"]))
        end = np.append(beg[1:], len(df.index))
        # for each group, parse naics code ranges from col 6, store in {results}
        #   e.g. '7223-4, 722514-5' --> [722300, 722514]
        # also, parse leading digits of summary name if at least len 2 (pad 0's)
        result = []
        for imin, imax in zip(beg, end):
            code_title = [(str(col6).split('-')[0], col4)
                          for col6, col4 in zip(df.iloc[imin:imax, 6],
                                                df.iloc[imin:imax, 4])
                          if str(col6)[0].isdigit()]
            
            # '491' is postal industry
            naics_title = [[(re.sub("[^0-9]", "", code).ljust(6, '0'), title)
                            for code in codes.split(',')]
                           for codes, title in code_title if codes not in ['491']]
            new_codes = [[int(code), title]
                         for codes in naics_title for code, title in codes]
            # include summary code by right-padding up to six 0s
            name = str(df.iloc[imin, 1])
            m = re.match(r'\d+', name)
            if m and len(m.group()) >= 2:
                new_codes.append([int(m.group().ljust(6, '0')), df.iloc[imin, 2]])
            new_df = DataFrame.from_records(new_codes, columns=['code', 'title'])\
                              .sort_values(['code'])
            new_df['name'] = name
            new_df['description'] = df.iloc[imin, 2]
            result.append(new_df.drop_duplicates())
        result = pd.concat(result, axis=0, ignore_index=True)
        # replace earlier vintage years with coarser industries
        for vintage, codes in BEA._bea_vintages.items():
            if year < vintage:
                for k,v in codes.items():
                    result.loc[result['name'].isin(v),
                               ['name','description']] = [k, labels[k]]
        # clean-up data frame and its index for return
        return result.drop_duplicates(['code'])\
                     .set_index('code')\
                     .sort_index()\
                     .rename_axis(columns=str(year))
    
    def get(self, datasetname: str = "", parametername: str = "",
            cache_mode: str = "rw", **kwargs) -> DataFrame:
        """Execute common BEA web api calls
        Args:
            datasetname: Name of dataset to retrieve, e.g. 'ioUse'
            parametername: Parameter to retrieve, e.g. 'TableID'
            cache_mode: 'r' to try read from cache first, 'w' to write to cache
            kwargs: Additional parameters, such as tableid or year
        Examples:
        >>> datasetname='ioUse'
        >>> tableid=259
        >>> year = 2017
        >>> parametername = 'TableID'
        >>> bea.get()
        >>> bea.get(datasetname)
        >>> bea.get(datasetname, parametername=parametername)
        >>> bea.get('ioUse')
        >>> df = bea.get(datasetname='ioUse', tableid=259, year=2018)
        >>> df = bea.get(datasetname='GDPbyIndustry', parametername='Industry')
        """
        url = 'https://apps.bea.gov/api/data?&UserID=' + self.userid
        if not datasetname:
            url += '&method=GETDATASETLIST'
        else:
            url += '&datasetname=' + datasetname
            if parametername:
                url += '&method=GetParameterValues'
                url += '&parametername=' + parametername
            else:
                if len(kwargs) == 0:
                    url += '&method=GetParameterList'
                else:
                    url += '&method=GetData'
                    for k,v in kwargs.items():
                        if isinstance(v, list):
                            v = ",".join(v)
                        url += "&" + str(k) + "=" + str(v)
        if self.verbose:
            print(url, str(kwargs))
        if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(url):
            if self.verbose:
                print('(BEA get rdb)', url)
            return self.rdb.load(url)
        response = requests_get(url)
        f = io.BytesIO(response.content)
        data = json.loads(f.read().decode('utf-8'))
        try:
            if not datasetname:
                df = DataFrame(data['BEAAPI']['Results']['Dataset'])
            elif parametername:
                df = DataFrame(data['BEAAPI']['Results']['ParamValue'])
            elif len(kwargs) == 0:
                df = DataFrame(data['BEAAPI']['Results']['Parameter'])
            else:
                df = DataFrame(data['BEAAPI']['Results'][0]['Data'])
        except Exception:
            print('***', datasetname, parametername, '***', data)
            raise
        df.columns = df.columns.map(str.lower).map(str.rstrip)
        if 'index' in kwargs:
            df = df.set_index(kwargs['index'])
        if 'w' in cache_mode and self.rdb:
            self.rdb.dump(url, df)
        return df 
    def read_ioUse_xls(self, year: int, cache_mode: str = "rw",
                       source: str | None = None) -> DataFrame:
        """Helper to load a year's ioUSE table from vintage xls on website
        
        Args:
            year: year of IoUse to fetch
            cache_mode: 'r' to try read from cache first, 'w' to write to cache
            source: url or filename to read from
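
        Examples:
        A minimal sketch (assumes a BEA instance and that the vintage xls
        file is reachable on the BEA website):
        >>> df = bea.read_ioUse_xls(1954)   # stacked rowcode/colcode/datavalue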
        """
        filename = {  # early years' ioUse tables
            1963: 'IoUse_Before_Redefinitions_PRO_1963-1996_Summary.xlsx',
            1947: 'IoUse_Before_Redefinitions_PRO_1947-1962_Summary.xlsx'}
        url = (source or BEA._xls_url)
        url += ('' if url.endswith('/') else '/')
        url += filename[1963 if year >= 1963 else 1947]
        if self.verbose:
            print(url)
        if 'r' in cache_mode and self.rdb and self.rdb.redis.exists(url):
            if self.verbose:
                print('(BEA get rdb)', url)
            return self.rdb.load(url)
        
        x = pd.ExcelFile(url)    # x.sheet_names
        if self.verbose:
            print(x.sheet_names)
        df = x.parse(str(year))  # parse the sheet for the desired year
        # locate data block: "Code" marks top-left; cells starting with "T0"
        # mark the right and bottom edges
        top, = np.where(df.iloc[:, 0].astype(str).str.startswith('Code'))
        right, = np.where(df.iloc[top[0],:].astype(str).str.startswith('T0'))
        bottom, = np.where(df.iloc[:, 0].astype(str).str.startswith('T0'))
        # stack all data columns into {result}
        result = []
        for col in range(2, right[0]):
            out = DataFrame(data=list(df.iloc[top[0]+1:bottom[0], 0]),
                            columns=['rowcode'])
            out['datavalue'] = list(pd.to_numeric(
                df.iloc[top[0]+1:bottom[0], col], errors='coerce'))
            out['colcode'] = str(df.iloc[top[0], col])
            result.append(out[out['datavalue'].notna()])
        result = pd.concat(result,
                           axis=0,
                           ignore_index=True,
                           sort=True)
        if 'w' in cache_mode and self.rdb:
            self.rdb.dump(url, result)   # save to redis
        return result 
    def read_ioUse(self, year: int, vintage: int = 0,
                   cache_mode: str = "rw") -> DataFrame:
        """Load ioUse table from BEA web api (or xls if early vintage)
        Args:
            year: Year of IO-Use to load
            vintage: Year of sectoring; allows different eras to be compared
            cache_mode: 'r' to try read from cache first, 'w' to write to cache
        Returns:
            DataFrame in stacked form, with flow amounts in the 'datavalue'
            column, and 'rowcode', 'colcode' labeling maker and user industry
        Notes:
        - row and column codes that start with ('T', 'U', 'V', 'Other') are dropped
        - 'F': final use
        - 'G': government
        - 'T': total & tax
        - 'U': used
        - 'V': value added
        - 'O': imports
        - generally, one should drop ('F', 'T', 'U', 'V', 'Other')
        Examples:
        >>> bea = BEA(rdb, userid)
        >>> data = bea.read_ioUse(1996)
        >>> sql.load_dataframe('ioUse', data, index_label=None, if_exists='replace')
        >>> data = sql.select('ioUse', where={'year': 2017, 'tableid' : 259})
        >>> df = data.pivot(index='rowcode', columns='colcode', values='datavalue')
        """
        if year >= 1997: # after 1997, via web apis; before, in xls spreadsheets
            df = self.get(**self.params['ioUse'], year=year, cache_mode="rw")
            df.columns = df.columns.map(str.lower)
            df = df[['colcode','rowcode','datavalue']]
            df['datavalue'] = pd.to_numeric(df['datavalue'], errors='coerce')
            df = df[df['datavalue'] > 0]
        else:
            df = self.read_ioUse_xls(year)
        # merge industries for vintage year, using historical sectoring scheme
        if not vintage:
            vintage = year   # no vintage specified, so use same year
        for key, codes in self._bea_vintages.items():  # step thru vintages
            if vintage < key:         # if required vintage predates scheme year
                for k,v in codes.items():
                    if self.verbose:
                        print(k, '<--', v)
                    oldRows = df[df['rowcode'].isin(v)].drop(columns='rowcode')
                    keep = [c for c in oldRows.columns if c != 'datavalue']
                    newRows = oldRows.groupby(by=keep, as_index=False).sum()
                    newRows['rowcode'] = k
                    if len(newRows):
                        df = pd.concat([df, newRows],
                                       ignore_index=True,
                                       sort=True)
                    oldCols = df[df['colcode'].isin(v)].drop(columns='colcode')
                    keep = [c for c in oldCols.columns if c != 'datavalue']
                    newCols = oldCols.groupby(by=keep, as_index=False).sum()
                    newCols['colcode'] = k
                    if len(newCols):
                        df = pd.concat([df, newCols],
                                       ignore_index=True,
                                       axis = 0,
                                       sort=True)
                    df = df[~df['colcode'].isin(v)]
                    df = df[~df['rowcode'].isin(v)]
        keep = ('F', 'G')    # prefixes retained: final use and government
        drop = ('T','U','V','Other')
        df = df[~df['colcode'].str.startswith(drop)
                & ~df['rowcode'].str.startswith(drop)]        
        return df.rename_axis(index=str(vintage), columns=str(year))  
    
if __name__ == "__main__":
    from secret import credentials, paths
    from finds.database import SQL, RedisDB
    downloads = paths['scratch']
    sql = SQL(**credentials['sql'])
    rdb = RedisDB(**credentials['redis'])
    bea = BEA(rdb, **credentials['bea'])
    # Get sectoring table
    df = BEA.sectoring(1997)
    
    # Read GDPbyIndustry descriptions
    df = bea.get(datasetname='GDPbyIndustry', parametername='Industry')
    print(df)
    df = bea.get(datasetname='GDPbyIndustry', parametername='Industry',
                 index='key')
    print(df)
    
    # Read ioUse table, and regroup by vintage year scheme
    #
    # Feb 1, 2024: Make tables, use tables, and import matrices, Annual, 2017-2022
    # => tables between 1998 and 2016 inclusive are not available
    #
    cache_mode = 'r'  # read-only to verify in rdb
    # cache_mode = 'w'  # write-only to reload from BEA website and save to rdb
    years = list(range(2022, 1946, -1))
    ioUses = {}
    for vintage in [1997, 1963, 1947]:
        for year in [y for y in years if y >= vintage]:
            df = bea.read_ioUse(year, vintage=vintage, cache_mode=cache_mode)
            ioUses[(vintage, year)] = df
            print(f"Vintage {vintage}, Year {year}: {len(df)} records")
    # get desc: code -> long description
    desc = bea.get(**BEA.params['gdpindustry'], cache_mode='r')['desc']
    # show desc codes without short_desc
    missing = set(desc.index).difference(BEA.short_desc.index)
    print(desc[list(missing)].rename('Missing short desc').to_frame().to_string())
    # build labels: short label -> long description
    labels = {BEA.short_desc[c]:
              (desc[c] if c in desc.index else BEA.short_desc[c])
              for c in BEA.short_desc.index}   # code -> short label
    # show vintage sectoring scheme
    BEA.sectoring(1947)