# Source code for gerrytools.data.census

from functools import reduce
from itertools import combinations
from typing import Iterable

import censusdata
import pandas as pd
import requests


def _rjoin(df, columns) -> Iterable:
    """
    Private helper that glues several dataframe columns together, row by row,
    after casting each one to a string.

    Args:
        df (pd.DataFrame): DataFrame holding the columns to be joined.
        columns (list): Names of the columns to join; the pieces appear in the
            result in the same left-to-right order as in this list.

    Returns:
        An iterable (the accumulated column) whose entries are the
        concatenated string values of the requested columns.
    """
    names = list(columns)

    # Seed the accumulator with the first column (an empty list of names
    # fails here with an IndexError), then tack on the rest in order.
    joined = df[names[0]].astype(str)
    for name in names[1:]:
        joined = joined + df[name].astype(str)

    return joined


def census10(state, table="P8", columns=None, geometry="block") -> pd.DataFrame:
    """
    Retrieves `geometry`-level 2010 Summary File 1 data via the Census API.

    Args:
        state (State): `us.State` object (e.g. `us.states.WI`).
        table (string, optional): Table from which we retrieve data. Defaults
            to the P8 table, which contains population by race regardless of
            ethnicity. Accepted values are `"P8"`, `"P9"`, `"P10"`, and
            `"P11"`.
        columns (dict, optional): Dictionary which maps Census column names
            (from the correct table) to human-readable names. We require this
            to be a dictionary, _not_ a list, as specifying human-readable
            names will implicitly protect against incorrect column names and
            excessive API calls. When omitted or empty, the full mapping
            produced by `variables(table)` is used.
        geometry (string, optional): Geometry level at which we retrieve data.
            Defaults to `"block"` to retrieve block-level data for the state
            provided. Accepted values are `"block"`, `"block group"`, and
            `"tract"`.

    Returns:
        A DataFrame with columns renamed according to their Census description
        designation and a unique identifier column (`BLOCK10`, `BLOCKGROUP10`,
        or `TRACT10`, depending on `geometry`) for joining to geometries.

    Raises:
        ValueError: If `geometry` or `table` is not one of the accepted values.
    """
    # Reject unsupported geometries and tables up front, before any API call.
    if geometry not in {"block", "tract", "block group"}:
        raise ValueError(f'Geometry "{geometry}" not accepted.')

    if table not in {"P8", "P9", "P10", "P11"}:
        raise ValueError(f'Unknown table "{table}".')

    # Create the right geometry identifiers. Tracts are always requested;
    # blocks and block groups add one more level beneath them.
    geometries = [("state", str(state.fips)), ("county", "*"), ("tract", "*")]
    if geometry in {"block group", "block"}:
        geometries += [(geometry, "*")]

    # Name of the identifier column, e.g. "block group" -> "BLOCKGROUP10".
    identifier = geometry.replace(" ", "").upper() + "10"

    # Use the caller's mapping when one is given; otherwise every variable
    # in the table.
    varmap = columns if columns else variables(table)
    codes = list(varmap)

    # Download data.
    raw = censusdata.download(
        "sf1",
        2010,
        censusdata.censusgeo(geometries),
        ["GEO_ID"] + codes,
    )

    # Rename the identifier and the Census variables in a single pass.
    raw = raw.rename({"GEO_ID": identifier, **varmap}, axis=1)

    # Drop the first nine characters of GEO_ID so only the trailing part
    # remains (presumably a summary-level prefix such as "1000000US" --
    # confirm against the API output).
    raw[identifier] = raw[identifier].str[9:]

    # Discard the index produced by the download and send back to the caller!
    return raw.reset_index(drop=True)
