Rate Limits

The Scraper API limits how many requests each API key can run at the same time, which keeps resource allocation fair and performance predictable. Understanding these limits helps you build efficient, scalable applications.

Concurrency Limits

Each account has a maximum number of simultaneous requests that can be processed. This limit is enforced per API key.

How It Works

  1. When you start a request, your concurrency counter increments
  2. If the counter exceeds your limit, the request is rejected with 429 Too Many Requests
  3. When a request completes (success or failure), the counter decrements
  4. Counters automatically expire after 60 seconds to prevent stuck states
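
The four steps above can be modeled locally to reason about when a request would be rejected. This is only a sketch of one plausible reading, not the service's implementation: the class name, the per-request expiry, and the injectable clock are all assumptions made for illustration.

```python
import time


class ConcurrencyCounter:
    """Hypothetical in-memory model of a per-key concurrency counter."""

    def __init__(self, limit=5, ttl=60.0, clock=time.monotonic):
        self.limit = limit
        self.ttl = ttl
        self.clock = clock
        self._started = []  # start times of requests still being counted

    def _expire(self):
        # Step 4: forget entries older than the TTL so a request that
        # never reported completion cannot hold a slot forever.
        cutoff = self.clock() - self.ttl
        self._started = [t for t in self._started if t > cutoff]

    def acquire(self):
        """Steps 1-2: count the request, or reject it with 429."""
        self._expire()
        if len(self._started) >= self.limit:
            return 429
        self._started.append(self.clock())
        return 200

    def release(self):
        """Step 3: a finished request (success or failure) frees its slot."""
        if self._started:
            self._started.pop(0)
```

Passing a fake clock makes the expiry behavior easy to exercise without waiting 60 seconds.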

Checking Your Limits

Every response includes rate limit headers:

X-RateLimit-Limit: 5
X-RateLimit-Remaining: 3

Header                  Description
X-RateLimit-Limit       Maximum concurrent requests allowed
X-RateLimit-Remaining   Current available capacity
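
As a small illustration, the two headers can be turned into numbers, including how many slots are currently in use. The helper name `read_capacity` is hypothetical, and the sketch assumes both headers hold plain integers as in the sample above.

```python
def read_capacity(headers):
    """Return (limit, remaining, in_use), or None if the headers are missing.

    `headers` is any mapping, for example `response.headers` from requests.
    """
    try:
        limit = int(headers["X-RateLimit-Limit"])
        remaining = int(headers["X-RateLimit-Remaining"])
    except (KeyError, ValueError):
        return None
    return limit, remaining, limit - remaining
```

With the sample headers shown above, this yields a limit of 5, 3 free slots, and 2 requests in flight.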

Rate Limit Response

When you exceed your limit:

Status: 429 Too Many Requests

{
  "success": false,
  "error": "Concurrency limit exceeded (5/5)",
  "limit": 5,
  "reset": 60,
  "timestamp": "2025-10-29T12:00:00Z"
}

Field   Description
limit   Your maximum concurrent requests
reset   Seconds until limit resets (default: 60)
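
A client can read these fields from the 429 body before deciding how long to wait. The following is a minimal sketch; the `RateLimitInfo` type and `parse_rate_limit_body` function are illustrative names, and the fallback of 60 seconds mirrors the documented default for `reset`.

```python
import json
from dataclasses import dataclass


@dataclass
class RateLimitInfo:
    message: str
    limit: int
    reset: int  # seconds until the limit resets


def parse_rate_limit_body(body: str) -> RateLimitInfo:
    """Extract the documented fields from a 429 response body."""
    data = json.loads(body)
    return RateLimitInfo(
        message=data.get("error", ""),
        limit=int(data["limit"]),
        reset=int(data.get("reset", 60)),  # documented default
    )
```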

Best Practices

1. Monitor Remaining Capacity

Check the X-RateLimit-Remaining header before sending batches:

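import requests

def check_capacity(api_key):
    """Return the number of free concurrency slots reported by the API."""
    response = requests.get(
        "https://scrape.evomi.com/api/v1/scraper/health",
        params={"api_key": api_key},  # requests URL-encodes the key
        timeout=10,
    )
    return int(response.headers.get("X-RateLimit-Remaining", 0))

# Use it (api_key, urls, and process_batch come from your application)
if check_capacity(api_key) >= 3:
    # Send batch of 3 requests
    process_batch(urls[:3])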

2. Implement Exponential Backoff

When rate limited, wait before retrying:

import time

import requests

def scrape_with_backoff(url, api_key, max_retries=3):
    """Fetch a URL, waiting longer after each 429 response."""
    for attempt in range(max_retries):
        response = requests.get(
            "https://scrape.evomi.com/api/v1/scraper/realtime",
            params={"url": url, "api_key": api_key},  # URL-encoded by requests
        )

        if response.status_code != 429:
            # Success or a non-rate-limit error: let the caller inspect it
            return response

        if attempt == max_retries - 1:
            raise Exception("Max retries exceeded")

        # Start from the server's reset hint and double it on each retry
        wait_time = response.json().get("reset", 60) * 2 ** attempt
        print(f"Rate limited. Waiting {wait_time}s...")
        time.sleep(wait_time)

3. Use Async Mode for Batches

For large batches, use async=true to submit tasks without blocking:

import asyncio
import aiohttp

async def submit_async_task(session, url, api_key):
    async with session.post(
        "https://scrape.evomi.com/api/v1/scraper/realtime",
        headers={"x-api-key": api_key},
        json={"url": url, "async": True}
    ) as response:
        data = await response.json()
        return data["task_id"]

async def process_batch(urls, api_key):
    async with aiohttp.ClientSession() as session:
        tasks = [submit_async_task(session, url, api_key) for url in urls]
        task_ids = await asyncio.gather(*tasks)
        return task_ids

# Submit 100 URLs without waiting for results
task_ids = asyncio.run(process_batch(urls, api_key))
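
The text does not say whether async submissions themselves count toward the concurrency limit. If they do (an assumption), launching every submission at once could trigger 429 responses. One way to guard against that is to cap in-flight submissions with asyncio.Semaphore, as in this sketch. `submit_bounded` is a hypothetical helper, and `submit` stands for any coroutine function that takes a URL, such as a wrapper around `submit_async_task`.

```python
import asyncio


async def submit_bounded(urls, submit, max_in_flight=5):
    """Run submit(url) for every URL, at most max_in_flight at a time."""
    semaphore = asyncio.Semaphore(max_in_flight)

    async def guarded(url):
        # Wait for a free slot before starting this submission
        async with semaphore:
            return await submit(url)

    # gather returns results in the same order as the input URLs
    return await asyncio.gather(*(guarded(url) for url in urls))
```

Setting `max_in_flight` to your account's limit keeps the batch within capacity while still overlapping the network waits.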

4. Queue Requests Locally

Implement a request queue to respect your concurrency limit:

from queue import Queue
from threading import BoundedSemaphore, Thread

import requests

class ScraperQueue:
    def __init__(self, api_key, max_concurrent=5):
        self.api_key = api_key
        self.queue = Queue()
        # A semaphore caps in-flight requests without the race condition
        # of checking and incrementing a plain counter from several threads
        self.slots = BoundedSemaphore(max_concurrent)
        self.results = []

    def scrape(self, url):
        """Add URL to queue"""
        self.queue.put(url)

    def process_response(self, response):
        """Store the response; replace with your own handling"""
        self.results.append(response)

    def worker(self):
        """Worker thread that processes queue"""
        while True:
            url = self.queue.get()
            try:
                with self.slots:  # Blocks while at capacity
                    response = requests.get(
                        "https://scrape.evomi.com/api/v1/scraper/realtime",
                        params={"url": url, "api_key": self.api_key},
                    )
                self.process_response(response)
            except requests.RequestException as exc:
                # Keep the worker alive if a single request fails
                print(f"Request failed for {url}: {exc}")
            finally:
                self.queue.task_done()

    def start(self, num_workers=5):
        """Start worker threads"""
        for _ in range(num_workers):
            t = Thread(target=self.worker, daemon=True)
            t.start()

# Usage (api_key and urls come from your application)
scraper = ScraperQueue(api_key, max_concurrent=5)
scraper.start(num_workers=5)

# Add URLs
for url in urls:
    scraper.scrape(url)

# Wait for completion
scraper.queue.join()

5. Pace Your Requests

Even within your concurrency limit, avoid bursting:

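import time

import requests

def scrape(url, api_key):
    """Fetch one URL through the realtime endpoint"""
    return requests.get(
        "https://scrape.evomi.com/api/v1/scraper/realtime",
        params={"url": url, "api_key": api_key},
    )

def scrape_with_pacing(urls, api_key, delay=0.5):
    """Scrape URLs with minimum delay between requests"""
    results = []

    for url in urls:
        response = scrape(url, api_key)
        results.append(response)
        time.sleep(delay)  # 500ms between requests

    return results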

Timeout Management

Different modes have different timeout limits:

Mode      Timeout         What Happens After
request   30 seconds      Auto-converts to async task
browser   45 seconds      Auto-converts to async task
auto      30-45 seconds   Depends on mode used
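
Because the server converts a slow request into an async task at these thresholds, a client-side timeout shorter than the server's would abort the request before that 202 response arrives. One way to avoid this is to derive the client timeout from the table plus a safety margin. This is a sketch: the function name and the 15-second margin are assumptions, and `auto` is treated as its worst case of 45 seconds.

```python
# Server-side timeouts in seconds, taken from the table above
SERVER_TIMEOUTS = {"request": 30, "browser": 45, "auto": 45}


def client_timeout(mode="auto", margin=15):
    """Return a client timeout that outlasts the server-side limit."""
    if mode not in SERVER_TIMEOUTS:
        raise ValueError(f"Unknown mode: {mode!r}")
    return SERVER_TIMEOUTS[mode] + margin
```

The result can be passed as the `timeout` argument of `requests.get` or `requests.post`.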

Handling Timeouts

When a synchronous request times out, you receive a 202 Accepted response with a task ID:

{
  "success": true,
  "task_id": "task_abc123",
  "status": "processing",
  "message": "Task is taking longer than expected. Use task_id to check status.",
  "check_url": "/api/v1/scraper/tasks/task_abc123"
}

Check Status:

curl "https://scrape.evomi.com/api/v1/scraper/tasks/task_abc123?api_key=YOUR_API_KEY"
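
The `check_url` in the 202 body is a relative path, so a client has to join it with the API host before polling. A minimal sketch follows; `status_url` is a hypothetical helper, and the fallback path built from `task_id` simply follows the pattern shown in the example response.

```python
from urllib.parse import urljoin

BASE_URL = "https://scrape.evomi.com"


def status_url(body, base_url=BASE_URL):
    """Return the absolute status URL for a 202 body, or None if no task."""
    check_url = body.get("check_url")
    if not check_url and body.get("task_id"):
        # Fall back to the path pattern shown in the example response
        check_url = f"/api/v1/scraper/tasks/{body['task_id']}"
    return urljoin(base_url, check_url) if check_url else None
```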

Async Processing

For long-running or batch jobs, use async mode from the start:

Submit Async Request

curl -X POST "https://scrape.evomi.com/api/v1/scraper/realtime" \
  -H "x-api-key: YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "url": "https://example.com",
    "async": true
  }'

Response:

{
  "task_id": "task_abc123",
  "status": "processing",
  "check_url": "/api/v1/scraper/tasks/task_abc123"
}

Poll for Results

import time

import requests

def wait_for_task(task_id, api_key, max_wait=120):
    """Poll task status until completion"""
    url = f"https://scrape.evomi.com/api/v1/scraper/tasks/{task_id}"
    headers = {"x-api-key": api_key}
    
    start_time = time.time()
    
    while time.time() - start_time < max_wait:
        response = requests.get(url, headers=headers)
        data = response.json()
        
        status = data.get("status")
        
        if status == "completed":
            return data["result"]
        elif status == "failed":
            raise Exception(f"Task failed: {data.get('error')}")
        
        # Still processing
        time.sleep(2)  # Poll every 2 seconds
    
    raise TimeoutError("Task did not complete in time")

Note: Task results are stored in Redis and expire after 10 minutes. Results are automatically deleted after successful retrieval.
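
Since a result is deleted once it has been retrieved, a second fetch of the same task would presumably find nothing. Keeping a local copy after the first retrieval avoids that. The class below is a sketch with hypothetical names; `fetch` is any callable that takes a task ID and returns its result, for example a wrapper around `wait_for_task`.

```python
class TaskResultCache:
    """Remember each task result locally after its first retrieval."""

    def __init__(self, fetch):
        self._fetch = fetch  # callable: task_id -> result
        self._results = {}

    def get(self, task_id):
        # Only the first call reaches the API; later calls use the local copy
        if task_id not in self._results:
            self._results[task_id] = self._fetch(task_id)
        return self._results[task_id]
```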

Optimizing for Scale

Strategy 1: Use Datacenter Proxies

Datacenter proxies are faster, reducing request duration and freeing up concurrency slots quicker:

{
  "proxy_type": "datacenter"
}

Strategy 2: Block Unnecessary Resources

Speed up requests by blocking images, stylesheets, and fonts:

{
  "block_resources": ["image", "stylesheet", "font", "media"]
}

Strategy 3: Use Request Mode When Possible

If JavaScript isn’t needed, use request mode for 6× faster execution:

{
  "mode": "request"
}
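
The first three strategies might be combined in one request body along these lines. This is a sketch under assumptions: the text shows each option separately and does not confirm that they can be sent together, and `build_payload` is a hypothetical helper. Resource blocking is only added for browser mode here, on the assumption that request mode does not load page resources at all.

```python
def build_payload(url, needs_javascript=False):
    """Build a request body that favors short request durations."""
    payload = {"url": url, "proxy_type": "datacenter"}  # Strategy 1
    if needs_javascript:
        payload["mode"] = "browser"
        # Strategy 2: skip resources the scrape does not need
        payload["block_resources"] = ["image", "stylesheet", "font", "media"]
    else:
        payload["mode"] = "request"  # Strategy 3
    return payload
```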

Strategy 4: Batch with Async

Submit large batches with async=true, then poll results:

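# Submit 100 tasks
# (submit_async is your own helper: it POSTs {"url": ..., "async": true}
# and returns the parsed JSON response)
task_ids = []
for url in urls[:100]:
    response = submit_async(url, api_key)
    task_ids.append(response['task_id'])

# Poll all tasks, one after another, using wait_for_task from Poll for Results
results = [wait_for_task(tid, api_key) for tid in task_ids]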

Error Recovery

Always implement proper error handling:

import time

import requests

def scrape_with_recovery(url, api_key, max_retries=3):
    for attempt in range(max_retries):
        try:
            response = requests.get(
                "https://scrape.evomi.com/api/v1/scraper/realtime",
                params={"url": url},
                headers={"x-api-key": api_key},
                timeout=60
            )
        except requests.exceptions.Timeout:
            print("Request timed out")
            return None

        if response.status_code == 200:
            return response
        elif response.status_code == 202:
            # Server-side timeout: the request became an async task, so poll it
            # (wait_for_task is defined under Poll for Results)
            return wait_for_task(response.json()['task_id'], api_key)
        elif response.status_code == 429:
            # Rate limited - wait for the reset time, doubling on each retry
            if attempt < max_retries - 1:
                time.sleep(response.json().get("reset", 60) * 2 ** attempt)
        else:
            # Other error
            print(f"Error: {response.status_code}")
            return None

    print("Max retries exceeded")
    return None

Warning: Don’t hammer the API with retries. Always implement exponential backoff and respect the reset time in rate limit responses.
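
One way to combine both pieces of advice is to treat the reset time as the minimum wait and double it on each further retry, up to a ceiling. The schedule below is a sketch: the function name, the 300-second cap, and the optional jitter are assumptions, not values from the API.

```python
import random


def retry_delays(reset=60.0, retries=3, cap=300.0, jitter=0.0):
    """Return the wait in seconds before each retry.

    The first wait equals `reset`; each later wait doubles, up to `cap`.
    `jitter` adds up to that fraction of extra random delay, so that
    clients limited at the same moment do not all retry together.
    """
    delays = []
    for attempt in range(retries):
        delay = min(cap, reset * 2 ** attempt)
        delay += random.uniform(0, jitter * delay)
        delays.append(delay)
    return delays
```

With the defaults and no jitter, three retries wait 60, 120, and 240 seconds.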