AI Knowledge & Logic › Product Sync Parsers

vita4you (static XML, multiple feeds)

Reference AI-product-sync parser. Three static XML feeds (products + Greek URLs + reviews). Streams the main feed via ET.iterparse; review and Greek-URL maps are built once into module-level dicts and looked up per product. Conditional metafields are emitted via a (label, key) tuple list.

Reference parser for the aiGenerated product-sync provider. Use it as a starting point when the source is one or more static XML feeds that you can download once and stream end-to-end.

Source shape: static-xml-multiple-feeds

Runtime helper note. The _download helper uses a try/except that falls back to fetch_via_proxy() — a globally-injected function that routes the request through Octocom's HTTP proxy IP (74.242.171.127). Use it when the origin blocks the Azure sandbox IP (symptoms: HTTPError 403, HTTPError 429, unexplained urlopen timeouts). Signature:

fetch_via_proxy(url, *, headers=None, timeout=60, method='GET', data=None) -> bytes

Full source

"""Vita4You parser. Hand-written reference implementation in the
fetch_next_chunk(state) / map_one(raw, context) contract.

Source: 3 static XML feeds — main product feed, Greek-URL feed, review feed.
On first call we download all three to /mnt/data and build two in-memory
lookup maps (review stars by SKU, Greek URLs by SKU). Subsequent calls
stream the next CHUNK_SIZE items from the main feed via ET.iterparse.
"""

import json
import re
import unicodedata
import urllib.error
import urllib.request
import xml.etree.ElementTree as ET

# -----------------------------------------------------------------------------
# Text helpers — ports of slugify() and decodeGreekText() in text.utils.ts
# -----------------------------------------------------------------------------

# Named HTML entity -> literal character, used by decode_greek_text().
# Keys must be the *encoded* entity names ("&alpha;"), not the characters
# themselves; a table whose keys equal its values decodes nothing.
# Covers the Greek alphabet (upper and lower case, including final sigma)
# plus the few non-Greek entities that show up in the same text.
GREEK_ENTITIES = {
    "&Alpha;": "Α", "&Beta;": "Β", "&Gamma;": "Γ", "&Delta;": "Δ",
    "&Epsilon;": "Ε", "&Zeta;": "Ζ", "&Eta;": "Η", "&Theta;": "Θ",
    "&Iota;": "Ι", "&Kappa;": "Κ", "&Lambda;": "Λ", "&Mu;": "Μ",
    "&Nu;": "Ν", "&Xi;": "Ξ", "&Omicron;": "Ο", "&Pi;": "Π",
    "&Rho;": "Ρ", "&Sigma;": "Σ", "&Tau;": "Τ", "&Upsilon;": "Υ",
    "&Phi;": "Φ", "&Chi;": "Χ", "&Psi;": "Ψ", "&Omega;": "Ω",
    "&alpha;": "α", "&beta;": "β", "&gamma;": "γ", "&delta;": "δ",
    "&epsilon;": "ε", "&zeta;": "ζ", "&eta;": "η", "&theta;": "θ",
    "&iota;": "ι", "&kappa;": "κ", "&lambda;": "λ", "&mu;": "μ",
    "&nu;": "ν", "&xi;": "ξ", "&omicron;": "ο", "&pi;": "π",
    "&rho;": "ρ", "&sigmaf;": "ς", "&sigma;": "σ", "&tau;": "τ",
    "&upsilon;": "υ", "&phi;": "φ", "&chi;": "χ", "&psi;": "ψ",
    # &nbsp; maps to a plain space; decode_greek_text() collapses
    # whitespace afterwards, so the distinction from U+00A0 is lost anyway.
    "&omega;": "ω", "&nbsp;": " ", "&amp;": "&", "&acute;": "´",
    "&uml;": "¨",
}