def census20(
    state,
    table="P1",
    columns=None,
    geometry="block",
    key="75c0c07e6f0ab7b0a9a1c14c3d8af9d9f13b3d65",
) -> pd.DataFrame:
    """
    Retrieves `geometry`-level 2020 Decennial Census PL94-171 data via the
    Census API.

    Args:
        state (State): `us.State` object (e.g. `us.states.WI`).
        table (string, optional): Table from which we retrieve data. Defaults
            to the P1 table, which gets populations by race regardless of
            ethnicity. Accepted values are `"P1"`, `"P2"`, `"P3"`, and `"P4"`;
            anything else prints a notice and falls back to `"P1"`.
        columns (dict, optional): Dictionary which maps Census column names
            (from the correct table) to human-readable names. We require this
            to be a dictionary, _not_ a list, as specifying human-readable
            names will implicitly protect against incorrect column names and
            excessive API calls. When omitted or empty, the full mapping
            produced by `variables(table)` is used.
        geometry (string, optional): Geometry level at which we retrieve data.
            Defaults to `"block"` to retrieve block-level data for the state
            provided. Accepted values are `"block"`, `"block group"`, and
            `"tract"`; anything else prints a notice and falls back to
            `"block"`.
        key (string, optional): Census API key.

    Returns:
        A DataFrame with columns renamed according to their Census description
        designation and a `GEOID20` column for joining to geometries. The
        data columns are cast to `int` and `GEOID20` is the first column.

    Raises:
        requests.HTTPError: If the Census API answers a request with an HTTP
            error status.
    """
    # NOTE(review): a default API key is embedded in the signature. It is kept
    # so existing callers keep working, but callers should pass their own key.

    # Check whether the geometry is right. If not, tell the user and fall
    # back to blocks.
    if geometry not in {"block", "tract", "block group"}:
        print(f'Geometry "{geometry}" not accepted; defaulting to "block".')
        geometry = "block"

    # Check whether we're providing an appropriate table name.
    if table not in {"P1", "P2", "P3", "P4"}:
        print(f'Table "{table}" not accepted; defaulting to "P1."')
        table = "P1"

    # Set the base Census API URL and get the keys for the provided table.
    base = "https://api.census.gov/data/2020/dec/pl"
    varmap = columns if columns else variables(table)
    codes = list(varmap)

    # Create the end part of the query string.
    query = [
        ("key", key),
        ("for", f"{geometry.replace(' ', r'%20')}:*"),
        ("in", f"state:{str(state.fips).zfill(2)}"),
        ("in", "county:*"),
    ]

    # Based on the geometry type, add an additional entry; this is required to
    # match the Census geographic hierarchy.
    if geometry in {"block", "block group"}:
        query.append(("in", "tract:*"))

    # Geographic identifier columns returned alongside the data; joined
    # left-to-right they form the GEOID. Identical for every request.
    last = [geometry] if geometry in {"block group", "block"} else []
    tail = ["state", "county", "tract"] + last

    # Since the Census doesn't allow us to request more than 50 variables at
    # once, we request the variables in chunks of at most 45 and then merge
    # the chunks together.
    chunk_size = 45
    mergeable = []

    for start in range(0, len(codes), chunk_size):
        varchunk = codes[start : start + chunk_size]

        # Build the full query string for this chunk of variables.
        params = query + [("get", ",".join(varchunk))]
        url = base + "?" + "&".join(f"{param}={value}" for param, value in params)

        # Send the request, fail loudly on an HTTP error, and create a
        # dataframe: the first row of the payload is the header.
        response = requests.get(url)
        response.raise_for_status()
        payload = response.json()
        header, data = payload[0], payload[1:]
        chunk = pd.DataFrame(data, columns=header)

        # Get a GEOID column and drop old columns.
        chunk["GEOID20"] = _rjoin(chunk[tail], tail)
        chunk = chunk.drop(tail, axis=1)
        mergeable.append(chunk)

    # Merge the dataframes, rename everything, make the columns ints, and
    # return with the GEOID20 column first.
    merged = reduce(lambda left, right: pd.merge(left, right, on="GEOID20"), mergeable)
    merged = merged.rename(varmap, axis=1)
    merged = merged.astype({var: int for var in varmap.values()})

    return merged[["GEOID20"] + list(varmap.values())]
def variables(table) -> dict:
    """
    Produces variable names for the 2020 Census PL94-171 tables (P1 through
    P4) and the 2010 tables P8 through P11. Variables are determined from
    patterns apparent in PL94 variable
    [lists for tables P1 through P4](https://tinyurl.com/2s3btptn).

    Args:
        table (string): The table for which we're generating variables. The
            name is not validated here; any table other than P1 through P4 is
            treated as a 2010 table.

    Returns:
        A dictionary mapping Census variable codes to human-readable ones.
    """
    # List the categories of Census variables and find the combinations in the
    # correct order. This *should* be the original order in which they're
    # listed, but these have been spot-checked to verify their correctness.
    # These names are also modified based on the table passed; for example,
    # if the table passed is P2 or P4, we prepend an "NH" to the beginning,
    # as these columns are explicitly non-hispanic people. If the table
    # passed is P3 or P4, we append a "VAP" to the end to signify these
    # are people of voting age; otherwise, we add "POP." The 2010 tables
    # P9/P11 and P10/P11 are treated the same way, respectively.
    categories = ["WHITE", "BLACK", "AMIN", "ASIAN", "NHPI", "OTH"]
    nonhispanic = table in {"P2", "P4", "P9", "P11"}
    voting_age = table in {"P3", "P4", "P10", "P11"}

    prefix = "NH" if nonhispanic else ""
    suffix = "VAP" if voting_age else "POP"
    year_suff = "20" if table in {"P1", "P2", "P3", "P4"} else "10"

    # One name per non-empty combination of categories, smallest
    # combinations first.
    combos = [
        prefix + "".join(combo) + suffix + year_suff
        for size in range(1, len(categories) + 1)
        for combo in combinations(categories, size)
    ]

    # Now, for each of the combinations, we map the appropriate variable
    # name to the descriptor. Each of these tranches should have a width
    # of 6 choose i, where i is the number of categories in the
    # combination. For example, the second tranche (from 13 to 27) has
    # width 15, as 6C2=15.
    if table in {"P1", "P3", "P8", "P10"}:
        tranches = [(3, 8), (11, 25), (27, 46), (48, 62), (64, 69), (71, 71)]
    else:
        tranches = [(5, 10), (13, 27), (29, 48), (50, 64), (66, 71), (73, 73)]

    # Create variable numbers (both tranche endpoints are inclusive).
    numbers = [n for first, last in tranches for n in range(first, last + 1)]

    # Edit these for specific tables. Every table leads with its total
    # (variable 1); the non-hispanic tables (P2, P4, P9, P11) additionally
    # carry the total Hispanic count as variable 2.
    tcol = f"VAP{year_suff}" if voting_age else f"TOTPOP{year_suff}"
    if nonhispanic:
        hcol = f"H{suffix}{year_suff}"
        numbers = [1, 2] + numbers
        combos = [tcol, hcol] + combos
    else:
        numbers = [1] + numbers
        combos = [tcol] + combos

    # Create the variable names and zip the names together with the
    # combinations. 2020 codes look like "P1_001N"; 2010 codes look like
    # "P008001" (zero-padded table number followed by the variable number).
    if year_suff == "20":
        names = [f"{table}_{n:03d}N" for n in numbers]
    else:
        table_number = table.split("P")[-1].zfill(3)
        names = [f"P{table_number}{n:03d}" for n in numbers]

    return dict(zip(names, combos))