AI Knowledge & LogicProduct Sync Parsers

Product Sync Parsers

Write a Python script that turns a product feed (XML / JSON / paginated API) into Octocom catalog rows. Covers the fetch_next_chunk / map_one contract, the state and context dicts, the v4 TurboPuffer output schema, and the runtime helpers (including fetch_via_proxy).

A product-sync parser is a Python script that the aiGenerated product-sync provider runs on a schedule to populate a business's product catalog from its source feed. You write two functions; Octocom drives them in a loop, validates each mapped product against a Pydantic schema, and writes the valid ones straight to TurboPuffer (the v4 product index).

The aiGenerated provider is v4-only. Products land in TurboPuffer, not in the legacy Elasticsearch index. The schema below (V4ProductInput) reflects that — it's a flat row shape, not a nested catalog object.

This page is the reference for the contract. For end-to-end examples, see the parsers in the sidebar — note that these examples were written for the previous v3 schema and need to be re-read with the new field set in mind:

  • houseshop-ro — single static XML feed (simplest shape)
  • vita4you — multiple static XML feeds joined via in-memory lookup tables
  • superhome — paginated JSON API

The contract

Your parser must define exactly two top-level functions. The runtime will look them up by name in the module's globals() and call them in a loop until the feed is exhausted.

def fetch_next_chunk(state: dict) -> list:
    """Return the next batch of raw items (any Python objects).
    Return [] when there is nothing more to fetch.
    Mutate state['parser'] (or state['cursor']) to track position so
    subsequent calls pick up where you left off."""

def map_one(raw, context: dict) -> dict | None:
    """Pure function. Transform one raw item from the chunk into a
    V4ProductInput-shaped dict (see the Output schema section).
    Return None to silently skip the item (e.g. inactive, filtered out)."""

A few rules the runtime enforces:

  • fetch_next_chunk must be cheap-ish. Each call has to finish in under ~150 seconds (Azure sandbox per-call ceiling is 220s, dryrun default is configurable). Make one chunk = one HTTP round-trip or one streamed batch from disk.
  • map_one must be pure. No I/O, no network, no global mutation. It runs once per raw item and is expected to be fast.
  • Two consecutive empty chunks ends the run. A single empty chunk is treated as transient; the runtime will call fetch_next_chunk again. If you return [] twice in a row, the run finalises.
  • Five fetch errors also ends the run. Errors raised from fetch_next_chunk are caught and counted; once fetch_errors >= 5 the run finalises with a failure sample.

The state dict

state is a module-level dict that the runtime owns and passes to fetch_next_chunk on every call. The same dict instance is reused across calls within a sync run, so anything you stash in it survives. Keys the runtime sets and reads:

KeyTypeOwnerNotes
contextdictruntimeThe same dict your map_one(raw, context) receives. See below.
cursoranybothOpaque per-run cursor. Initialised from context.get('cursor'). You may overwrite it.
started_atfloatruntimeUnix timestamp the run started.
phasestrruntimeOne of initialised, fetching, mapping, waiting, idle, done.
callsintruntimeNumber of step() invocations so far.
validintruntimeProducts that passed schema validation.
invalidintruntimeProducts that failed schema validation.
fetch_errorsintruntimeTimes fetch_next_chunk raised. ≥5 ends the run.
map_errorsintruntimeTimes map_one raised on an item.
raw_seenintruntimeTotal raw items observed.
empty_streakintruntimeConsecutive empty fetch results. ≥2 ends the run.
errors_samplelistruntimeUp to 50 captured errors, surfaced in the execution result.

You own anything else you put in state. The reference parsers all stash their per-parser state under state['parser']:

def fetch_next_chunk(state):
    parser = state.setdefault("parser", {})
    if not parser.get("initialized"):
        _download(FEED_URL, LOCAL_PATH)
        parser["iter"] = _make_product_iter(LOCAL_PATH)
        parser["initialized"] = True
    ...

You can also use state['cursor'] directly when a single opaque value (page number, since-id, watermark) is enough.


The context dict

context is built once per run from the business's product-sync configuration and passed to both init() (internally) and map_one(raw, context). Treat it as read-only.

Standard keys:

KeyTypeNotes
businessdict{id, slug, name} for the business being synced.
cursoranyOpaque resume point from the previous run, or null on the first run.
_octocom_proxydictProxy credentials used by fetch_via_proxy (see below). Don't access directly.