def slugify(text, max_length=None):
    """Return a lowercase, hyphen-separated slug for *text*.

    Combining marks (accents, Greek tonos) are dropped after NFD
    decomposition, every run of non-word characters or underscores becomes
    a single "-", and leading/trailing separators are removed. A falsy
    *text* yields "". When *max_length* is truthy the slug is cut to that
    many characters and any hyphen left dangling at the end is removed.
    """
    lowered = (text or "").strip().lower()
    decomposed = unicodedata.normalize("NFD", lowered)
    # "Mn" = nonspacing marks, i.e. the accents split off by NFD.
    unaccented = "".join(
        ch for ch in decomposed if unicodedata.category(ch) != "Mn"
    )
    trimmed = re.sub(r"^[\W_]+|[\W_]+$", "", unaccented)
    hyphenated = re.sub(r"[\W_]+", "-", trimmed)
    slug = re.sub(r"-+", "-", hyphenated).strip("-")
    if not max_length:
        return slug
    return slug[:max_length].rstrip("-")


def decode_greek_text(text):
    """Decode HTML entities in *text* and normalise its whitespace.

    Steps, in order:
      1. Replace "&amp;" with "&", which undoes one level of
         double-encoding (e.g. "&amp;alpha;" becomes "&alpha;").
      2. Replace every entity listed in GREEK_ENTITIES with its character.
      3. Delete any remaining alphabetic named entity ("&foo;").
         Numeric entities such as "&#913;" are left untouched.
      4. Collapse whitespace runs to one space and strip the ends.

    A falsy *text* (None or "") yields "".
    """
    # Must be the encoded form "&amp;"; replacing "&" with "&" is a no-op
    # and leaves double-encoded entities undecoded.
    decoded = (text or "").replace("&amp;", "&")
    for entity, char in GREEK_ENTITIES.items():
        decoded = decoded.replace(entity, char)
    decoded = re.sub(r"&[a-zA-Z]+;", "", decoded)
    decoded = re.sub(r"\s+", " ", decoded).strip()
    return decoded


def _nonempty(v):
    """Return str(v) stripped of surrounding whitespace, or None if blank.

    None stays None; any other value is stringified first, so 0 becomes "0".
    """
    stripped = "" if v is None else str(v).strip()
    return stripped or None


def _parse_price(s):
    """Parse a feed price such as "12.50 EUR" into a float.

    Returns None when *s* is falsy (None, "", 0) or when the text left after
    removing the " EUR" suffix is not a valid float literal.
    """
    if not s:
        return None
    try:
        amount_text = str(s).replace(" EUR", "").strip()
        amount = float(amount_text)
    except (ValueError, TypeError):
        amount = None
    return amount


# -----------------------------------------------------------------------------
# Conditional metafields — ported from vita4you.service.ts.
#
# Each entry is a (label, raw_key) pair. map_one() reads raw[raw_key], passes
# it through _nonempty(), and emits {"key": label, "value": ...} only when
# the result is non-empty. Metafields are emitted in the order listed here.
#
# "chromata" is deliberately absent: map_one() handles it separately because
# it can arrive as a list (repeated <chromata> tags in one item).
# -----------------------------------------------------------------------------

