1212
1313
1414def run_employment (year : int , debug : bool ):
15- """Control function to create jobs data by naics_code (NAICS) at the MGRA level.
15+ """Control function to create jobs data by industry_code at the MGRA level.
1616
1717 Get the LEHD LODES data, aggregate to the MGRA level using the block to MGRA
1818 crosswalk, then apply control totals from QCEW using integerization.
1919
2020 Functionality is split apart for code encapsulation (function inputs not included):
2121 _get_jobs_inputs - Get all input data related to jobs, including LODES data,
2222 block to MGRA crosswalk, and control totals from QCEW. Then process the
23- LODES data to the MGRA level by naics_code .
23+ LODES data to the MGRA level by industry_code .
2424 _validate_jobs_inputs - Validate the input tables from the above function
2525 _create_jobs_output - Apply control totals to employment data using
2626 utils.integerize_1d() and create output table
@@ -34,14 +34,14 @@ def run_employment(year: int, debug: bool):
3434 jobs_inputs = _get_jobs_inputs (year )
3535 _validate_jobs_inputs (jobs_inputs )
3636
37- jobs_outputs = _create_jobs_output (jobs_inputs )
37+ jobs_outputs = _create_jobs_output (jobs_inputs , year )
3838 _validate_jobs_outputs (jobs_outputs )
3939
4040 _insert_jobs (jobs_inputs , jobs_outputs , debug )
4141
4242
4343def _get_lodes_data (year : int ) -> pd .DataFrame :
44- """Retrieve LEHD LODES data for a specified year and split naics_code 72 into
44+ """Retrieve LEHD LODES data for a specified year and split industry_code 72 into
4545 721 and 722 using split percentages.
4646
4747 Args:
@@ -68,23 +68,23 @@ def _get_lodes_data(year: int) -> pd.DataFrame:
6868 params = {"year" : year },
6969 )
7070
71- # Split naics_code 72 and combine with other industries
72- lodes_72_split = lodes_data .loc [lambda df : df ["naics_code " ] == "72" ].merge (
71+ # Split industry_code 72 and combine with other industries
72+ lodes_72_split = lodes_data .loc [lambda df : df ["industry_code " ] == "72" ].merge (
7373 split_naics_72 , on = "block" , how = "left"
7474 )
7575
7676 combined_data = pd .concat (
7777 [
78- lodes_data .loc [lambda df : df ["naics_code " ] != "72" ],
78+ lodes_data .loc [lambda df : df ["industry_code " ] != "72" ],
7979 lodes_72_split .assign (
80- naics_code = "721" , jobs = lambda df : df ["jobs" ] * df ["pct_721" ]
80+ industry_code = "721" , jobs = lambda df : df ["jobs" ] * df ["pct_721" ]
8181 ),
8282 lodes_72_split .assign (
83- naics_code = "722" , jobs = lambda df : df ["jobs" ] * df ["pct_722" ]
83+ industry_code = "722" , jobs = lambda df : df ["jobs" ] * df ["pct_722" ]
8484 ),
8585 ],
8686 ignore_index = True ,
87- )[["year" , "block" , "naics_code " , "jobs" ]]
87+ )[["year" , "block" , "industry_code " , "jobs" ]]
8888
8989 return combined_data
9090
@@ -100,13 +100,13 @@ def _aggregate_lodes_to_mgra(
100100 MGRAs.
101101
102102 Args:
103- combined_data: LODES data with columns: year, block, naics_code , jobs
103+ combined_data: LODES data with columns: year, block, industry_code , jobs
104104 xref: Crosswalk with columns: block, mgra, pct_edd, pct_area, edd_flag
105105 year: The year for which to aggregate data
106106
107107 Returns:
108108 Aggregated data at MGRA level with columns: run_id, year, mgra,
109- naics_code , value
109+ industry_code , value
110110 """
111111 # Get MGRA data from SQL
112112 with utils .ESTIMATES_ENGINE .connect () as con :
@@ -124,28 +124,83 @@ def _aggregate_lodes_to_mgra(
124124 )
125125
126126 # Get unique industry codes and cross join with MGRA data
127- unique_industries = combined_data ["naics_code " ].unique ()
127+ unique_industries = combined_data ["industry_code " ].unique ()
128128 jobs = (
129- mgra_data .merge (pd .DataFrame ({"naics_code " : unique_industries }), how = "cross" )
129+ mgra_data .merge (pd .DataFrame ({"industry_code " : unique_industries }), how = "cross" )
130130 .assign (year = year )
131131 .merge (
132132 combined_data .merge (xref , on = "block" , how = "inner" )
133133 .assign (
134134 value = lambda df : df ["jobs" ]
135135 * np .where (df ["edd_flag" ] == 1 , df ["pct_edd" ], df ["pct_area" ])
136136 )
137- .groupby (["year" , "mgra" , "naics_code " ], as_index = False )["value" ]
137+ .groupby (["year" , "mgra" , "industry_code " ], as_index = False )["value" ]
138138 .sum (),
139- on = ["year" , "mgra" , "naics_code " ],
139+ on = ["year" , "mgra" , "industry_code " ],
140140 how = "left" ,
141141 )
142142 .fillna ({"value" : 0 })
143- .assign (run_id = utils .RUN_ID )[["run_id" , "year" , "mgra" , "naics_code" , "value" ]]
143+ .assign (run_id = utils .RUN_ID )[
144+ ["run_id" , "year" , "mgra" , "industry_code" , "value" ]
145+ ]
144146 )
145147
146148 return jobs
147149
148150
151+ def _distribute_self_emp_to_mgra (
152+ bg_data : pd .DataFrame , xref : pd .DataFrame
153+ ) -> pd .DataFrame :
154+ """Distribute self-employed block group data to MGRA level using allocation
155+ percentages.
156+
157+ Args:
158+ bg_data: DataFrame with block group self-employed data. Must include columns:
159+ year, blockgroup, industry_code, value
160+ xref: Crosswalk DataFrame with columns: blockgroup, mgra, flag, pct_18_64,
161+ pct_pop, pct_split
162+
163+ Returns:
164+ Self-Emp Data at MGRA level with columns: run_id, year, mgra, industry_code, value
165+ """
166+ # Merge block group data to MGRA crosswalk
167+ merged = bg_data .merge (xref , on = "blockgroup" , how = "inner" )
168+
169+ # Calculate weighted value based on flag
170+ merged = merged .assign (
171+ weighted_value = np .select (
172+ [
173+ merged ["flag" ] == "pct_18_64" ,
174+ merged ["flag" ] == "pct_pop" ,
175+ merged ["flag" ] == "pct_split" ,
176+ ],
177+ [
178+ merged ["value" ] * merged ["pct_18_64" ],
179+ merged ["value" ] * merged ["pct_pop" ],
180+ merged ["value" ] * merged ["pct_split" ],
181+ ],
182+ default = np .nan ,
183+ )
184+ )
185+
186+ if merged ["weighted_value" ].isna ().any ():
187+ raise ValueError (
188+ "Unexpected allocation flag found; expected one of {'pct_18_64', 'pct_pop', 'pct_split'}"
189+ )
190+
191+ # Group by year, mgra, industry_code and sum, then assign run_id and reorder columns
192+ self_emp = (
193+ merged .groupby (["year" , "mgra" , "industry_code" ], as_index = False )[
194+ "weighted_value"
195+ ]
196+ .sum ()
197+ .rename (columns = {"weighted_value" : "value" })
198+ .assign (run_id = utils .RUN_ID )
199+ )[["run_id" , "year" , "mgra" , "industry_code" , "value" ]]
200+
201+ return self_emp
202+
203+
149204def _get_jobs_inputs (year : int ) -> dict [str , pd .DataFrame ]:
150205 """Get input data related to jobs for a specified year.
151206
@@ -160,8 +215,8 @@ def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]:
160215 jobs_inputs ["lodes_data" ] = _get_lodes_data (year )
161216
162217 with utils .GIS_ENGINE .connect () as con :
163- # get crosswalk from Census blocks to MGRAs
164- with open (utils .SQL_FOLDER / "employment/edd_land_use_split .sql" ) as file :
218+ # Get crosswalk from Census blocks to MGRAs
219+ with open (utils .SQL_FOLDER / "employment/xref_block_to_mgra .sql" ) as file :
165220 jobs_inputs ["xref_block_to_mgra" ] = utils .read_sql_query_fallback (
166221 sql = sql .text (file .read ()),
167222 con = con ,
@@ -172,20 +227,53 @@ def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]:
172227 )
173228
174229 with utils .ESTIMATES_ENGINE .connect () as con :
175- # get regional employment control totals from QCEW
176- with open (utils .SQL_FOLDER / "employment/QCEW_control .sql" ) as file :
230+ # Get regional employment control totals from QCEW
231+ with open (utils .SQL_FOLDER / "employment/get_region_qcew .sql" ) as file :
177232 jobs_inputs ["control_totals" ] = utils .read_sql_query_fallback (
178233 sql = sql .text (file .read ()),
179234 con = con ,
180235 params = {
181236 "year" : year ,
182237 },
183238 )
184- jobs_inputs ["control_totals" ]["run_id" ] = utils .RUN_ID
185239
186- jobs_inputs ["lehd_jobs" ] = _aggregate_lodes_to_mgra (
187- jobs_inputs ["lodes_data" ], jobs_inputs ["xref_block_to_mgra" ], year
188- )
240+ # Get self-employed totals and append to control_totals
241+ with open (utils .SQL_FOLDER / "employment/get_region_self_emp.sql" ) as file :
242+ self_emp_control = utils .read_sql_query_fallback (
243+ sql = sql .text (file .read ()),
244+ con = con ,
245+ params = {
246+ "year" : year ,
247+ },
248+ )
249+
250+ jobs_inputs ["control_totals" ] = pd .concat (
251+ [jobs_inputs ["control_totals" ], self_emp_control ],
252+ ignore_index = True ,
253+ )
254+
255+ jobs_inputs ["control_totals" ]["run_id" ] = utils .RUN_ID
256+
257+ # Get self-employed block group data
258+ with open (utils .SQL_FOLDER / "employment/get_B24080.sql" ) as file :
259+ jobs_inputs ["B24080" ] = utils .read_sql_query_fallback (
260+ sql = sql .text (file .read ()),
261+ con = con ,
262+ params = {
263+ "year" : year ,
264+ },
265+ )
266+
267+ # Get block group to MGRA crosswalk
268+ with open (utils .SQL_FOLDER / "employment/xref_bg_to_mgra.sql" ) as file :
269+ jobs_inputs ["xref_bg_to_mgra" ] = pd .read_sql_query (
270+ sql = sql .text (file .read ()),
271+ con = con ,
272+ params = {
273+ "run_id" : utils .RUN_ID ,
274+ "year" : year ,
275+ },
276+ )
189277
190278 return jobs_inputs
191279
@@ -200,32 +288,39 @@ def _validate_jobs_inputs(jobs_inputs: dict[str, pd.DataFrame]) -> None:
200288 negative = {},
201289 null = {},
202290 )
291+ # Self Employed only includes block groups with self-employed individuals therefore
292+ # no row count validation performed
293+ tests .validate_data (
294+ "Self-employed block group data" ,
295+ jobs_inputs ["B24080" ],
296+ negative = {},
297+ null = {},
298+ )
203299 # No row count validation performed as xref is many-to-many
204- # check
205300 tests .validate_data (
206- "xref " ,
301+ "xref_block_to_mgra " ,
207302 jobs_inputs ["xref_block_to_mgra" ],
208303 negative = {},
209304 null = {},
210305 )
306+ # No row count validation performed as xref is many-to-many
211307 tests .validate_data (
212- "QCEW control totals" ,
213- jobs_inputs ["control_totals" ],
214- row_count = {"key_columns" : {"naics_code" }},
308+ "xref_bg_to_mgra" ,
309+ jobs_inputs ["xref_bg_to_mgra" ],
215310 negative = {},
216311 null = {},
217312 )
218313 tests .validate_data (
219- "LEHD jobs at MGRA level " ,
220- jobs_inputs ["lehd_jobs " ],
221- row_count = {"key_columns" : {"mgra" , "naics_code " }},
314+ "QCEW control totals " ,
315+ jobs_inputs ["control_totals " ],
316+ row_count = {"key_columns" : {"industry_code " }},
222317 negative = {},
223318 null = {},
224319 )
225320
226321
227322def _create_jobs_output (
228- jobs_inputs : dict [str , pd .DataFrame ],
323+ jobs_inputs : dict [str , pd .DataFrame ], year : int
229324) -> dict [str , pd .DataFrame ]:
230325 """Apply control totals to employment data using utils.integerize_1d().
231326
@@ -235,22 +330,35 @@ def _create_jobs_output(
235330 Returns:
236331 Controlled employment data.
237332 """
238- # Sort the input data and get unique naics codes
239- sorted_jobs = jobs_inputs ["lehd_jobs" ].sort_values (by = ["mgra" , "naics_code" ])
240- naics_codes = sorted_jobs ["naics_code" ].unique ()
333+ # Create MGRA level jobs data by combining LODES and self-employment data
334+ mgra_jobs = pd .concat (
335+ [
336+ # Aggregate LODES jobs to MGRA level
337+ _aggregate_lodes_to_mgra (
338+ jobs_inputs ["lodes_data" ], jobs_inputs ["xref_block_to_mgra" ], year
339+ ),
340+ # Distribute self-employment data to MGRA level
341+ _distribute_self_emp_to_mgra (
342+ jobs_inputs ["B24080" ], jobs_inputs ["xref_bg_to_mgra" ]
343+ ),
344+ ],
345+ ignore_index = True ,
346+ ).sort_values (by = ["mgra" , "industry_code" ])
241347
242348 # Create list to store controlled values for each industry
243349 results = []
244350
245- # Apply integerize_1d to each naics code
246- for naics_code in naics_codes :
247- # Filter for this naics code
248- naics_mask = sorted_jobs . query ( "naics_code == @naics_code" )
351+ # Apply integerize_1d to each industry_code
352+ for industry_code in mgra_jobs [ "industry_code" ]. unique () :
353+ # Filter for this industry_code
354+ naics_mask = mgra_jobs . loc [ mgra_jobs [ "industry_code" ] == industry_code ]
249355
250356 # Get control value and apply integerize_1d
251357 control_value = (
252358 jobs_inputs ["control_totals" ]
253- .query ("naics_code == @naics_code" )["value" ]
359+ .loc [
360+ jobs_inputs ["control_totals" ]["industry_code" ] == industry_code , "value"
361+ ]
254362 .iloc [0 ]
255363 )
256364
@@ -273,7 +381,7 @@ def _validate_jobs_outputs(jobs_outputs: dict[str, pd.DataFrame]) -> None:
273381 tests .validate_data (
274382 "Controlled jobs data" ,
275383 jobs_outputs ["results" ],
276- row_count = {"key_columns" : {"mgra" , "naics_code " }},
384+ row_count = {"key_columns" : {"mgra" , "industry_code " }},
277385 negative = {},
278386 null = {},
279387 )
0 commit comments