
Using Code Node

The Code Node allows you to write custom Python code to process data, transform information, and create custom logic in your workflows.

What is a Code Node?

A Code Node executes Python code directly in your browser using Pyodide, a build of CPython compiled to WebAssembly. You can use it to write custom logic, process data, and read from connected database nodes.

Python Execution

Write and execute Python code directly in the browser. No server required.

Database Integration

Access connected database rows and columns through the db object.

Basic Structure

All Code Node scripts must define a process function that takes an optional input parameter:

def process(input=None):
    # Your code here
    result = input  # placeholder: replace with your own logic
    return result

Examples

Example 1: Hello World

def process(input=None):
    # Hello World example
    print("Hello, World!")
    return "Hello, World!"

Example 2: Processing Database Rows with Date Calculation

This example iterates through database rows and creates a list of tuples with row IDs and dates incremented by 15 minutes:

import datetime

def process(input=None):
    li = []
    start_date = datetime.datetime.now()
    for idx, ri in enumerate(db.rows):
        next_date = start_date + datetime.timedelta(minutes=15*idx)
        li.append((ri.id, next_date))
    return li

Explanation:

  • Imports the datetime module for date/time operations
  • Creates an empty list li to store results
  • Gets the current date/time as start_date
  • Iterates through db.rows with enumeration to get both index and row
  • Calculates next_date by adding 15 minutes multiplied by the index
  • Appends a tuple (row_id, calculated_date) to the result list
  • Returns the list of tuples
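
If the downstream node expects plain text rather than datetime objects, the same loop can emit ISO 8601 strings. The sketch below is a variation on Example 2, not part of the original example: build_schedule and its parameters are hypothetical names, and it takes the row IDs and start time as arguments so it can be tried outside a Code Node.

```python
import datetime

def build_schedule(row_ids, start, step_minutes=15):
    """Pair each row ID with a timestamp spaced step_minutes apart."""
    schedule = []
    for idx, row_id in enumerate(row_ids):
        slot = start + datetime.timedelta(minutes=step_minutes * idx)
        # isoformat() turns the datetime into a string such as "2024-01-01T09:15:00"
        schedule.append((row_id, slot.isoformat()))
    return schedule
```

Inside process, this could be called as build_schedule([row.id for row in db.rows], datetime.datetime.now()).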

Example 3: Filtering and Transforming Data

def process(input=None):
    # Filter rows where a specific column meets a condition
    filtered = [row for row in db.rows if row.status == 'active']
    
    # Transform data
    result = [{'id': row.id, 'name': row.name.upper()} for row in filtered]
    return result

Example 4: Processing Input Data

def process(input=None):
    if input is None:
        return "No input provided"
    
    # Process the input
    if isinstance(input, str):
        return input.upper()
    elif isinstance(input, (int, float)):
        return input * 2
    else:
        return str(input)

Database Access

When a Code Node is connected to a Database Node, you can access the database data through the db object:

Available Properties

  • db.rows - List of Row objects, each with attributes for column values
  • db.columns - List of column definitions

Row Object Structure

Each row in db.rows has:

  • id - The row ID
  • Other attributes matching column names (e.g., row.name, row.email)
  • to_dict() method to convert to a dictionary

Example usage:

for row in db.rows:
    print(f"Row ID: {row.id}")
    print(f"Name: {row.name}")
    # Access any column as an attribute
    print(f"Data: {row.to_dict()}")

Working with Attachment Columns

When your database has columns of type Attachment, each cell stores one or more file URLs as a plain string. You can extract those URLs directly in your Code Node and pass them to any external service (Facebook, Instagram, OpenAI, etc.).

Value formats stored in an attachment cell:

# Single file
"https://cdn.example.com/image.jpg"

# Multiple files (comma-separated)
"https://cdn.example.com/a.jpg,https://cdn.example.com/b.pdf"

# Video + thumbnail (pipe-separated)
"https://cdn.example.com/video.mp4|https://cdn.example.com/thumb.jpg"

Example: Extract all attachment URLs from a column

⚠ Important:

Replace 'TU_COLUMNA_ATTACHMENT' with the exact name of your attachment column as it appears in your database.

def process(input=None):
    if db is None or not db.rows:
        return "No data in the connected database"

    results = []

    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this

        urls = [u.strip() for u in raw.split(',') if u.strip()]

        results.append({
            "id": row.id,
            "urls": urls,
            "count": len(urls)
        })

    return results

Output example:

[
  {
    "id": "bCZsX2Oxc3RX77",
    "urls": ["https://cdn.example.com/image1.jpg", "https://cdn.example.com/image2.jpg"],
    "count": 2
  },
  ...
]

Example: Handle video + thumbnail attachments

def process(input=None):
    if db is None or not db.rows:
        return "No data"

    videos = []

    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this

        for entry in raw.split(','):
            entry = entry.strip()
            if not entry:
                continue

            if '|' in entry:
                video_url, thumb_url = entry.split('|', 1)
                videos.append({
                    "id": row.id,
                    "video": video_url.strip(),
                    "thumbnail": thumb_url.strip()
                })
            else:
                videos.append({"id": row.id, "video": entry, "thumbnail": None})

    return videos

Tips:

  • Use row.get('ColumnName', '') to avoid errors on empty cells
  • Split by , for multiple files; split by | for video + thumbnail pairs
  • The extracted URLs are ready to send to Facebook, OpenAI, or any other service
  • Connect this node's output to an AI Node or HTTP node to process the files downstream
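
The splitting rules in the tips above can be folded into one small helper, so each example does not have to repeat them. This is a minimal sketch: parse_attachment_cell is a hypothetical name, and it assumes the three cell formats listed earlier (single URL, comma-separated URLs, pipe-separated video and thumbnail).

```python
def parse_attachment_cell(raw):
    """Split one attachment cell into a list of {"url", "thumbnail"} dicts."""
    entries = []
    for part in (raw or "").split(","):
        part = part.strip()
        if not part:
            continue
        # A pipe separates a video URL from its thumbnail URL.
        url, _, thumb = part.partition("|")
        entries.append({"url": url.strip(), "thumbnail": thumb.strip() or None})
    return entries
```

Because the helper splits on every comma, a URL that itself contains a comma would be cut in two; if your files can have such URLs, the cell format needs a different separator.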

More examples

Filter by file type

Only return image URLs (jpg, png, webp, gif) and skip other file types like PDFs.

def process(input=None):
    if db is None or not db.rows:
        return "No data"

    IMAGE_EXTS = ('.jpg', '.jpeg', '.png', '.webp', '.gif')
    results = []

    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this
        urls = [u.strip() for u in raw.split(',') if u.strip()]
        images = [u for u in urls if u.lower().split('?')[0].endswith(IMAGE_EXTS)]

        if images:
            results.append({"id": row.id, "images": images})

    return results

Flatten all URLs into a single list

Get one flat list of all URLs across all rows — useful to queue them for upload.

def process(input=None):
    if db is None or not db.rows:
        return "No data"

    all_urls = []

    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this
        urls = [u.strip() for u in raw.split(',') if u.strip()]
        for url in urls:
            all_urls.append({"row_id": row.id, "url": url})

    return all_urls

Prepare payload for Facebook / Instagram

Build a list with image URL and caption from another column, ready to pass to a Facebook node.

def process(input=None):
    if db is None or not db.rows:
        return "No data"

    posts = []

    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this
        caption = row.get('TU_COLUMNA_CAPTION', '') or ''  # <-- change this
        urls = [u.strip() for u in raw.split(',') if u.strip()]

        for url in urls:
            posts.append({
                "image_url": url,
                "caption": caption,
                "row_id": row.id
            })

    return posts

Call OpenAI Vision directly

Use pyfetch to call the OpenAI API with image URLs from an attachment column. The process function must be async to use await. Set your OPENAI_API_KEY in the Config tab — it will be available as a global variable.

⚠ Note:

Use pyfetch — NOT pyxhr or requests. Those are not available in Pyodide.

Where to store your API key

You have two options:

  • Config tab: paste the key directly as a global variable in the node.
  • /credentials: save the key once in your credentials vault, then select it from the global variables picker in the Config tab. The variable name you assign there (e.g. OPENAI_API_KEY) is what you reference in the code.

import json
from pyodide.http import pyfetch

async def process(input=None):
    if db is None or not db.rows:
        return "No data"

    image_urls = []
    for row in db.rows:
        raw = row.get('TU_COLUMNA_ATTACHMENT', '') or ''  # <-- change this
        urls = [u.strip() for u in raw.split(',') if u.strip()]
        image_urls.extend(urls)

    if not image_urls:
        return "No images found"

    content = [{"type": "text", "text": "Describe each image briefly."}]
    for url in image_urls:
        content.append({"type": "image_url", "image_url": {"url": url}})

    response = await pyfetch(
        "https://api.openai.com/v1/chat/completions",
        method="POST",
        headers={
            "Authorization": f"Bearer {OPENAI_API_KEY}",  # set in Config tab
            "Content-Type": "application/json"
        },
        body=json.dumps({
            "model": "gpt-4o",
            "messages": [{"role": "user", "content": content}],
            "max_tokens": 500
        })
    )

    data = await response.json()
    return data["choices"][0]["message"]["content"]
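
The last line of the example above raises a KeyError when the API answers with an error (for instance an invalid key), because an error response carries an "error" object instead of "choices". A hedged sketch of a safer way to read the parsed response follows; extract_reply is a hypothetical helper name and the returned messages are illustrative.

```python
def extract_reply(data):
    """Return the assistant text from a parsed chat completions response."""
    if not isinstance(data, dict):
        return "Unexpected response format"
    # On failure the API returns an "error" object instead of "choices".
    if "error" in data:
        error = data["error"]
        message = error.get("message") if isinstance(error, dict) else error
        return f"OpenAI error: {message}"
    choices = data.get("choices") or []
    if not choices:
        return "No choices returned"
    return choices[0].get("message", {}).get("content", "")
```

To use it, replace the final return in the example with return extract_reply(data).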

Best Practices

Always define a process function: It is required for code execution

Handle None input: Check if input is None before processing

Use type checking: Verify input types before operations

Return meaningful results: Return data that can be used by other nodes

Error handling: Use try/except for robust error handling

Database access: Check if db.rows exists before iterating

Troubleshooting

Common Issues

  • "db is not defined": Make sure the Code Node is connected to a Database Node
  • "AttributeError": Check that the column name exists in your database
  • "SyntaxError": Verify your Python syntax is correct
  • "Module not found": Some Python modules may not be available in Pyodide
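
To find out ahead of time whether a module can be imported, the standard library's importlib.util.find_spec can be used. This is a rough sketch: is_available is a hypothetical helper, the module names are only examples, and the check reports what is importable at that moment, so a package that still has to be loaded into Pyodide will show as unavailable.

```python
import importlib.util

def is_available(module_name):
    """Report whether a top-level module can be imported in this runtime."""
    return importlib.util.find_spec(module_name) is not None

def process(input=None):
    # Example module names; replace them with the ones your script needs.
    wanted = ["json", "datetime", "requests"]
    return {name: is_available(name) for name in wanted}
```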

Debugging Tips

  • Use print() statements to debug your code
  • Check the output panel for execution results
  • Verify database connection status in the node configuration
  • Test with simple code first, then add complexity
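
One way to test with simple code first is to run process against a stand-in for db before connecting a real Database Node. The sketch below is an illustration only: make_fake_db is a hypothetical helper that mimics the documented row attributes and to_dict(), not the real db object, and it does not provide row.get.

```python
from types import SimpleNamespace

def make_fake_db(records):
    """Build a stand-in for db from a list of dicts (for local testing only)."""
    rows = []
    for record in records:
        row = SimpleNamespace(**record)
        # Mimic the documented to_dict() method on each row.
        row.to_dict = lambda record=record: dict(record)
        rows.append(row)
    # The format of column definitions is not modeled here.
    return SimpleNamespace(rows=rows, columns=[])

db = make_fake_db([
    {"id": "r1", "name": "Ada", "status": "active"},
    {"id": "r2", "name": "Bob", "status": "archived"},
])

def process(input=None):
    filtered = [row for row in db.rows if row.status == "active"]
    return [{"id": row.id, "name": row.name.upper()} for row in filtered]

print(process())
```

Remove the make_fake_db call once the node is connected, so that the real db object is used.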