SIMPLE_METAFIELDS = [
    ("Brand", "brand"),
    ("Weight", "weight"),
    ("Bestseller Order", "bestseller_order"),
    ("Supplement Facts Table", "supplement_facts_table"),
    ("Instructions of Use", "instructions_of_use"),
    ("Posotita", "posotita"),
    ("Serving Size", "serving_size"),
    ("Servings per Container", "servings_per_container"),
    ("Barcode", "barcode"),
    ("Age", "age"),
    ("Dosage", "dosage"),
    ("Contraindications", "contraindications"),
    ("EOF", "eof"),
    ("Storage", "storage"),
    ("Properties", "properties"),
    ("Skin Type Cosm", "skin_type_cosm"),
    ("Other Ingredients", "other_ingredients"),
    ("Cautions", "cautions"),
    ("Vita Tip", "vita_tip"),
    ("Bafes Mallion", "bafes_mallion"),
    ("Apochroseis Bafon Korres", "apochroseis_bafon_korres"),
    ("Apochroseis Bafon", "apochroseis_bafon"),
    ("SFP", "sfp"),
    ("Ilikia", "ilikia"),
    ("Packing", "packing"),
    ("Periochi Chrisis", "periochi_chrisis"),
    ("Anagkes", "anagkes"),
    ("Chromata Foundation", "chromata_foundation"),
    ("Morfi", "morfi"),
    ("Megethi Epidesmika", "megethi_epidesmika"),
    ("Megethos Papoutsiou", "megethos_papoutsiou"),
    ("Periektikotita Suskeuasias", "periektikotita_suskeuasias"),
    ("Suskeuasia", "suskeuasia"),
    ("Typecream", "typecream"),
    ("Producttype", "producttype"),
    ("Bathmoi Oraseos", "bathmoi_oraseos"),
    ("Leptomereies Proiontos", "leptomereies_proiontos"),
    ("Brefiki Ilikia", "brefiki_ilikia"),
    ("Apochroseis", "apochroseis"),
    ("Peeling", "peeling"),
    ("Available In", "available_in"),
    ("Geusi", "geusi"),
    ("Chromata Maskas", "chromata_maskas"),
    ("Ingredientscream", "ingredientscream"),
    ("Colorcream", "colorcream"),
    ("Baros Morou", "baros_morou"),
    ("Condition", "condition"),
    ("Ingredients", "ingredients"),
    ("Size", "size"),
    ("Charaktiristika Aromatos", "charaktiristika_aromatos"),
    ("Charaktiristika Galaktos", "charaktiristika_galaktos"),
    ("Product Attributes", "product_attributes"),
    ("Multivitamins", "multivitamins"),
    ("Perioxtekotita", "perioxtekotita"),
    ("Trofima Att", "trofima_att"),
    ("Systatika Efsa", "systatika_efsa"),
    ("Ischyrismoi Kanonismou", "ischyrismoi_kanonismou"),
    ("Charaktiristika", "charaktiristika"),
    ("Eidika Charaktiristika", "eidika_charaktiristika"),
    ("Chromata Concealer", "chromata_concealer"),
    ("Chromata Makeup", "chromata_makeup"),
    ("Chroma Kragion", "chroma_kragion"),
    ("Paleta Chromaton", "paleta_chromaton"),
    ("Chromata Maskara", "chromata_maskara"),
]

# -----------------------------------------------------------------------------
# Source feed URLs and constants
# -----------------------------------------------------------------------------

# Remote feeds: main product feed, review feed, and the feed that carries
# the Greek-language product URLs.
FEED_PRODUCTS = "https://vita4you.gr/media/feed/chatboten.xml"
FEED_REVIEWS = "https://vita4you.gr/media/feed/chatreview.xml"
FEED_GREEK_URLS = "https://vita4you.gr/media/feed/chatbot.xml"

# Local paths each feed is downloaded to before it is parsed.
LOCAL_PRODUCTS = "/mnt/data/chatboten.xml"
LOCAL_REVIEWS = "/mnt/data/chatreview.xml"
LOCAL_GREEK_URLS = "/mnt/data/chatbot.xml"

# User-Agent header sent with every download request.
USER_AGENT = "Mozilla/5.0 (compatible; OctocomSync/1.0)"
# Maximum number of product items returned by one fetch_next_chunk() call.
CHUNK_SIZE = 500

# Lookup tables built once during the first fetch_next_chunk() call.
# Module-level because they're large (tens of thousands of entries) and
# need to be read by map_one() which only receives `raw` and `context`.
# The kernel persists module-level globals across executePython calls.
# _REVIEWS maps SKU -> rating (float); _GREEK_URLS maps SKU -> URL (str).
_REVIEWS: dict = {}
_GREEK_URLS: dict = {}


def _download(url, dest_path):
    """Download *url* to *dest_path*, falling back to the proxy on failure.

    Tries direct urllib first (streaming in 256 KiB chunks, lowest memory).
    If the direct attempt fails with a network-level error, falls back to
    `fetch_via_proxy()`, the runtime-provided helper that routes through
    Octocom's HTTP proxy IP. The proxy is needed when the origin blocks the
    Azure sandbox IP — symptoms include `HTTPError 403/429` and unexplained
    `urlopen` timeouts. The fallback rewrites *dest_path* from scratch, so a
    partial file left by the failed direct attempt is replaced.

    Runtime helper signature:
        fetch_via_proxy(url, *, headers=None, timeout=60,
                        method='GET', data=None) -> bytes

    Note the proxy helper returns the whole body as bytes, so the fallback
    path holds the full file in memory before writing it.

    Raises whatever `fetch_via_proxy()` or the final file write raises.
    """
    # Function-scope import: http.client is not among the file-level imports.
    import http.client

    headers = {"User-Agent": USER_AGENT}
    try:
        req = urllib.request.Request(url, headers=headers)
        with urllib.request.urlopen(req, timeout=120) as r:
            with open(dest_path, "wb") as f:
                for chunk in iter(lambda: r.read(1024 * 256), b""):
                    f.write(chunk)
        return
    except (OSError, http.client.HTTPException):
        # URLError/HTTPError are OSError subclasses, so they are still
        # covered. Catching OSError also covers timeouts and connection
        # resets raised while awaiting or reading the response, which
        # urllib does NOT wrap in URLError. HTTPException covers truncated
        # bodies (IncompleteRead). A local disk error lands here too; it
        # resurfaces from the write below.
        pass
    body = fetch_via_proxy(  # noqa: F821 - runtime-provided
        url, headers=headers, timeout=200
    )
    with open(dest_path, "wb") as f:
        f.write(body)


