Code Samples — Python, Node.js & No-Code Examples

Ready-to-use code for common tasks with the Walnut Customer Data API. Copy, paste, replace YOUR_API_KEY, and run.


Python

Requires the requests library (pip install requests).

Basic Query

import requests

API_KEY = "YOUR_API_KEY"
# Plain URL, no surrounding angle brackets: requests cannot resolve "<https://...>".
BASE = "https://customer-api.teamwalnut.com"

# Request up to 10 external sessions, limited to the fields printed below.
response = requests.get(
    f"{BASE}/demo-sessions",
    headers={"x-api-key": API_KEY},
    params={
        "limit": 10,
        "user_type": "external",
        "fields": "demo_name,started_at,is_completed,interaction_count",
    },
    timeout=30,  # never hang forever on a stalled connection
)
# Surface 4xx/5xx as an exception instead of a KeyError on the error payload.
response.raise_for_status()

data = response.json()
print(f"Showing {data['count']} of {data['total']} sessions")

for session in data["data"]:
    status = "Completed" if session["is_completed"] else "In progress"
    # started_at[:10] keeps only the first 10 characters (the date part of the timestamp string).
    print(f"  {session['demo_name']} | {session['started_at'][:10]} | {status} | {session['interaction_count']} clicks")

Pull Full Summary

import requests

API_KEY = "YOUR_API_KEY"
# Plain URL, no surrounding angle brackets: requests cannot resolve "<https://...>".
BASE = "https://customer-api.teamwalnut.com"

response = requests.get(
    f"{BASE}/demo-sessions/summary",
    headers={"x-api-key": API_KEY},
    params={"start_date": "2026-02-01", "end_date": "2026-02-28"},
    timeout=30,  # never hang forever on a stalled connection
)
# Surface 4xx/5xx as an exception instead of a KeyError on the error payload.
response.raise_for_status()

s = response.json()
t = s["totals"]
a = s["averages"]


def rate(part):
    """Format `part` as a percentage of total sessions, or "N/A" when there were none."""
    return f"{part / t['sessions'] * 100:.1f}%" if t["sessions"] else "N/A"


print(f"Period: {s['period']['start']} to {s['period']['end']}")
print(f"Sessions: {t['sessions']:,}")
print(f"Bounce rate: {rate(t['bounced'])}")
print(f"Completion rate: {rate(t['completed'])}")
print(f"Avg duration: {a['duration_seconds']}s")
print(f"Avg interactions: {a['interactions']}")
print(f"Avg screen views: {a['screen_views']}")
# "\n" (single backslash) emits a real blank line before each section.
print(f"\nExternal: {s['by_user_type'].get('external', 0):,}")
print(f"Internal: {s['by_user_type'].get('internal', 0):,}")
print("\nTop 5 demos:")
for demo in s["top_demos"][:5]:
    print(f"  {demo['demo_name']}: {demo['sessions']:,} sessions")

Paginate Through All Sessions

Use this when you need to export a full dataset (e.g., into a CSV or data warehouse).

import requests
import csv

API_KEY = "YOUR_API_KEY"
# Plain URL, no surrounding angle brackets: requests cannot resolve "<https://...>".
BASE = "https://customer-api.teamwalnut.com"
LIMIT = 5000  # page size requested on every call

params = {
    "limit": LIMIT,
    "offset": 0,
    "user_type": "external",
    "start_date": "2026-02-01",
    "end_date": "2026-02-28",
    "fields": "id,demo_name,demo_owner_name,started_at,is_completed,is_bounced,interaction_count,screen_views_count,geo_country_name",
}

all_sessions = []

# Keep requesting pages, advancing the offset by LIMIT, until a short page arrives.
while True:
    response = requests.get(
        f"{BASE}/demo-sessions",
        headers={"x-api-key": API_KEY},
        params=params,
        timeout=60,  # large pages can be slow, but never wait forever
    )
    # Stop on 4xx/5xx rather than silently exporting a partial dataset.
    response.raise_for_status()
    data = response.json()
    all_sessions.extend(data["data"])
    print(f"Fetched {len(all_sessions)} / {data['total']}")

    # Fewer rows than requested means this was the last page.
    if data["count"] < LIMIT:
        break
    params["offset"] += LIMIT

# Write to CSV
if all_sessions:
    # Explicit UTF-8 so non-ASCII demo/owner names do not fail on platforms
    # whose default encoding is something else.
    with open("demo_sessions.csv", "w", newline="", encoding="utf-8") as f:
        # Column order follows the keys of the first returned session.
        writer = csv.DictWriter(f, fieldnames=list(all_sessions[0]))
        writer.writeheader()
        writer.writerows(all_sessions)
    print(f"Exported {len(all_sessions)} sessions to demo_sessions.csv")

Daily High-Intent Alert (Slack Webhook)

Run this on a cron or scheduler. Sends a Slack message when a prospect completes a demo with high engagement.

import requests
from datetime import date

API_KEY = "YOUR_API_KEY"
# Plain URLs, no surrounding angle brackets: requests cannot resolve "<https://...>".
SLACK_WEBHOOK = "https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
BASE = "https://customer-api.teamwalnut.com"

# Pull today's external sessions (start_date is this machine's local date)
response = requests.get(
    f"{BASE}/demo-sessions",
    headers={"x-api-key": API_KEY},
    params={
        "start_date": str(date.today()),
        "user_type": "external",
        "fields": "demo_name,demo_owner_name,is_completed,interaction_count,screen_views_count,geo_country_name,started_at",
        "limit": 10000,
    },
    timeout=30,  # never hang a scheduled job on a stalled connection
)
# Surface 4xx/5xx as an exception instead of a KeyError on the error payload.
response.raise_for_status()

sessions = response.json()["data"]

# Filter for high-intent: completed + more than 5 interactions
hot_leads = [
    s for s in sessions
    if s["is_completed"] and s["interaction_count"] > 5
]

if hot_leads:
    # \U0001F525 is the fire emoji; the escapes use a single backslash so
    # Python emits the characters rather than literal "\u...." text.
    lines = ["*\U0001F525 High-Intent Demo Views Today*\n"]
    for s in hot_leads:
        # `or` also covers a key that is present but null, not just a missing key.
        country = s.get("geo_country_name") or "Unknown"
        lines.append(
            f"\u2022 *{s['demo_name']}* | Owner: {s['demo_owner_name']}\n"
            f"   {s['interaction_count']} clicks, {s['screen_views_count']} screens | "
            f"{country} | {s['started_at'][:16]}"
        )

    slack_response = requests.post(
        SLACK_WEBHOOK,
        json={"text": "\n".join(lines)},
        timeout=30,
    )
    # Do not report success if the webhook rejected the message.
    slack_response.raise_for_status()
    print(f"Sent alert for {len(hot_leads)} high-intent sessions")
else:
    print("No high-intent sessions today")

Monthly Report Generator

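Compares the current month to date against the full previous month and prints a summary. Because the current month is usually incomplete, the session-count change will read low until the month ends.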

import requests
from datetime import date, timedelta

API_KEY = "YOUR_API_KEY"
# Plain URL, no surrounding angle brackets: requests cannot resolve "<https://...>".
BASE = "https://customer-api.teamwalnut.com"

def get_summary(start, end):
    """Fetch the demo-sessions summary for one date window.

    Args:
        start: Value sent as the ``start_date`` query parameter.
        end: Value sent as the ``end_date`` query parameter.

    Returns:
        The decoded JSON body of the ``/demo-sessions/summary`` response.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
    """
    r = requests.get(
        f"{BASE}/demo-sessions/summary",
        headers={"x-api-key": API_KEY},
        params={"start_date": start, "end_date": end},
        timeout=30,  # never hang forever on a stalled connection
    )
    # Fail here, with the real status, rather than later with a KeyError
    # when the caller indexes an error payload.
    r.raise_for_status()
    return r.json()

# Work out the two reporting windows: month-to-date and the whole previous month
today = date.today()
this_month_start = date(today.year, today.month, 1)
# One day before the 1st is the last day of the previous month
last_month_end = this_month_start - timedelta(days=1)
last_month_start = date(last_month_end.year, last_month_end.month, 1)

# Dates are sent as ISO strings (YYYY-MM-DD), the same text str(date) produces
current = get_summary(this_month_start.isoformat(), today.isoformat())
previous = get_summary(last_month_start.isoformat(), last_month_end.isoformat())

def pct(part, whole):
    """Return part/whole as a one-decimal percentage string, or "N/A" if whole is not positive."""
    if whole > 0:
        ratio = part / whole * 100
        return f"{ratio:.1f}%"
    return "N/A"

def change(curr, prev):
    """Describe the relative change from prev to curr as an arrow plus a percentage.

    Args:
        curr: Current-period value.
        prev: Previous-period value, used as the baseline.

    Returns:
        "N/A" when prev is 0 (no baseline to compare against); otherwise an
        up arrow, down arrow, or right arrow (no change), followed by the
        absolute percentage change to one decimal place.
    """
    if prev == 0:
        return "N/A"
    diff = (curr - prev) / prev * 100
    # Single-backslash escapes so the arrow characters are emitted, not the
    # literal text "\u2191". An unchanged value gets a neutral arrow instead
    # of being reported as a decrease.
    if diff > 0:
        arrow = "\u2191"
    elif diff < 0:
        arrow = "\u2193"
    else:
        arrow = "\u2192"
    return f"{arrow} {abs(diff):.1f}%"

# Totals for the current and the previous window
cur_totals, prev_totals = current["totals"], previous["totals"]

print(f"=== Demo Performance: {this_month_start} to {today} ===")

# Session volume, with the relative change against the previous window
sessions_delta = change(cur_totals["sessions"], prev_totals["sessions"])
print(f"Sessions:        {cur_totals['sessions']:>8,}  ({sessions_delta})")

# Rates are shown side by side: current value, then the previous window's value
bounce_now = pct(cur_totals["bounced"], cur_totals["sessions"])
bounce_before = pct(prev_totals["bounced"], prev_totals["sessions"])
print(f"Bounce rate:     {bounce_now:>8}  (was {bounce_before})")

done_now = pct(cur_totals["completed"], cur_totals["sessions"])
done_before = pct(prev_totals["completed"], prev_totals["sessions"])
print(f"Completion rate: {done_now:>8}  (was {done_before})")

# Averages are printed as returned by the API
cur_avg = current["averages"]
print(f"Avg duration:    {cur_avg['duration_seconds']:>7}s  (was {previous['averages']['duration_seconds']}s)")
print(f"Avg interactions:{cur_avg['interactions']:>8}  (was {previous['averages']['interactions']})")

Node.js

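No dependencies needed — uses the built-in fetch (Node 18+). The samples use top-level await and import, so save them as .mjs files (or set "type": "module" in package.json).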

Basic Query

const API_KEY = "YOUR_API_KEY";
// Plain URL, no surrounding angle brackets: fetch cannot parse "<https://...>".
const BASE = "https://customer-api.teamwalnut.com";

// Request up to 10 external sessions, limited to the fields printed below.
const params = new URLSearchParams({
  limit: 10,
  user_type: "external",
  fields: "demo_name,started_at,is_completed,interaction_count"
});

const response = await fetch(`${BASE}/demo-sessions?${params}`, {
  headers: { "x-api-key": API_KEY }
});

// fetch does not reject on 4xx/5xx, so check explicitly before reading the body.
if (!response.ok) {
  throw new Error(`API error ${response.status}: ${await response.text()}`);
}

const { data, count, total } = await response.json();
// "\n" (single backslash) emits a real blank line after the header.
console.log(`Showing ${count} of ${total} sessions\n`);

data.forEach(s => {
  const status = s.is_completed ? "Completed" : "In progress";
  // slice(0, 10) keeps only the first 10 characters (the date part of the timestamp string).
  console.log(`  ${s.demo_name} | ${s.started_at.slice(0, 10)} | ${status} | ${s.interaction_count} clicks`);
});

Pull Summary

const API_KEY = "YOUR_API_KEY";
// Plain URL, no surrounding angle brackets: fetch cannot parse "<https://...>".
const BASE = "https://customer-api.teamwalnut.com";

const params = new URLSearchParams({
  start_date: "2026-02-01",
  end_date: "2026-02-28"
});

const response = await fetch(`${BASE}/demo-sessions/summary?${params}`, {
  headers: { "x-api-key": API_KEY }
});

// fetch does not reject on 4xx/5xx, so check explicitly before reading the body.
if (!response.ok) {
  throw new Error(`API error ${response.status}: ${await response.text()}`);
}

const s = await response.json();
const t = s.totals;

// Share of total sessions as a percentage; "N/A" instead of "NaN%" when there were none.
const rate = part => (t.sessions > 0 ? `${(part / t.sessions * 100).toFixed(1)}%` : "N/A");

console.log(`Period: ${s.period.start} to ${s.period.end}`);
console.log(`Sessions: ${t.sessions.toLocaleString()}`);
console.log(`Bounce rate: ${rate(t.bounced)}`);
console.log(`Completion rate: ${rate(t.completed)}`);
console.log(`Avg duration: ${s.averages.duration_seconds}s`);
// "\n" (single backslash) emits a real blank line before the list.
console.log(`\nTop 5 demos:`);
s.top_demos.slice(0, 5).forEach(d => {
  console.log(`  ${d.demo_name}: ${d.sessions.toLocaleString()} sessions`);
});

Paginate and Export to JSON File

import { writeFileSync } from "fs";

const API_KEY = "YOUR_API_KEY";
// Plain URL, no surrounding angle brackets: fetch cannot parse "<https://...>".
const BASE = "https://customer-api.teamwalnut.com";
const LIMIT = 5000; // page size requested on every call

let offset = 0;
const allSessions = [];

// Keep requesting pages, advancing the offset by LIMIT, until a short page arrives.
while (true) {
  const params = new URLSearchParams({
    limit: LIMIT,
    offset,
    user_type: "external",
    start_date: "2026-02-01",
    end_date: "2026-02-28",
    fields: "id,demo_name,demo_owner_name,started_at,is_completed,interaction_count,geo_country_name"
  });

  const response = await fetch(`${BASE}/demo-sessions?${params}`, {
    headers: { "x-api-key": API_KEY }
  });

  // fetch does not reject on 4xx/5xx; stop rather than export a partial dataset.
  if (!response.ok) {
    throw new Error(`API error ${response.status}: ${await response.text()}`);
  }

  const { data, count, total } = await response.json();
  allSessions.push(...data);
  console.log(`Fetched ${allSessions.length} / ${total}`);

  // Fewer rows than requested means this was the last page.
  if (count < LIMIT) break;
  offset += LIMIT;
}

writeFileSync("demo_sessions.json", JSON.stringify(allSessions, null, 2));
console.log(`Exported ${allSessions.length} sessions to demo_sessions.json`);

Google Apps Script

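For teams using Google Sheets. Go to Extensions → Apps Script, paste, and run. The scripts below write to sheets named "Dashboard" and "Sessions" and clear their existing contents on every run.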

Auto-Updating Summary Dashboard

/**
 * Rebuilds the "Dashboard" sheet from the /demo-sessions/summary endpoint:
 * a title and timestamp, then totals, averages and the top-demos list.
 * The sheet is created if it does not exist and is cleared on every run.
 */
function refreshDemoSummary() {
  var API_KEY = "YOUR_API_KEY";
  // Plain URL, no surrounding angle brackets: UrlFetchApp cannot resolve "<https://...>".
  var url = "https://customer-api.teamwalnut.com/demo-sessions/summary";

  var response = UrlFetchApp.fetch(url, {
    headers: { "x-api-key": API_KEY }
  });

  var s = JSON.parse(response.getContentText());
  var spreadsheet = SpreadsheetApp.getActiveSpreadsheet();
  // getSheetByName returns null when the tab is missing; create it instead of crashing.
  var sheet = spreadsheet.getSheetByName("Dashboard") || spreadsheet.insertSheet("Dashboard");

  // Share of `whole` as a percentage; "N/A" instead of "NaN%" when there were no sessions.
  function pct(part, whole) {
    return whole > 0 ? (part / whole * 100).toFixed(1) + "%" : "N/A";
  }

  // Clear and write header
  sheet.clear();
  sheet.getRange("A1").setValue("Demo Analytics Summary").setFontWeight("bold");
  sheet.getRange("A2").setValue("Updated: " + new Date().toLocaleString());

  // Totals
  var row = 4;
  sheet.getRange(row, 1).setValue("TOTALS").setFontWeight("bold");
  sheet.getRange(row + 1, 1).setValue("Sessions");
  sheet.getRange(row + 1, 2).setValue(s.totals.sessions);
  sheet.getRange(row + 2, 1).setValue("Completed");
  sheet.getRange(row + 2, 2).setValue(s.totals.completed);
  sheet.getRange(row + 3, 1).setValue("Bounced");
  sheet.getRange(row + 3, 2).setValue(s.totals.bounced);
  sheet.getRange(row + 4, 1).setValue("Bounce Rate");
  sheet.getRange(row + 4, 2).setValue(pct(s.totals.bounced, s.totals.sessions));
  sheet.getRange(row + 5, 1).setValue("Completion Rate");
  sheet.getRange(row + 5, 2).setValue(pct(s.totals.completed, s.totals.sessions));

  // Averages
  row = 11;
  sheet.getRange(row, 1).setValue("AVERAGES").setFontWeight("bold");
  sheet.getRange(row + 1, 1).setValue("Duration (sec)");
  sheet.getRange(row + 1, 2).setValue(s.averages.duration_seconds);
  sheet.getRange(row + 2, 1).setValue("Interactions");
  sheet.getRange(row + 2, 2).setValue(s.averages.interactions);
  sheet.getRange(row + 3, 1).setValue("Screen Views");
  sheet.getRange(row + 3, 2).setValue(s.averages.screen_views);

  // Top demos: one row per entry, starting directly under the section header
  row = 16;
  sheet.getRange(row, 1).setValue("TOP DEMOS").setFontWeight("bold");
  sheet.getRange(row, 2).setValue("Sessions").setFontWeight("bold");
  s.top_demos.forEach(function(demo, i) {
    sheet.getRange(row + 1 + i, 1).setValue(demo.demo_name);
    sheet.getRange(row + 1 + i, 2).setValue(demo.sessions);
  });
}

To auto-refresh daily: In Apps Script, go to Triggers (clock icon) → Add Trigger → set refreshDemoSummary to run daily at your preferred time.


Pull Sessions Into a Sheet

/**
 * Replaces the contents of the "Sessions" sheet with up to 200 external
 * sessions: a bold header row built from the first session's keys, then one
 * row per session. The sheet is created if it does not exist.
 */
function pullRecentSessions() {
  var API_KEY = "YOUR_API_KEY";
  // Plain URL, no surrounding angle brackets: UrlFetchApp cannot resolve "<https://...>".
  var url = "https://customer-api.teamwalnut.com/demo-sessions";

  var params = [
    "limit=200",
    "user_type=external",
    "fields=demo_name,demo_owner_name,started_at,is_completed,is_bounced,interaction_count,screen_views_count,geo_country_name"
  ].join("&");

  var response = UrlFetchApp.fetch(url + "?" + params, {
    headers: { "x-api-key": API_KEY }
  });

  var data = JSON.parse(response.getContentText()).data || [];
  var spreadsheet = SpreadsheetApp.getActiveSpreadsheet();
  // getSheetByName returns null when the tab is missing; create it instead of crashing.
  var sheet = spreadsheet.getSheetByName("Sessions") || spreadsheet.insertSheet("Sessions");
  sheet.clear();

  // Nothing to write: data[0] would be undefined and a zero-row range is invalid.
  if (data.length === 0) {
    sheet.getRange("A1").setValue("No sessions returned");
    return;
  }

  // Header row
  var headers = Object.keys(data[0]);
  sheet.getRange(1, 1, 1, headers.length).setValues([headers]).setFontWeight("bold");

  // Data rows; a missing or null field becomes an empty cell
  var rows = data.map(function(row) {
    return headers.map(function(h) { return row[h] == null ? "" : row[h]; });
  });
  sheet.getRange(2, 1, rows.length, headers.length).setValues(rows);
}

Zapier / Make / n8n

No code needed — configure via the platform UI.

Zapier Setup

  1. Trigger: Schedule by Zapier → Every Day
  2. Action: Webhooks by Zapier → GET
    • URL: https://customer-api.teamwalnut.com/demo-sessions/summary
    • Headers: x-api-key = YOUR_API_KEY
  3. Action: Slack → Send Channel Message
    • Message:
📊 Daily Demo Summary
Sessions: {{data__totals__sessions}}
Completed: {{data__totals__completed}}
Bounced: {{data__totals__bounced}}
Avg Duration: {{data__averages__duration_seconds}}s
Top Demo: {{data__top_demos__0__demo_name}} ({{data__top_demos__0__sessions}} sessions)

Make (Integromat) Setup

  1. Module: HTTP → Make a request
    • URL: https://customer-api.teamwalnut.com/demo-sessions/summary
    • Method: GET
    • Headers: x-api-key = YOUR_API_KEY
  2. Module: JSON → Parse JSON (connect to HTTP output)
  3. Module: Slack / Email / Google Sheets → send or write the parsed data

n8n Setup

  1. Node: Schedule Trigger (daily)
  2. Node: HTTP Request
    • Method: GET
    • URL: https://customer-api.teamwalnut.com/demo-sessions/summary
    • Header: x-api-key = YOUR_API_KEY
  3. Node: Slack / Gmail / Google Sheets with the response fields

Error Handling Best Practice

All code that calls the API should handle errors gracefully. Here's a reusable pattern.

Python

import requests
import time

def call_api(path, params=None, max_retries=3):
    """GET an API path, retrying with exponential backoff while rate limited.

    Args:
        path: Path appended to the API base URL, e.g. "/demo-sessions".
        params: Optional dict of query parameters.
        max_retries: Maximum number of attempts before giving up on 429s.

    Returns:
        The decoded JSON body of a 200 response.

    Raises:
        Exception: On 401 (invalid key), 400 (bad request), any other
            non-200 status, or when every attempt was rate limited (429).
    """
    # Plain URL, no surrounding angle brackets: requests cannot resolve "<https://...>".
    url = f"https://customer-api.teamwalnut.com{path}"
    headers = {"x-api-key": "YOUR_API_KEY"}

    for attempt in range(max_retries):
        response = requests.get(url, headers=headers, params=params, timeout=30)
        status = response.status_code

        if status == 200:
            return response.json()

        if status == 429:
            # Back off 1s, 2s, 4s, ... but do not sleep after the final
            # attempt, since no retry follows it.
            if attempt < max_retries - 1:
                wait = 2 ** attempt
                print(f"Rate limited. Retrying in {wait}s...")
                time.sleep(wait)
            continue

        if status == 401:
            raise Exception("Invalid API key. Check your x-api-key header.")

        if status == 400:
            # The error body may not be a JSON object; fall back to raw text
            # so the caller still sees a "Bad request" error.
            try:
                detail = response.json().get("error", "Unknown")
            except (ValueError, AttributeError):
                detail = response.text or "Unknown"
            raise Exception(f"Bad request: {detail}")

        raise Exception(f"API error {status}: {response.text}")

    raise Exception("Max retries exceeded (rate limited)")

# Example: request up to 10 external sessions
data = call_api(
    "/demo-sessions",
    params={"limit": 10, "user_type": "external"},
)

Node.js

/**
 * GET an API path, retrying with exponential backoff while rate limited.
 *
 * @param {string} path - Path appended to the API base URL, e.g. "/demo-sessions".
 * @param {Object} [params={}] - Query parameters.
 * @param {number} [maxRetries=3] - Maximum number of attempts before giving up on 429s.
 * @returns {Promise<any>} The decoded JSON body of a successful response.
 * @throws {Error} On 401, 400, any other non-OK status, or when every attempt was rate limited.
 */
async function callApi(path, params = {}, maxRetries = 3) {
  // Plain URL, no surrounding angle brackets: fetch cannot parse "<https://...>".
  const base = "https://customer-api.teamwalnut.com";
  const query = new URLSearchParams(params);

  for (let attempt = 0; attempt < maxRetries; attempt++) {
    const response = await fetch(`${base}${path}?${query}`, {
      headers: { "x-api-key": "YOUR_API_KEY" }
    });

    if (response.ok) return response.json();

    if (response.status === 429) {
      // Back off 1s, 2s, 4s, ... but do not wait after the final attempt,
      // since no retry follows it.
      if (attempt < maxRetries - 1) {
        const wait = 2 ** attempt * 1000;
        console.log(`Rate limited. Retrying in ${wait}ms...`);
        await new Promise(r => setTimeout(r, wait));
      }
      continue;
    }

    if (response.status === 401) throw new Error("Invalid API key.");
    if (response.status === 400) {
      // The error body may not be JSON; keep the "Bad request" error either way.
      const err = await response.json().catch(() => null);
      throw new Error(`Bad request: ${err?.error ?? "Unknown"}`);
    }
    throw new Error(`API error ${response.status}`);
  }
  throw new Error("Max retries exceeded (rate limited)");
}

// Example: request up to 10 external sessions
const sessionQuery = { limit: 10, user_type: "external" };
const data = await callApi("/demo-sessions", sessionQuery);
Was this article helpful?
0 out of 0 found this helpful