When you call dryrun_product_sync(..., parserContext={...}) the extra keys are merged into context. Scheduled production runs don't get a parserContext — anything you need at runtime that isn't on the business config has to be in the parser source or a metafield on the business.


Output schema (map_one return value)

The runtime validates each non-None return value against V4ProductInput. The full schema lives in v4ProductInputSchema.py. The shape is flat — no nested variants, no nested prices, no nested collections. You collapse the source's structure into summary scalars + searchable text + an opaque rawJson blob that carries everything else.

import hashlib
import json

def map_one(raw, context):
    # Build the per-variant prices once so we can summarise.
    variants = raw.get("variants") or [raw]
    prices = [
        float(v["price"]) for v in variants
        if v.get("price") is not None
    ]
    min_p = min(prices) if prices else None
    max_p = max(prices) if prices else None

    return {
        # Stable, deterministic id. Becomes the row id in TurboPuffer.
        # Hashing the source's own stable id keeps re-syncs from duplicating.
        "id": hashlib.md5(str(raw["id"]).encode()).hexdigest(),

        # Display fields shown in search results before the bot hydrates.
        "name": raw["title"],
        "shortDescription": (raw.get("short_desc") or raw["long_desc"])[:300],

        # Text fed to the embedding model. Concatenate whatever helps
        # semantic search — title, descriptions, category names, brand,
        # key attributes. Strip HTML; keep under ~28k chars (≈7k tokens).
        "embeddableContent": "\n\n".join(filter(None, [
            raw["title"],
            raw.get("long_desc"),
            f"Brand: {raw['brand']}" if raw.get("brand") else None,
            f"Categories: {', '.join(c['name'] for c in raw.get('categories', []))}"
              if raw.get("categories") else None,
        ])),

        # Summary booleans. The bot uses these as hard filters
        # ("show only in-stock items", etc.) so they must be exact.
        "hasInStockVariants":     any(v.get("in_stock") for v in variants),
        "hasOnSaleVariants":      any(v.get("on_sale")  for v in variants),
        "hasPreorderableVariants": any(
            v.get("backorders_allowed") and not v.get("in_stock")
            for v in variants
        ),

        # Price range across variants. Null if no prices.
        "minPrice": min_p,
        "maxPrice": max_p,

        # Exact-match arrays. Search treats SKUs and URLs as opaque tokens
        # to support precise lookup — BM25 mangles them.
        "skus": [v["sku"] for v in variants if v.get("sku")],
        "urls": [u for u in [raw.get("url")] if u],

        # JSON-encoded source row. BM25-indexed on the TP side, so the bot
        # can full-text-search anything in here as a fallback, and it carries
        # all the structured data needed to render product details after
        # the bot decides to hydrate.
        "rawJson": json.dumps(raw, ensure_ascii=False),
    }

Return None from map_one to skip an item without counting it as an error — useful for inactive items, gift cards, or filtered categories.

Field-by-field notes

  • id — must be stable across runs. Re-running the sync with a different id for the same product creates a duplicate row in TurboPuffer instead of updating. hashlib.md5(source_id) is a safe default; just don't include volatile data in the hash input.
  • name — short, customer-readable. Returned in every search hit so the bot can decide which products to hydrate.
  • shortDescription — also returned in every search hit. Non-searchable; non-indexed. If the source has no real short description, derive one (e.g. first 300 stripped-HTML chars of the long description).
  • embeddableContent — the only text the semantic-search vector is built from. More signal = better recall. Keep it natural-language; don't dump raw JSON in here.
  • hasInStockVariants / hasOnSaleVariants / hasPreorderableVariants — see the next section.
  • minPrice / maxPrice — floats; null is acceptable for free items or feeds without prices.
  • skus / urls — exact-match arrays. The bot uses Contains filters here, so include every SKU/URL variant a customer might paste.
  • rawJson — full source row, JSON-encoded. The bot hydrates this when it wants to mention a product, so include anything that should influence its reply (variant breakdown, raw price details, images, metafields, lead times, full descriptions, dimensions, returns policy snippets).

Stock semantics: the three has*Variants booleans

These three booleans together describe what the customer can do with the product, and product search reads them to filter results. Get them right or the bot will either (a) tell customers an unavailable item is in stock, or (b) silently skip an item the store is happy to sell.