def _build_review_map():
    """Build the SKU -> rating lookup from the downloaded review feed.

    Streams LOCAL_REVIEWS and, for every <review> element, takes the text of
    its first descendant <sku> and <overall>. Entries with a missing or empty
    value, or a rating that is not a valid float, are skipped. When a SKU
    appears more than once, the last occurrence wins.
    """
    ratings = {}
    for _event, node in ET.iterparse(LOCAL_REVIEWS, events=("end",)):
        if node.tag != "review":
            continue
        # findtext() gives None (missing) or "" (empty); both are falsy.
        sku_text = node.findtext(".//sku")
        rating_text = node.findtext(".//overall")
        if sku_text and rating_text:
            try:
                ratings[sku_text.strip()] = float(rating_text)
            except ValueError:
                pass
        # Drop the processed element's children to keep memory flat.
        node.clear()
    return ratings


def _build_greek_url_map():
    """Build the SKU -> Greek URL lookup from the downloaded Greek feed.

    Streams LOCAL_GREEK_URLS and, for every <item> element, reads its direct
    <sku> and <link> children. Items where either is missing or empty are
    skipped; both values are stored stripped of surrounding whitespace.
    When a SKU appears more than once, the last occurrence wins.
    """
    urls_by_sku = {}
    for _event, node in ET.iterparse(LOCAL_GREEK_URLS, events=("end",)):
        if node.tag != "item":
            continue
        # findtext() gives None (missing) or "" (empty); both are falsy.
        sku_text = node.findtext("sku")
        link_text = node.findtext("link")
        if sku_text and link_text:
            urls_by_sku[sku_text.strip()] = link_text.strip()
        # Drop the processed element's children to keep memory flat.
        node.clear()
    return urls_by_sku


def _xml_item_to_dict(elem):
    """Flatten the direct children of *elem* into a {tag: text} dict.

    A tag that occurs once maps to its text (None when the child has no
    text). A tag that occurs several times (like <chromata>) maps to a list
    of the texts in document order. Only direct children are read; nested
    structure and attributes are ignored.
    """
    record: dict = {}
    for node in elem:
        tag, text = node.tag, node.text
        if tag not in record:
            record[tag] = text
            continue
        previous = record[tag]
        if isinstance(previous, list):
            previous.append(text)
        else:
            # Second occurrence: promote the single value to a list.
            record[tag] = [previous, text]
    return record


def _make_item_iter(path):
    """Lazily yield each completed <item> element parsed from *path*.

    This is a generator: the file is not opened until the first item is
    requested. Elements are yielded uncleared, so the caller is responsible
    for calling .clear() on each one once it has been consumed.
    """
    parse_events = ET.iterparse(path, events=("end",))
    for _event, node in parse_events:
        if node.tag != "item":
            continue
        yield node

# -----------------------------------------------------------------------------
# Contract: fetch_next_chunk(state) -> List[Any], map_one(raw, context) -> dict
# -----------------------------------------------------------------------------


