Skip to content

Usage Examples

Practical examples for integrating with Sync API.

Table of Contents


Python Examples

Basic Import

import httpx
import uuid
import asyncio

API_KEY = "your-api-key"
BASE_URL = "https://import-api.liddex.ru"

async def import_contacts(contacts: list[dict]) -> dict:
    """Import contacts to AI."""
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Idempotency-Key": str(uuid.uuid4()),
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{BASE_URL}/v1/contacts/import",
            json=contacts,
            headers=headers,
            timeout=30.0,
        )
        response.raise_for_status()
        return response.json()

# Usage
contacts = [
    {
        "phone": "+79001234567",
        "tags": ["лид", "сайт"],
        "additionalFields": {
            "source": "landing",
            "campaign": "summer2024",
        }
    },
    {
        "phone": "89001234568",
        "tags": ["клиент"],
        "additionalFields": {
            "source": "referral",
        }
    }
]

result = asyncio.run(import_contacts(contacts))
print(f"Job ID: {result['job_id']}")
print(f"Valid contacts: {result['valid_contacts']}")

Polling Job Status

async def wait_for_job_completion(
    job_id: str,
    max_wait: int = 300,
    poll_interval: int = 5
) -> dict:
    """Poll job status until completion or timeout."""
    headers = {"Authorization": f"Bearer {API_KEY}"}
    elapsed = 0

    async with httpx.AsyncClient() as client:
        while elapsed < max_wait:
            response = await client.get(
                f"{BASE_URL}/v1/contacts/import/{job_id}",
                headers=headers,
            )
            response.raise_for_status()
            status = response.json()

            if status["status"] in ["completed", "failed"]:
                return status

            await asyncio.sleep(poll_interval)
            elapsed += poll_interval

    raise TimeoutError(f"Job {job_id} did not complete within {max_wait}s")

# Usage
result = asyncio.run(import_contacts(contacts))
final_status = asyncio.run(wait_for_job_completion(result["job_id"]))
print(f"Status: {final_status['status']}")
print(f"Sent: {final_status['sent_contacts']}/{final_status['valid_contacts']}")

With HMAC Signature

import hmac
import hashlib
from datetime import datetime, timezone

def generate_hmac_signature(secret: str, timestamp: str, body: bytes) -> str:
    """Generate HMAC-SHA256 signature."""
    message = f"{timestamp}\n".encode() + body
    return hmac.new(secret.encode(), message, hashlib.sha256).hexdigest()

async def import_with_hmac(contacts: list[dict], hmac_secret: str) -> dict:
    """Import contacts with HMAC signature."""
    timestamp = datetime.now(timezone.utc).isoformat()
    body = json.dumps(contacts).encode()
    signature = generate_hmac_signature(hmac_secret, timestamp, body)

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Idempotency-Key": str(uuid.uuid4()),
        "X-Signature": signature,
        "X-Timestamp": timestamp,
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{BASE_URL}/v1/contacts/import",
            content=body,
            headers=headers,
        )
        response.raise_for_status()
        return response.json()

Debug Mode (Dry-Run)

async def validate_contacts(contacts: list[dict]) -> dict:
    """Validate contacts without sending to AI."""
    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Idempotency-Key": str(uuid.uuid4()),
        "X-Debug": "true",
        "Content-Type": "application/json",
    }

    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{BASE_URL}/v1/contacts/import",
            json=contacts,
            headers=headers,
        )
        response.raise_for_status()
        return response.json()

# Usage
result = asyncio.run(validate_contacts([
    {"phone": "+79001234567", "tags": [], "additionalFields": {}},
    {"phone": "invalid", "tags": [], "additionalFields": {}},
]))

print(f"Valid: {result['valid_contacts']}")
print(f"Invalid: {result['invalid_contacts']}")

for contact in result['validation_results']:
    if not contact['is_valid']:
        print(f"Error: {contact['original_phone']} - {contact['error']}")

JavaScript/Node.js Examples

Basic Import

const axios = require('axios');
const { v4: uuidv4 } = require('uuid');

const API_KEY = 'your-api-key';
const BASE_URL = 'https://import-api.liddex.ru';

async function importContacts(contacts) {
    const response = await axios.post(
        `${BASE_URL}/v1/contacts/import`,
        contacts,
        {
            headers: {
                'Authorization': `Bearer ${API_KEY}`,
                'Idempotency-Key': uuidv4(),
                'Content-Type': 'application/json',
            },
            timeout: 30000,
        }
    );
    return response.data;
}

// Usage
const contacts = [
    {
        phone: '+79001234567',
        tags: ['лид', 'сайт'],
        additionalFields: {
            source: 'landing',
            campaign: 'summer2024',
        }
    }
];

importContacts(contacts)
    .then(result => {
        console.log('Job ID:', result.job_id);
        console.log('Valid contacts:', result.valid_contacts);
    })
    .catch(error => {
        console.error('Error:', error.response?.data || error.message);
    });

With Retry Logic

const axios = require('axios');
const axiosRetry = require('axios-retry');

const client = axios.create({
    baseURL: BASE_URL,
    timeout: 30000,
});

// Configure retry
axiosRetry(client, {
    retries: 3,
    retryDelay: axiosRetry.exponentialDelay,
    retryCondition: (error) => {
        // Retry on network errors or 5xx
        return axiosRetry.isNetworkOrIdempotentRequestError(error) ||
               (error.response && error.response.status >= 500);
    },
});

async function importContactsWithRetry(contacts) {
    const response = await client.post('/v1/contacts/import', contacts, {
        headers: {
            'Authorization': `Bearer ${API_KEY}`,
            'Idempotency-Key': uuidv4(),
            'Content-Type': 'application/json',
        },
    });
    return response.data;
}