hasInStockVariantshasPreorderableVariantshasOnSaleVariantsMeaning
trueanyanyAt least one variant physically has inventory. Search returns it for in-stock queries.
falsetrueanyNo inventory, but at least one variant is orderable (backorder / pre-order / made-to-order).
falsefalseanyNothing is sellable. The bot will not surface this product for purchase intent.
anyanytrueAt least one variant is on sale. Used for "what's on sale" queries.

Be literal. hasInStockVariants is true only when the source says inventory exists. Don't fudge it to keep search happy — hasPreorderableVariants exists exactly so back-orderable items are still discoverable.

Surface the actual availability state in rawJson (e.g. include an availability field like "in stock" / "backorder" / "out of stock") so the bot can quote a concrete label when it hydrates, instead of guessing from the boolean.

Worked example for a feed with an explicit availability field per variant:

def has_in_stock(variants):
    return any((v.get("availability") or "").lower() == "in stock" for v in variants)

def has_preorderable(variants):
    return any(
        (v.get("availability") or "").lower() in {"backorder", "pre-order"}
        or (v.get("backorders_allowed") and not v.get("in_stock"))
        for v in variants
    )

def has_on_sale(variants):
    return any(v.get("sale_price") and v["sale_price"] < v["regular_price"] for v in variants)

Runtime helpers

The runtime pre-injects a few things into the parser's globals. You don't need to import them.

fetch_via_proxy(url, *, headers=None, timeout=60, method='GET', data=None) -> bytes

Fetch a URL through Octocom's HTTP proxy (egress IP 74.242.171.127) instead of the Azure sandbox's IP. Use it when the origin blocks the sandbox — symptoms include HTTPError 403, HTTPError 429, or unexplained urlopen timeouts on URLs that work fine in a browser or via the download_via_proxy MCP tool.

See the full fetch_via_proxy helper docs for the standard try-direct-then-fallback pattern.

V4ProductInput

The Pydantic class used to validate map_one's return. You usually don't reference it directly — the runtime constructs it for you and reports validation failures in errors_sample.


Iterating on a parser

  1. Sketch the parser by hand or with an LLM. The three reference parsers cover the common feed shapes — adapt them to the v4 schema as you go.
  2. Dry-run via the dryrun_product_sync MCP tool. It runs the parser end-to-end and produces a JSONL artifact in blob storage without touching the live catalog. Returns an executionId; poll get_product_sync_execution for the result and download the JSONL via the returned SAS URL.
  3. Inspect the output. Spot-check products you know — does hasInStockVariants match the source? Are prices sane? Does embeddableContent read like a description a human would search for?
  4. Persist + go live via upsert_product_sync_parser. This both saves the source and (on the first call for a business) switches the business to the aiGenerated provider with useV4Products=true, enables autoSync, and schedules the first real sync for the next scheduler tick (≤1 minute). On subsequent updates the schedule is left alone so you can iterate without immediately re-running production.

Common pitfalls

  • Non-deterministic id. Hashing volatile data (timestamps, prices, computed flags) means every sync writes a fresh row and the catalog grows unboundedly. Hash the source's stable id only.
  • Setting hasInStockVariants=true for backorder items. Tempting (you want the bot to recommend them) but wrong — the bot will claim they're in stock. The right answer is hasInStockVariants=false + hasPreorderableVariants=true. See the Stock semantics section above.
  • Forgetting rawJson. The bot reads it when it decides to mention a product; without it, the bot has only name + shortDescription to work with. Always include "rawJson": json.dumps(raw, ensure_ascii=False).
  • Dumping JSON into embeddableContent. The embedding model is trained on natural language. JSON tokens dilute the signal. Build a plain-text description instead.
  • Empty embeddableContent. The row will still upload, but the embedding job has nothing to work with and the product won't surface in semantic search. At minimum: the name and one of the descriptions.
  • Mutating the chunk's raw items in map_one. map_one should be pure; if you need to derive lookup tables, build them once in fetch_next_chunk's initialisation block.
  • Not clearing iter-parsed elements. When using xml.etree.ElementTree.iterparse, call elem.clear() after consuming each element or memory grows unboundedly on large feeds.
  • Hard-coding secrets in the parser source. The source is stored in plaintext in the database. Use the business's existing integration config where possible; for feed credentials there is currently no first-class injection point, so accept that they end up in the source and rotate them through the dashboard.

On this page