Text-to-SQL을 production에서 어떻게 구현하나? — pgai 코드 뜯어보기
Published:
Part 1. 머릿속의 가설
계기
text-to-SQL을 어떻게 만드냐고 누가 묻는다면 처음에는 이렇게 답했을 것 같다.
자연어 질문에서 채워야 할 자리를 미리 잡자.
<CONTENT>만 토큰화해서SELECT <CONTENT> FROM TABLE같은 슬롯-fill 쿼리를 만들고, LLM이 hallucination으로 잘못 채우면 “이전에 안 됐다”는 신호와 함께 다시 시도. 루프.
그럴듯해 보였다. 한 문장으로 줄이면:
- 출력 형태를 강제해서 LLM이 헛소리할 자리를 줄인다.
- 실패하면 루프해서 마지막엔 맞는 게 나오게 한다.
근데 적자마자 구멍이 보인다.
- 슬롯 자체를 LLM이 잘못 만들면? (
SELECT name FROM users인데 컬럼이username이라면?) - 어디서 “안 됐다”를 판정하지? LLM에게 다시 LLM이 한 걸 검증시키면 또 hallucinate한다.
- 루프는 도는데, 매 시도마다 어떤 정보가 더 들어가야 다음번에는 맞을까?
핵심은 검증 가능한 외부 ground truth가 없다는 거였다. LLM이 자기를 자기로 채점하는 구조.
그래서 이미 production에서 굴러가는 오픈소스가 어떻게 풀고 있는지 보고 싶었다. 마침 TimescaleDB의 pgai가 PostgreSQL extension으로 풀려 있길래 코드를 뜯어봤다.
Part 2. pgai 한 눈에
pgai의 text-to-SQL 모듈은 projects/pgai/pgai/semantic_catalog/ 아래에 있다. 핵심 의존성은 네 개다.
- Pydantic AI — LLM agent / tool use 추상화
- pgvector — Postgres 안의 벡터 검색
- psycopg (async) — DB 드라이버
- Jinja2 — 프롬프트 템플릿
CLI entry point는 한 줄이다.
pgai semantic-catalog generate-sql \
--model anthropic:claude-3-opus-20240229 \
--prompt "지난 달 가입한 모든 사용자 찾기" \
--iteration-limit 10
이게 cli.py → SemanticCatalog.generate_sql() → gen_sql.generate_sql()로 이어진다. 메인 루프는 모두 마지막 한 함수 안에 있다.
전체 흐름을 그림으로 그리면 이렇다.
User Question
↓
Embed → Find Context (semantic catalog 검색)
↓
Render LLM Prompt (Jinja2)
↓
LLM (Pydantic AI, tool-only output)
├── search_for_context ── 컨텍스트 부족 → 다시 위로
└── record_sql_answer
↓
EXPLAIN validator
├── invalid → 에러 붙여서 다시 LLM
└── valid → return SQL
내 가설과 골격은 비슷하다. LLM 호출 → 검증 → 실패면 루프. 다른 점은 검증을 LLM이 아니라 Postgres planner가 한다는 것, 그리고 입력 쪽에 두꺼운 RAG 단계가 붙어 있다는 것.
이제 조각별로 본다.
Part 3. 컨텍스트는 어디에서 오는가 — Semantic Catalog
pgai의 README가 던지는 첫 번째 질문은 이거다.
그냥 schema 통째로 LLM에 던지면 되는 거 아냐?
답은 “안 된다.” 이유 두 가지.
- Schema에는 비즈니스 의미가 없다.
orders테이블이 고객 주문인지 발주인지, schema만 보고는 모른다. - 토큰이 폭발한다. 큰 DB는 schema만 수만 토큰이고, 거기에 무관한 정보가 LLM을 헷갈리게 한다.
그래서 pgai는 RAG로 푼다. semantic catalog라는 별도의 메타 저장소를 만들고, 거기서 질문에 관련 있는 조각만 골라 LLM에 주입.
저장소는 세 종류의 테이블로 구성된다 (catalog id가 1이라면).
ai.semantic_catalog_obj_1— DB 객체 (테이블 / 뷰 / 함수)와 그 자연어 descriptionai.semantic_catalog_sql_1— SQL 예제와 그 descriptionai.semantic_catalog_fact_1— 자유 형식 사실 (e.g. “deleted_at IS NULL이면 active row”)
각 테이블에는 description 컬럼과 함께 pgvector 임베딩 컬럼이 붙어 있고, 검색은 <=> (cosine distance) 연산자로 한다. 임베딩 모델은 OpenAI / Ollama / sentence-transformers 중 선택.
세 카테고리 중에 Facts가 가장 영리하다. Schema에 안 적힌 도메인 지식을 그냥 자연어로 한 줄 넣어두면, 그게 RAG에 끼어 LLM에게 단서로 간다. “환불은 status=’REFUNDED’가 아니라 별도 refunds 테이블에 들어간다” 같은 거. 회사 내 위키에 흩어져 있는 암묵지를 catalog로 형식화하는 셈이다.
객체 description도 사람이 다 적을 필요는 없다. describe.py가 또 다른 Pydantic AI agent를 띄워서 — 이번엔 SQL 생성이 아니라 description 생성용 — record_table_description(table_id, description) tool 하나로 LLM이 직접 채우게 한다. catalog가 catalog를 만들어 채우는 구조.
Part 4. 컨텍스트를 LLM에게 어떻게 보여주는가
검색이 끝나면 결과를 프롬프트에 박아야 한다. 여기서 pgai의 디자인 결정이 재밌다.
LLM은 SQL을 가장 많이 학습했다. JSON이나 자체 포맷 만들지 말고 SQL 그 자체로 schema + 의미 + 샘플을 다 표현하자.
검색된 테이블 하나가 LLM 프롬프트에 들어갈 때 모양은 다음과 같다 (pgai docs 인용).
<table id="76">
CREATE TABLE postgres_air.flight
( flight_id integer NOT NULL nextval('postgres_air.flight_flight_id_seq'::regclass)
, flight_no text NOT NULL
, scheduled_departure timestamp with time zone NOT NULL
, ...
);
ALTER TABLE postgres_air.flight ADD CONSTRAINT flight_pkey PRIMARY KEY (flight_id);
ALTER TABLE postgres_air.flight ADD CONSTRAINT aircraft_code_fk
FOREIGN KEY (aircraft_code) REFERENCES postgres_air.aircraft(code);
COMMENT ON TABLE postgres_air.flight IS
$$The flight table tracks scheduled and actual flight details...$$;
COMMENT ON COLUMN postgres_air.flight.flight_id IS
$$A unique identifier for each flight.$$;
COMMENT ON COLUMN postgres_air.flight.scheduled_departure IS
$$Scheduled departure time for the flight.$$;
...
COPY (SELECT * FROM "postgres_air"."flight" LIMIT 3) TO STDOUT
WITH (FORMAT TEXT, HEADER true);
/*
flight_id flight_no scheduled_departure departure_airport ...
181960 4946 2024-07-12 15:05:00+00 MEX ...
203092 2167 2024-07-16 00:35:00+00 IAD ...
*/
</table>
한 블록 안에 DDL + 제약조건 + 자연어 COMMENT + 실제 샘플 3개가 다 들어 있다. 다 진짜 PostgreSQL 문법으로. LLM은 이미 이 문법을 끝까지 학습했으니 별도 파싱 없이 바로 이해한다.
<table id="76">이라는 XML 태그가 한 가지 더 일을 한다. 답을 낼 때 LLM이 “이 답에 76번 객체를 썼다”라고 보고할 수 있게 해 두는 슬롯이다 (이건 Part 5에서).
SQL 예제와 fact도 비슷한 모양으로 직렬화된다.
<sql-example id="5">
/* 카테고리별 주문 수 집계 */
SELECT category, COUNT(*) FROM orders GROUP BY category;
</sql-example>
<fact id="10">
환불은 status='REFUNDED'가 아니라 refunds 테이블에 별도 row로 들어간다.
</fact>
Part 5. Agent loop — tool 두 개뿐
이제 LLM 호출 부분. system prompt부터 짧게 보자.
You are an expert at analyzing PostgreSQL database schemas and coding SQL statements.
## Process:
* Think carefully about the task posed by the user.
* Analyze the context provided.
* Identify the elements of the context that are relevant to the user's question.
* Evaluate whether the context provided is sufficient...
* If the context is sufficient, author a valid SQL statement and use a tool to record this answer.
{% if semantic_search_available %}
* If the context is not sufficient, use a tool to search for more context in the lacking area.
{% endif %}
## RULES:
* Do not alias columns in the SELECT clause unless explicitly asked...
* ONLY use database elements that have been described to you in the context.
* ONLY use syntax that is valid for the PostgreSQL dialect{{ " for version " + pgversion|string if pgversion }}.
그리고 LLM에 노출되는 tool은 정확히 두 개.
# 컨텍스트 부족 → 추가 RAG 요청
search_for_context(questions: list[str])
# 답 확정
record_sql_answer(
sql_statement: str,
command_type: str, # SELECT / INSERT / UPDATE / DELETE / MERGE / VALUES
relevant_object_ids: list[int], # 답에 실제로 쓴 객체 id (catalog usage 카운터에 반영)
relevant_sql_example_ids: list[int],
relevant_fact_ids: list[int],
)
record_sql_answer의 relevant_*_ids 필드가 단순한데 영리하다. LLM이 자기 답에 어떤 catalog 항목을 썼는지 보고하면, pgai가 그 id의 usage 카운터를 +1 한다. 다음에 누가 비슷한 질문을 던질 때 “frequently used objects” 힌트로 프롬프트 앞쪽에 끼워 넣어서, RAG 결과보다도 먼저 보이게 한다. 일종의 self-reinforcing 캐시.
루프 본체는 다음과 같다.
while _continue:
iteration += 1
if iteration > iteration_limit:
raise IterationLimitExceededException(limit=iteration_limit)
system_prompt = _template_system_prompt.render(
pgversion=pgversion,
semantic_search_available=(iteration < iteration_limit),
)
user_prompt = _template_user_prompt.render(
ctx=ctx, prompt=prompt, prior_prompts=prior_prompts,
freq_used_objects=freq_used_objects, error=error,
)
model_response = await model_request(
model=model,
messages=[ModelRequest(parts=[
SystemPromptPart(content=system_prompt),
UserPromptPart(content=user_prompt),
])],
model_request_parameters=ModelRequestParameters(
function_tools=[_search_tool_definition()]
if iteration < iteration_limit else [],
output_tools=[_answer_tool_definition()],
allow_text_output=False,
),
model_settings=model_settings,
)
여기에 두 가지 강제 메커니즘이 박혀 있다.
allow_text_output=False— LLM이 자유 텍스트로 “여기 SQL 있어요”라고 답하는 걸 막는다. 답하려면 무조건record_sql_answertool을 콜해야 한다. 출력 형태를 강제해서 파싱 에러를 0으로 만든다.function_tools=[_search_tool_definition()] if iteration < iteration_limit else []— 마지막 iteration에서는 search tool 자체를 빼버린다. “이제 더 검색 못 해, 답만 내라.” 검색 루프가 영원히 돌까봐 강제로 닫는 게이트.
이거 하나가 내 원래 가설의 “슬롯 강제”에 가장 가까운 부분이다. 슬롯을 SQL 안에 박는 대신, 출력 채널을 tool call로 박았다.
LLM이 search를 골랐는지 answer를 골랐는지는 응답 part 종류로 분기한다.
for part in model_response.parts:
if part.part_kind == "tool-call":
if part.tool_name == _SEARCH_TOOL_NAME:
# questions 받아서 catalog에 다시 RAG, ctx 누적
for q in part.args_as_dict()["questions"]:
prior_prompts.append(q)
ctx = await fetch_database_context(..., q, ctx, ...)
elif part.tool_name == _RECORD_TOOL_NAME:
answer = part.args_as_dict()["sql_statement"]
_continue = False
# ... validate_sql_statement(target_con, answer) → 에러나면 다시 _continue=True
Search를 고르면 prior_prompts에 누적해서 다음 user prompt에 “여기까지 검색해 봤어”라고 보여 준다. 같은 검색을 무한 반복하지 말라는 신호.
Part 6. 검증 — EXPLAIN, 그리고 진짜 ground truth
여기가 내 원래 가설이 가장 약했던 자리고, pgai가 가장 깔끔하게 푼 자리다.
LLM이 답을 내면 validate_sql_statement()가 받아서 target DB에 직접 EXPLAIN을 친다. 단, force-rollback 트랜잭션 안에서.
async def validate_sql_statement(target_con, sql_statement):
async with (
target_con.cursor(row_factory=dict_row) as cur,
target_con.transaction(force_rollback=True) as _,
):
try:
await cur.execute(f"explain (format json) {sql_statement}")
plan = await cur.fetchone()
return plan, None
except psycopg.Error as e:
msg = diagnostic_to_str(e.diag) or str(e)
return None, msg
왜 EXPLAIN인가, 한 줄 정리.
- 저렴하다. 쿼리를 실행하지 않는다. 행을 한 줄도 읽지 않고 플랜만 만든다.
- 안전하다. 그래도 force-rollback 트랜잭션으로 한 번 더 막아둔다. 만에 하나 LLM이 부작용 있는 SQL을 내도 DB에는 아무 일도 안 일어난다.
- Deterministic하다. Postgres planner 자체가 ground truth. 문법 / 객체 존재 / 컬럼 타입 호환 / 권한까지 다 잡는다.
이게 내 가설의 가장 큰 구멍 — “안 됐다는 걸 누가 판정?” — 을 메우는 답이다. LLM 외부의 deterministic checker를 둔다. 그것도 추가 인프라가 아니라 이미 깔린 Postgres가.
에러 메시지도 그냥 str(e) 던지는 게 아니다. diagnostic_to_str()이 psycopg.errors.Diagnostic을 통째로 뜯어서 LLM이 고치는 데 필요한 모든 단서를 추린다.
def diagnostic_to_str(d):
msgs = []
if d.message_primary: msgs.append(d.message_primary)
if d.message_detail: msgs.append(f"DETAIL: {d.message_detail}")
if d.message_hint: msgs.append(f"HINT: {d.message_hint}")
if d.statement_position: msgs.append(f"STATEMENT POSITION: {d.statement_position}")
if d.context: msgs.append(f"CONTEXT: {d.context}")
if d.schema_name: msgs.append(f"SCHEMA NAME: {d.schema_name}")
if d.table_name: msgs.append(f"TABLE NAME: {d.table_name}")
if d.column_name: msgs.append(f"COLUMN NAME: {d.column_name}")
if d.constraint_name: msgs.append(f"CONSTRAINT NAME: {d.constraint_name}")
if d.sqlstate: msgs.append(f"SQLSTATE: {d.sqlstate}")
return "\n".join(msgs).strip() or None
message_hint까지 챙긴다. Postgres가 “You meant username?” 같은 힌트를 주면 LLM이 그걸 그대로 받아 다음번엔 맞춘다.
에러가 있으면 user prompt 템플릿이 분기를 바꿔서 다음 iteration에 다시 넘어간다.
{% if error is none %}
## Task:
Author a valid SQL statement to address the following directive/question.
USER: "{{ prompt }}"
{% else %}
## Task:
A SQL statement was authored to address the user's directive/question, however it had errors.
Fix the SQL statement.
USER: "{{ prompt }}"
<sql-statement>
{{ answer }}
</sql-statement>
ERROR: {{ error }}
{% endif %}
루프는 valid SQL이 나올 때까지, 또는 iteration limit (CLI 기본 5 / Python API 기본 10)에 닿을 때까지 돈다.
Part 7. 내 가설과 pgai의 답
다 보고 나서 처음의 가설을 다시 꺼내 본다.
| 내 가설 | pgai | |
|---|---|---|
| Hallucination 줄이기 | 출력 슬롯 강제 (SELECT <CONTENT> FROM <TABLE>) | 입력 정보 강제 (DDL + COMMENT + 샘플 + facts RAG) + 출력 채널 강제 (allow_text_output=False, tool-only) |
| 검증 | LLM이 자기 답을 다시 보거나, 단순 syntax check | DB planner의 EXPLAIN (외부 deterministic checker) |
| 루프 | “이전에 안 됐다” 신호와 함께 재시도 | 같은 골격. 단, 에러 메시지를 Diagnostic 통째로 풀어서 단서로 줌 |
| 종료 조건 | 명시 안 함 (무한 루프 위험) | iteration limit + 마지막 iteration에서 search tool 박탈로 강제 종료 |
골격은 비슷하다. 둘 다 “LLM은 틀린다, 외부 검증과 루프가 필요하다”는 전제 위에 서 있다. 그게 절반 맞은 부분.
다른 절반은 production이 어디에 시간을 쓰는지에서 갈렸다. 내 가설은 출력 형태를 좁혀서 hallucination을 줄이려 했고, pgai는 입력 정보를 두텁게 해서 줄이려 했다. 검증도 — 내 가설은 “LLM이 자기 답을 보면 알겠지”에서 멈췄고, pgai는 이미 깔린 Postgres에 EXPLAIN을 쳐서 deterministic한 truth를 가져온다.
가장 인상적이었던 한 줄은 README의 이 문장이다.
By automatically running
EXPLAINon the generated SQL, we utilize the realtime state of the database to verify the query.
LLM의 weight나 catalog의 임베딩이나 다 “학습 시점의 스냅샷”인데, 진짜 DB는 지금 이 순간의 상태다. 그 사이의 거리를 EXPLAIN 한 번으로 매번 좁히고 들어가는 거다. 텍스트 위주의 시스템이 외부 deterministic 시스템을 끌어다 정합성을 맞추는 패턴 — 이게 production-grade text-to-SQL이 결국 도달한 자리였다.
