# Route NYC 311 resident complaints to the responsible city agency using a local LLM.
import json
import os
import pandas as pd
from openai import OpenAI
from string import Template
from enum import Enum
from pydantic import BaseModel, Field, ValidationError
# Local model served through Ollama's OpenAI-compatible endpoint.
LOCAL_MODEL = 'qwen3:4b'

# Single shared client for the whole run. The api_key is required by the
# SDK but ignored by the local Ollama server.
CLIENT = OpenAI(
    base_url='http://localhost:11434/v1',
    api_key='ollama',
)
# Define some extraction schema
class CityAgency(str, Enum):
    """Closed set of NYC agencies a complaint can be routed to.

    Subclassing ``str`` makes each member serialize as its full agency
    name, which is what the JSON schema shown to the model advertises.
    """
    DHS = "Department of Homeless Services"
    DOB = "Department of Buildings"
    DSNY = "Department of Sanitation"
    DEP = "Department of Environmental Protection"
    NYPD = "New York City Police Department"
    HPD = "Department of Housing Preservation and Development"
    DPR = "Department of Parks and Recreation"
    DOT = "Department of Transportation"
    DCWP = "Department of Consumer and Worker Protection"
class AgencyExtraction(BaseModel):
    """Schema for extracting city agency mentions from text.

    The model is prompted to emit JSON matching this schema; the raw
    response is then validated with ``model_validate_json`` in ``invoke``.
    """
    # Short free-text summary written by the model.
    complaint: str = Field(
        description="5 word or less description of the complaint"
    )
    # Constrained to the CityAgency enum; validation fails on any other value.
    agency: CityAgency = Field(
        description="Agency most responsible for the complaint"
    )
# System message: constrains the model to act as a complaint router.
# NOTE: the two literals are implicitly concatenated; the trailing space on
# the first keeps the sentences from running together ("agency.Select").
ROLE = ("You route New York City resident complaints to the most relevant agency. "
        "Select only from the provided list of agencies")

# User-message template; ${extraction_schema} and ${complaint} are filled
# in per call by string.Template.substitute.
BASE_PROMPT_STR = """
Closely follow these instructions for routing resident complaints:
1. Review the resident complaint and identify the core issue
2. Based on determination of the core issue, assign the complaint to the most relevant city agency
3. Return your output as a JSON output strictly following the schema below:
${extraction_schema}
TEXT TO PROCESS:
${complaint}
"""

# Create the template object once at import time.
prompt_template = Template(BASE_PROMPT_STR)
def invoke(client, user_complaint):
    """Route one complaint through the local model.

    Args:
        client: OpenAI-compatible client (here, pointed at Ollama).
        user_complaint: Raw complaint text from a resident.

    Returns:
        A validated ``AgencyExtraction``, or ``None`` on any API or
        validation failure (errors are printed, not raised).
    """
    schema_str = json.dumps(AgencyExtraction.model_json_schema(), indent=2)
    prompt = prompt_template.substitute(
        extraction_schema=schema_str,
        complaint=user_complaint,
    )
    try:
        response = client.chat.completions.create(
            messages=[
                {'role': 'system', 'content': ROLE},
                {'role': 'user', 'content': prompt}
            ],
            model=LOCAL_MODEL,
            temperature=0,  # deterministic routing
            response_format={"type": "json_object"}
        )
        raw_content = response.choices[0].message.content
        return AgencyExtraction.model_validate_json(raw_content)
    except ValidationError as e:
        # Model returned JSON that does not match the schema.
        print(f"Error during LLM invocation or validation: {e}")
        return None
    except Exception as e:
        # Boundary catch for API/connection errors from the local server;
        # best-effort per complaint, so log and skip rather than crash.
        print(f"Error during LLM invocation or validation: {e}")
        return None
def process_complaint(complaint_text):
    """Route a single complaint and flatten the extraction into a dict.

    Returns ``{'agency': ..., 'summary': ...}`` or ``None`` when the
    model call or validation failed.
    """
    extraction = invoke(CLIENT, complaint_text)
    if extraction is None:
        return None
    return {
        'agency': extraction.agency.value,
        'summary': extraction.complaint,
    }
def main():
    """Batch-route the first 20 complaints from the 311 feedback CSV.

    Reads the CSV, routes every non-null "Customer Message" through the
    local model, and saves all successful extractions as one JSON array
    at output/processed_complaints.json.
    """
    path = "data/Public_feedback_on_311_request_complaint_types_20260310.csv"
    complaints_df = pd.read_csv(path).head(20)
    complaint_list = complaints_df["Customer Message"].dropna().astype(str).tolist()
    # just store results in a list
    all_results = []
    print(f"Starting extraction for {len(complaint_list)} complaints...")
    for single_complaint in complaint_list:
        result = process_complaint(single_complaint)
        if result:
            all_results.append(result)
            print(f"Processed: {result['summary']}")
    # Save the full run as a single JSON object. Create the output
    # directory first so open() cannot fail on a fresh checkout.
    os.makedirs("output", exist_ok=True)
    output_file = os.path.join("output", "processed_complaints.json")
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=4)
    print(f"\nSuccessfully saved {len(all_results)} results to {output_file}")


if __name__ == "__main__":
    main()