def fetch_next_chunk(state):
    """Return the next batch of raw product dicts from the main feed.

    The first call (no truthy state["parser"]["initialized"]) downloads the
    three feeds, rebuilds the module-level _REVIEWS and _GREEK_URLS lookups,
    and stores a streaming item iterator under state["parser"]["iter"].
    Every call then pulls up to CHUNK_SIZE items from that iterator, converts
    each to a dict with _xml_item_to_dict(), and clears the element. The
    list is shorter than CHUNK_SIZE, or empty, once the feed is exhausted.
    """
    global _REVIEWS, _GREEK_URLS

    parser = state.setdefault("parser", {})

    if not parser.get("initialized"):
        # Heavy, one-time setup. Fits comfortably in the per-call budget for
        # vita4you (3 files, ~335 MB total, ~10s on a typical sandbox link).
        for feed_url, local_path in (
            (FEED_REVIEWS, LOCAL_REVIEWS),
            (FEED_GREEK_URLS, LOCAL_GREEK_URLS),
            (FEED_PRODUCTS, LOCAL_PRODUCTS),
        ):
            _download(feed_url, local_path)

        _REVIEWS = _build_review_map()
        _GREEK_URLS = _build_greek_url_map()

        parser["iter"] = _make_item_iter(LOCAL_PRODUCTS)
        parser["initialized"] = True

    chunk = []
    # zip() advances range() first, so once CHUNK_SIZE items have been taken
    # no extra element is pulled from the feed iterator.
    for _, item_elem in zip(range(CHUNK_SIZE), parser["iter"]):
        chunk.append(_xml_item_to_dict(item_elem))
        item_elem.clear()
    return chunk


def map_one(raw, context):
    """Map one raw feed item (a dict from the product feed) to a product.

    Returns None for items that are skipped: those whose category contains
    "extra gift" (case-insensitive) and those without a non-empty SKU.
    Otherwise returns the product dict, with the review rating and the
    optional Greek URL looked up by SKU in the module-level _REVIEWS and
    _GREEK_URLS maps. *context* is part of the contract but is not used here.
    """
    sku = _nonempty(raw.get("sku"))
    category_raw = _nonempty(raw.get("category"))

    # Skip rules: "extra gift" categories, and items with no SKU.
    is_extra_gift = bool(category_raw) and "extra gift" in category_raw.lower()
    if is_extra_gift or not sku:
        return None

    # The Zod schema transforms category to last segment of "a > b > c" path
    last_cat = None
    if category_raw:
        last_cat = category_raw.rsplit(">", 1)[-1].strip()

    title = raw.get("title") or ""
    price = _parse_price(raw.get("price"))
    sale_price = _parse_price(raw.get("sale_price"))
    has_sale_price = sale_price is not None

    # Missing or unparseable quantity counts as zero stock.
    try:
        quantity = int(raw.get("quantity") or 0)
    except (ValueError, TypeError):
        quantity = 0

    labelled_values = (
        (label, _nonempty(raw.get(key))) for label, key in SIMPLE_METAFIELDS
    )
    metafields = [
        {"key": label, "value": value}
        for label, value in labelled_values
        if value
    ]

    # Chromata may be a list (repeated tags) or a single value; emit one
    # metafield per non-empty entry. A missing value becomes [None], which
    # _nonempty() filters out.
    chromata = raw.get("chromata")
    chromata_values = chromata if isinstance(chromata, list) else [chromata]
    for shade in chromata_values:
        shade_text = _nonempty(shade)
        if shade_text:
            metafields.append({"key": "Chromata", "value": shade_text})

    urls = [
        {"url": raw.get("link") or "", "salesChannel": "web", "language": "en"}
    ]
    greek_url = _GREEK_URLS.get(sku)
    if greek_url:
        urls.append(
            {"url": greek_url, "salesChannel": "web", "language": "el"}
        )

    primary_image = {
        "url": raw.get("image_link") or "",
        "altText": title,
        "isPrimary": True,
    }
    variant = {
        "title": title,
        "sku": sku,
        "inStock": quantity > 0,
        "stockQuantity": quantity,
        "price": {
            "currentPrice": sale_price if has_sale_price else price,
            "onSale": has_sale_price,
            "regularPrice": price,
        },
    }
    collections = []
    if last_cat:
        collections.append({"name": last_cat, "slug": slugify(last_cat)})

    return {
        "name": title,
        "uniqueId": sku,
        "slug": slugify(title),
        "url": urls,
        "longDescription": _nonempty(raw.get("description")),
        "averageReviewStars": _REVIEWS.get(sku),
        "images": [primary_image],
        "variants": [variant],
        "collections": collections,
        "metafields": metafields,
        "rawProduct": json.dumps(raw, ensure_ascii=False),
    }

On this page