Product Sync Parsers
Write a Python script that turns a product feed (XML / JSON / paginated API) into Octocom catalog rows. Covers the fetch_next_chunk / map_one contract, the state and context dicts, the v4 TurboPuffer output schema, and the runtime helpers (including fetch_via_proxy).
A product-sync parser is a Python script that the aiGenerated product-sync provider runs on a schedule to populate a business's product catalog from its source feed. You write two functions; Octocom drives them in a loop, validates each mapped product against a Pydantic schema, and writes the valid ones straight to TurboPuffer (the v4 product index).
The aiGenerated provider is v4-only. Products land in TurboPuffer, not in the legacy Elasticsearch index. The schema below (V4ProductInput) reflects that: it is a flat row shape, not a nested catalog object.
This page is the reference for the contract. For end-to-end examples, see the parsers in the sidebar — note that these examples were written for the previous v3 schema and need to be re-read with the new field set in mind:
- houseshop-ro — single static XML feed (simplest shape)
- vita4you — multiple static XML feeds joined via in-memory lookup tables
- superhome — paginated JSON API
The contract
Your parser must define exactly two top-level functions. The runtime will look them up by name in the module's globals() and call them in a loop until the feed is exhausted.
def fetch_next_chunk(state: dict) -> list:
    """Return the next batch of raw items (any Python objects).
    Return [] when there is nothing more to fetch.
    Mutate state['parser'] (or state['cursor']) to track position so
    subsequent calls pick up where you left off."""

def map_one(raw, context: dict) -> dict | None:
    """Pure function. Transform one raw item from the chunk into a
    V4ProductInput-shaped dict (see the Output schema section).
    Return None to silently skip the item (e.g. inactive, filtered out)."""

A few rules the runtime enforces:
- fetch_next_chunk must be cheap-ish. Each call has to finish in under ~150 seconds (the Azure sandbox per-call ceiling is 220s; the dryrun default is configurable). Make one chunk = one HTTP round-trip or one streamed batch from disk.
- map_one must be pure. No I/O, no network, no global mutation. It runs once per raw item and is expected to be fast.
- Two consecutive empty chunks end the run. A single empty chunk is treated as transient; the runtime will call fetch_next_chunk again. If you return [] twice in a row, the run finalises.
- Five fetch errors also end the run. Errors raised from fetch_next_chunk are caught and counted; once fetch_errors >= 5 the run finalises with a failure sample.
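For local testing, the stop rules above can be approximated with a small driver loop. This is only a sketch of the documented behaviour, not Octocom's actual runtime: the name `run_locally`, the `max_calls` guard, and the reduced set of state keys are assumptions, and schema validation is left out entirely.

```python
def run_locally(fetch_next_chunk, map_one, context=None, max_calls=1000):
    """Hypothetical local driver that mimics the documented stop rules."""
    context = context or {}
    state = {
        "context": context,
        "cursor": context.get("cursor"),
        "fetch_errors": 0,
        "map_errors": 0,
        "raw_seen": 0,
        "empty_streak": 0,
    }
    products = []
    for _ in range(max_calls):
        try:
            chunk = fetch_next_chunk(state)
        except Exception:
            # Fetch errors are counted; the fifth one ends the run.
            state["fetch_errors"] += 1
            if state["fetch_errors"] >= 5:
                break
            continue
        if not chunk:
            # One empty chunk is tolerated; two in a row end the run.
            state["empty_streak"] += 1
            if state["empty_streak"] >= 2:
                break
            continue
        state["empty_streak"] = 0
        for raw in chunk:
            state["raw_seen"] += 1
            try:
                mapped = map_one(raw, context)
            except Exception:
                state["map_errors"] += 1
                continue
            if mapped is not None:  # None means "skip silently"
                products.append(mapped)
    return products, state
```

Calling `run_locally(fetch_next_chunk, map_one)` on a parser module's two functions returns the mapped rows plus the final counters, which is usually enough to catch a parser that never terminates or that raises on every item.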
The state dict
state is a module-level dict that the runtime owns and passes to fetch_next_chunk on every call. The same dict instance is reused across calls within a sync run, so anything you stash in it survives. Keys the runtime sets and reads:
| Key | Type | Owner | Notes |
|---|---|---|---|
| context | dict | runtime | The same dict your map_one(raw, context) receives. See below. |
| cursor | any | both | Opaque per-run cursor. Initialised from context.get('cursor'). You may overwrite it. |
| started_at | float | runtime | Unix timestamp the run started. |
| phase | str | runtime | One of initialised, fetching, mapping, waiting, idle, done. |
| calls | int | runtime | Number of step() invocations so far. |
| valid | int | runtime | Products that passed schema validation. |
| invalid | int | runtime | Products that failed schema validation. |
| fetch_errors | int | runtime | Times fetch_next_chunk raised. ≥5 ends the run. |
| map_errors | int | runtime | Times map_one raised on an item. |
| raw_seen | int | runtime | Total raw items observed. |
| empty_streak | int | runtime | Consecutive empty fetch results. ≥2 ends the run. |
| errors_sample | list | runtime | Up to 50 captured errors, surfaced in the execution result. |
You own anything else you put in state. The reference parsers all stash their per-parser state under state['parser']:
def fetch_next_chunk(state):
    parser = state.setdefault("parser", {})
    # One-time setup on the first call: download the feed and open an iterator.
    if not parser.get("initialized"):
        _download(FEED_URL, LOCAL_PATH)
        parser["iter"] = _make_product_iter(LOCAL_PATH)
        parser["initialized"] = True
    ...

You can also use state['cursor'] directly when a single opaque value (page number, since-id, watermark) is enough.
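As an illustration of the state['cursor'] approach, here is a minimal sketch of a page-number cursor. The `_get_page` function and its in-memory `_PAGES` data are hypothetical stand-ins for one HTTP round-trip to a paginated API; a real parser would replace them with an actual request.

```python
# Hypothetical stand-in data: three pages of a paginated API.
_PAGES = {
    1: [{"id": "a"}, {"id": "b"}],
    2: [{"id": "c"}],
    3: [],
}

def _get_page(page):
    """Hypothetical helper: pretend to fetch one page (one HTTP round-trip)."""
    return _PAGES.get(page, [])

def fetch_next_chunk(state):
    # Resume from the stored cursor, or start at page 1 on the first call.
    page = state.get("cursor") or 1
    items = _get_page(page)
    if items:
        # Advance only after a non-empty page, so an empty result is retried.
        state["cursor"] = page + 1
    return items
```

Because an empty page leaves the cursor untouched, a transient empty result is simply requested again on the next call, which fits the two-empty-chunks rule described earlier.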
The context dict
context is built once per run from the business's product-sync configuration and passed to both init() (internally) and map_one(raw, context). Treat it as read-only.
Standard keys:
| Key | Type | Notes |
|---|---|---|
| business | dict | {id, slug, name} for the business being synced. |
| cursor | any | Opaque resume point from the previous run, or null on the first run. |
| _octocom_proxy | dict | Proxy credentials used by fetch_via_proxy (see below). Don't access directly. |
When you call dryrun_product_sync(..., parserContext={...}) the extra keys are merged into context. Scheduled production runs don't get a parserContext — anything you need at runtime that isn't on the business config has to be in the parser source or a metafield on the business.
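Since scheduled runs receive no parserContext, one way to stay safe is to read any extra key with a default that lives in the parser source. The sketch below assumes a hypothetical `currency` key and a hypothetical `price` field on the raw item; neither is part of the documented contract.

```python
DEFAULT_CURRENCY = "EUR"  # hypothetical in-source default

def resolve_currency(context):
    """Prefer a dry-run override from parserContext, else the in-source default."""
    return context.get("currency") or DEFAULT_CURRENCY

def describe_price(raw, context):
    """Pure helper that map_one could call; it reads context but never mutates it."""
    return f"{float(raw['price']):.2f} {resolve_currency(context)}"
```

With this shape, a dry run can pass `parserContext={"currency": "RON"}` to try an alternative, while production runs silently fall back to the constant.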
Output schema (map_one return value)
The runtime validates each non-None return value against V4ProductInput. The full schema lives in v4ProductInputSchema.py. The shape is flat — no nested variants, no nested prices, no nested collections. You collapse the source's structure into summary scalars + searchable text + an opaque rawJson blob that carries everything else.
import hashlib
import json

def map_one(raw, context):
    # Build the per-variant prices once so we can summarise.
    # A product without variants is treated as its own single variant.
    variants = raw.get("variants") or [raw]
    prices = [
        float(v["price"]) for v in variants
        if v.get("price") is not None
    ]
    min_p = min(prices) if prices else None
    max_p = max(prices) if prices else None
    return {
        # Stable, deterministic id. Becomes the row id in TurboPuffer.
        # Hashing the source's own stable id keeps re-syncs from duplicating.
        "id": hashlib.md5(str(raw["id"]).encode()).hexdigest(),
        # Display fields shown in search results before the bot hydrates.
        "name": raw["title"],
        # Fall back to the long description; tolerate feeds that have neither.
        "shortDescription": (raw.get("short_desc") or raw.get("long_desc") or "")[:300],
        # Text fed to the embedding model. Concatenate whatever helps
        # semantic search: title, descriptions, category names, brand,
        # key attributes. Strip HTML; keep under ~28k chars (≈7k tokens).
        "embeddableContent": "\n\n".join(filter(None, [
            raw["title"],
            raw.get("long_desc"),
            f"Brand: {raw['brand']}" if raw.get("brand") else None,
            f"Categories: {', '.join(c['name'] for c in raw.get('categories', []))}"
            if raw.get("categories") else None,
        ])),
        # Summary booleans. The bot uses these as hard filters
        # ("show only in-stock items", etc.) so they must be exact.
        "hasInStockVariants": any(v.get("in_stock") for v in variants),
        "hasOnSaleVariants": any(v.get("on_sale") for v in variants),
        "hasPreorderableVariants": any(
            v.get("backorders_allowed") and not v.get("in_stock")
            for v in variants
        ),
        # Price range across variants. Null if no prices.
        "minPrice": min_p,
        "maxPrice": max_p,
        # Exact-match arrays. Search treats SKUs and URLs as opaque tokens
        # to support precise lookup, since BM25 tokenisation would mangle them.
        "skus": [v["sku"] for v in variants if v.get("sku")],
        "urls": [u for u in [raw.get("url")] if u],
        # JSON-encoded source row. BM25-indexed on the TP side, so the bot
        # can full-text-search anything in here as a fallback, and it carries
        # all the structured data needed to render product details after
        # the bot decides to hydrate.
        "rawJson": json.dumps(raw, ensure_ascii=False),
    }

Return None from map_one to skip an item without counting it as an error. This is useful for inactive items, gift cards, or filtered categories.
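A skip check can be kept as a small pure predicate that map_one calls first. The sketch below assumes hypothetical `status` and `product_type` fields on the raw item; real feeds will name these differently, and the returned dict is trimmed to two fields for brevity.

```python
SKIPPED_TYPES = {"gift_card"}  # hypothetical product types to leave out

def should_skip(raw):
    """Return True for items the catalog should not contain."""
    if (raw.get("status") or "active").lower() != "active":
        return True
    return (raw.get("product_type") or "").lower() in SKIPPED_TYPES

def map_one(raw, context):
    if should_skip(raw):
        return None  # skipped silently, not counted as an error
    # Trimmed for brevity: a real parser returns the full V4ProductInput shape.
    return {"id": str(raw["id"]), "name": raw["title"]}
```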
Field-by-field notes
- id: must be stable across runs. Re-running the sync with a different id for the same product creates a duplicate row in TurboPuffer instead of updating. hashlib.md5 over the source id is a safe default; just don't include volatile data in the hash input.
- name: short, customer-readable. Returned in every search hit so the bot can decide which products to hydrate.
- shortDescription: also returned in every search hit. Non-searchable; non-indexed. If the source has no real short description, derive one (e.g. first 300 stripped-HTML chars of the long description).
- embeddableContent: the only text the semantic-search vector is built from. More signal = better recall. Keep it natural-language; don't dump raw JSON in here.
- hasInStockVariants / hasOnSaleVariants / hasPreorderableVariants: see the next section.
- minPrice / maxPrice: floats; null is acceptable for free items or feeds without prices.
- skus / urls: exact-match arrays. The bot uses Contains filters here, so include every SKU/URL variant a customer might paste.
- rawJson: full source row, JSON-encoded. The bot hydrates this when it wants to mention a product, so include anything that should influence its reply (variant breakdown, raw price details, images, metafields, lead times, full descriptions, dimensions, returns policy snippets).
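Several of the notes above call for stripped-HTML text. One way to get it with only the standard library is sketched below; the helper names `strip_html`, `short_description`, and `embeddable_text` are illustrative and not part of the runtime. The 28,000-character cap mirrors the "~28k chars" guidance in the map_one example.

```python
import re
from html.parser import HTMLParser

class _TextExtractor(HTMLParser):
    """Collects text nodes and ignores tags."""
    def __init__(self):
        super().__init__(convert_charrefs=True)
        self.parts = []

    def handle_data(self, data):
        self.parts.append(data)

def strip_html(markup):
    """Return plain text with tags removed and whitespace collapsed."""
    parser = _TextExtractor()
    parser.feed(markup or "")
    parser.close()
    return re.sub(r"\s+", " ", " ".join(parser.parts)).strip()

def short_description(short_html, long_html, limit=300):
    """Use the short description if present, else derive one from the long one."""
    return strip_html(short_html or long_html)[:limit]

def embeddable_text(parts, limit=28000):
    """Join the non-empty, HTML-stripped parts into natural-language text."""
    cleaned = (strip_html(p) for p in parts)
    return "\n\n".join(p for p in cleaned if p)[:limit]
```

Note that this simple extractor also keeps the contents of script and style elements, so feeds that embed those in descriptions would need an extra filter.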
Stock semantics: the three has*Variants booleans
These three booleans together describe what the customer can do with the product, and product search reads them to filter results. Get them right or the bot will either (a) tell customers an unavailable item is in stock, or (b) silently skip an item the store is happy to sell.
| hasInStockVariants | hasPreorderableVariants | hasOnSaleVariants | Meaning |
|---|---|---|---|
| true | any | any | At least one variant physically has inventory. Search returns it for in-stock queries. |
| false | true | any | No inventory, but at least one variant is orderable (backorder / pre-order / made-to-order). |
| false | false | any | Nothing is sellable. The bot will not surface this product for purchase intent. |
| any | any | true | At least one variant is on sale. Used for "what's on sale" queries. |
Be literal. hasInStockVariants is true only when the source says inventory exists. Don't fudge it to keep search happy — hasPreorderableVariants exists exactly so back-orderable items are still discoverable.
Surface the actual availability state in rawJson (e.g. include an availability field like "in stock" / "backorder" / "out of stock") so the bot can quote a concrete label when it hydrates, instead of guessing from the boolean.
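One possible way to surface that label is to add a normalised availability field to each variant before encoding rawJson. The field names `stock_qty` and `backorders_allowed`, the helper names, and the three-label vocabulary below are assumptions about a hypothetical feed.

```python
import json

def availability_label(variant):
    """Map hypothetical source fields onto one of three concrete labels."""
    if (variant.get("stock_qty") or 0) > 0:
        return "in stock"
    if variant.get("backorders_allowed"):
        return "backorder"
    return "out of stock"

def build_raw_json(raw):
    """Copy the source row, annotate each variant, and JSON-encode it."""
    variants = [
        {**v, "availability": availability_label(v)}
        for v in (raw.get("variants") or [])
    ]
    # Build new dicts rather than mutating raw, so map_one stays pure.
    return json.dumps({**raw, "variants": variants}, ensure_ascii=False)
```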
Worked example for a feed with an explicit availability field per variant:
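def has_in_stock(variants):
    # True only when the source explicitly says a variant is in stock.
    return any((v.get("availability") or "").lower() == "in stock" for v in variants)

def has_preorderable(variants):
    # Orderable without inventory: explicit backorder/pre-order label,
    # or backorders allowed while the variant is out of stock.
    return any(
        (v.get("availability") or "").lower() in {"backorder", "pre-order"}
        or (v.get("backorders_allowed") and not v.get("in_stock"))
        for v in variants
    )

def has_on_sale(variants):
    # Assumes numeric prices and a regular_price wherever sale_price is set.
    return any(v.get("sale_price") and v["sale_price"] < v["regular_price"] for v in variants)

Runtime helpers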
The runtime pre-injects a few things into the parser's globals. You don't need to import them.
fetch_via_proxy(url, *, headers=None, timeout=60, method='GET', data=None) -> bytes
Fetch a URL through Octocom's HTTP proxy (egress IP 74.242.171.127) instead of the Azure sandbox's IP. Use it when the origin blocks the sandbox — symptoms include HTTPError 403, HTTPError 429, or unexplained urlopen timeouts on URLs that work fine in a browser or via the download_via_proxy MCP tool.
See the full fetch_via_proxy helper docs for the standard try-direct-then-fallback pattern.
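The linked helper docs are the authority on the fallback pattern; as a rough illustration only, it might look like the sketch below. `fetch_with_fallback` and `_direct_get` are hypothetical names, and the proxy fetcher is passed in as a parameter so the snippet also runs outside the sandbox. Inside a parser you would pass the pre-injected fetch_via_proxy.

```python
import urllib.error
import urllib.request

BLOCKED_STATUSES = {403, 429}  # the blocking symptoms named in the text above

def _direct_get(url, timeout=60):
    """Plain direct fetch from the sandbox's own IP."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read()

def fetch_with_fallback(url, proxy_fetch, direct_fetch=_direct_get, timeout=60):
    """Try a direct request first; retry through the proxy if it looks blocked."""
    try:
        return direct_fetch(url, timeout=timeout)
    except urllib.error.HTTPError as exc:
        if exc.code not in BLOCKED_STATUSES:
            raise  # e.g. a 404 is a real error, not a block
    except OSError:
        # URLError and socket timeouts are OSError subclasses; treat them
        # as a possible block and fall through to the proxy.
        pass
    return proxy_fetch(url, timeout=timeout)
```

Trying the direct path first keeps proxy traffic for the origins that actually need it, and re-raising non-blocking HTTP errors avoids hiding a genuinely broken feed URL behind a second request.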
V4ProductInput
The Pydantic class used to validate map_one's return. You usually don't reference it directly — the runtime constructs it for you and reports validation failures in errors_sample.
Iterating on a parser
- Sketch the parser by hand or with an LLM. The three reference parsers cover the common feed shapes; adapt them to the v4 schema as you go.
- Dry-run via the dryrun_product_sync MCP tool. It runs the parser end-to-end and produces a JSONL artifact in blob storage without touching the live catalog. Returns an executionId; poll get_product_sync_execution for the result and download the JSONL via the returned SAS URL.
- Inspect the output. Spot-check products you know: does hasInStockVariants match the source? Are prices sane? Does embeddableContent read like a description a human would search for?
- Persist + go live via upsert_product_sync_parser. This both saves the source and (on the first call for a business) switches the business to the aiGenerated provider with useV4Products=true, enables autoSync, and schedules the first real sync for the next scheduler tick (≤1 minute). On subsequent updates the schedule is left alone so you can iterate without immediately re-running production.
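The inspection step can be partly automated. The sketch below assumes that each line of the downloaded JSONL artifact is one V4ProductInput-shaped JSON object; the artifact's exact layout is not specified on this page, so treat that as an assumption and adjust the field access if the real file wraps rows differently. `summarise_jsonl` is a hypothetical helper name.

```python
import json

def summarise_jsonl(lines):
    """Count basic quality signals over an iterable of JSONL lines."""
    summary = {"rows": 0, "in_stock": 0, "empty_embeddable": 0,
               "bad_price_range": 0, "duplicate_ids": 0}
    seen_ids = set()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank lines
        row = json.loads(line)
        summary["rows"] += 1
        if row.get("hasInStockVariants"):
            summary["in_stock"] += 1
        if not (row.get("embeddableContent") or "").strip():
            summary["empty_embeddable"] += 1
        lo, hi = row.get("minPrice"), row.get("maxPrice")
        if lo is not None and hi is not None and lo > hi:
            summary["bad_price_range"] += 1
        if row.get("id") in seen_ids:
            summary["duplicate_ids"] += 1
        seen_ids.add(row.get("id"))
    return summary
```

Passing an open file handle for the downloaded artifact as `lines` works, since file objects iterate line by line. Non-zero counts for empty content, inverted price ranges, or duplicate ids point at the pitfalls listed in the next section.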
Common pitfalls
- Non-deterministic id. Hashing volatile data (timestamps, prices, computed flags) means every sync writes a fresh row and the catalog grows unboundedly. Hash the source's stable id only.
- Setting hasInStockVariants=true for backorder items. Tempting (you want the bot to recommend them) but wrong: the bot will claim they're in stock. The right answer is hasInStockVariants=false + hasPreorderableVariants=true. See the Stock semantics section above.
- Forgetting rawJson. The bot reads it when it decides to mention a product; without it, the bot has only name + shortDescription to work with. Always include "rawJson": json.dumps(raw, ensure_ascii=False).
- Dumping JSON into embeddableContent. The embedding model is trained on natural language. JSON tokens dilute the signal. Build a plain-text description instead.
- Empty embeddableContent. The row will still upload, but the embedding job has nothing to work with and the product won't surface in semantic search. At minimum: the name and one of the descriptions.
- Mutating the chunk's raw items in map_one. map_one should be pure; if you need to derive lookup tables, build them once in fetch_next_chunk's initialisation block.
- Not clearing iter-parsed elements. When using xml.etree.ElementTree.iterparse, call elem.clear() after consuming each element or memory grows unboundedly on large feeds.
- Hard-coding secrets in the parser source. The source is stored in plaintext in the database. Use the business's existing integration config where possible; for feed credentials there is currently no first-class injection point, so accept that they end up in the source and rotate them through the dashboard.
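For the iterparse pitfall in the list above, a minimal streaming sketch might look like this. The `product` tag name and the flat child-tag-to-text mapping are assumptions about a hypothetical feed; `iter_products` is an illustrative name, not one of the reference parsers' helpers.

```python
import io
import xml.etree.ElementTree as ET

def iter_products(source, tag="product"):
    """Yield one dict per matching element, freeing each element after use."""
    for _event, elem in ET.iterparse(source, events=("end",)):
        if elem.tag != tag:
            continue
        # Copy out what we need before clearing: child tag -> stripped text.
        yield {child.tag: (child.text or "").strip() for child in elem}
        elem.clear()  # drop children and text so memory stays flat

# Usage with an in-memory feed; a parser would pass the downloaded file path.
feed = io.BytesIO(
    b"<feed><product><id>1</id><title>Lamp</title></product>"
    b"<product><id>2</id><title> Mug </title></product></feed>"
)
rows = list(iter_products(feed))
```

A generator like this slots naturally into state['parser']["iter"], with fetch_next_chunk pulling a fixed number of items from it per call.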