TypeScript Example

import axios, { AxiosInstance } from 'axios';
import { v4 as uuidv4 } from 'uuid';

interface Contact {
    phone: string;
    tags: string[];
    additionalFields: Record<string, any>;
}

interface ImportResponse {
    job_id: string;
    correlation_id: string;
    idempotency_status: string;
    status: string;
    total_contacts: number;
    valid_contacts: number;
    invalid_contacts: number;
    chunks_count: number;
    created_at: string;
}

class SyncSashaClient {
    private client: AxiosInstance;

    constructor(apiKey: string, baseURL: string = 'https://import-api.liddex.ru') {
        this.client = axios.create({
            baseURL,
            headers: {
                'Authorization': `Bearer ${apiKey}`,
                'Content-Type': 'application/json',
            },
            timeout: 30000,
        });
    }

    async importContacts(contacts: Contact[]): Promise<ImportResponse> {
        const response = await this.client.post<ImportResponse>(
            '/v1/contacts/import',
            contacts,
            {
                headers: {
                    'Idempotency-Key': uuidv4(),
                },
            }
        );
        return response.data;
    }

    async getJobStatus(jobId: string): Promise<any> {
        const response = await this.client.get(`/v1/contacts/import/${jobId}`);
        return response.data;
    }
}

// Usage
const client = new SyncSashaClient('your-api-key');

const contacts: Contact[] = [
    {
        phone: '+79001234567',
        tags: ['test'],
        additionalFields: { source: 'typescript' },
    }
];

client.importContacts(contacts)
    .then(result => console.log('Job ID:', result.job_id))
    .catch(error => console.error('Error:', error.message));

cURL Examples

Basic Import

#!/bin/bash

API_KEY="your-api-key"
BASE_URL="https://import-api.liddex.ru"
IDEMPOTENCY_KEY=$(uuidgen)

curl -X POST "${BASE_URL}/v1/contacts/import" \
  -H "Authorization: Bearer ${API_KEY}" \
  -H "Idempotency-Key: ${IDEMPOTENCY_KEY}" \
  -H "Content-Type: application/json" \
  -d '[
    {
      "phone": "+79001234567",
      "tags": ["лид", "сайт"],
      "additionalFields": {
        "source": "landing",
        "campaign": "summer2024"
      }
    }
  ]'

Save Job ID and Check Status

#!/bin/bash

# Import and extract job ID
RESPONSE=$(curl -s -X POST "${BASE_URL}/v1/contacts/import" \
  -H "Authorization: Bearer ${API_KEY}" \
  -H "Idempotency-Key: $(uuidgen)" \
  -H "Content-Type: application/json" \
  -d '[{"phone": "+79001234567", "tags": [], "additionalFields": {}}]')

JOB_ID=$(echo $RESPONSE | jq -r '.job_id')
echo "Job ID: $JOB_ID"

# Wait and check status
sleep 5

curl -s -X GET "${BASE_URL}/v1/contacts/import/${JOB_ID}" \
  -H "Authorization: Bearer ${API_KEY}" | jq

Error Handling

Python

from httpx import HTTPStatusError

async def import_with_error_handling(contacts: list[dict]) -> dict:
    try:
        return await import_contacts(contacts)
    except HTTPStatusError as e:
        if e.response.status_code == 400:
            print("Validation error:", e.response.json())
        elif e.response.status_code == 401:
            print("Authentication failed - check API key")
        elif e.response.status_code == 422:
            print("Idempotency conflict - key reused with different data")
        elif e.response.status_code == 429:
            print("Rate limit exceeded - retry later")
        else:
            print(f"Unexpected error: {e.response.status_code}")
        raise

JavaScript

async function importWithErrorHandling(contacts) {
    try {
        return await importContacts(contacts);
    } catch (error) {
        if (error.response) {
            const status = error.response.status;
            const data = error.response.data;

            switch (status) {
                case 400:
                    console.error('Validation error:', data);
                    break;
                case 401:
                    console.error('Authentication failed - check API key');
                    break;
                case 422:
                    console.error('Idempotency conflict:', data);
                    break;
                case 429:
                    console.error('Rate limit exceeded - retry later');
                    break;
                default:
                    console.error(`Unexpected error: ${status}`, data);
            }
        } else {
            console.error('Network error:', error.message);
        }
        throw error;
    }
}

Advanced Usage

Batch Processing

async def import_in_batches(
    all_contacts: list[dict],
    batch_size: int = 1000,
) -> list[str]:
    """Import contacts in batches."""
    job_ids = []

    for i in range(0, len(all_contacts), batch_size):
        batch = all_contacts[i:i + batch_size]
        result = await import_contacts(batch)
        job_ids.append(result['job_id'])

        print(f"Batch {i // batch_size + 1}: Job {result['job_id']}")

        # Rate limiting
        await asyncio.sleep(0.5)

    return job_ids

Monitoring Progress

async def monitor_jobs(job_ids: list[str]) -> dict:
    """Monitor multiple jobs."""
    results = {"completed": 0, "failed": 0, "pending": 0}

    async with httpx.AsyncClient() as client:
        for job_id in job_ids:
            response = await client.get(
                f"{BASE_URL}/v1/contacts/import/{job_id}",
                headers={"Authorization": f"Bearer {API_KEY}"},
            )
            status = response.json()

            if status["status"] == "completed":
                results["completed"] += 1
            elif status["status"] == "failed":
                results["failed"] += 1
            else:
                results["pending"] += 1

    return results

See Also