
Introduction

The pro_request() function is the core data retrieval component of the openalexPro workflow. It downloads data from the OpenAlex API and saves it as JSON files to disk, enabling processing of datasets too large to fit in memory.

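This vignette is a guide to pro_request() and related functions. It covers authentication, basic usage, the internal architecture, the output file structure, the complete data pipeline through pro_request_jsonl() and pro_request_jsonl_parquet(), error handling, and best practices.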

Authentication

OpenAlex offers limited API access without credentials. Free API keys with substantially higher rate limits can be obtained from the OpenAlex website. Premium API access with even higher limits can also be purchased.

openalexPro uses environment variables for credentials (recommended):

# Set credentials (typically in your .Renviron file)
Sys.setenv(openalexPro.apikey = "your-api-key-here")

# Validate your credentials
pro_validate_credentials()

You can also pass credentials directly to functions:

pro_request(
  query_url = url,
  output = "data/json",
  api_key = "your-api-key"
)

To force unauthenticated mode for a call, pass api_key = NULL (or ""):

pro_request(
  query_url = url,
  output = "data/json",
  api_key = NULL
)
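Before starting a long download, it can help to confirm that a key is actually visible to the R session. The following is a minimal sketch in base R; has_api_key() is a hypothetical helper and not part of openalexPro, and it only checks that the environment variable is non-empty, not that the key is valid (use pro_validate_credentials() for that).

```r
# Hypothetical helper (not part of openalexPro): TRUE if the named
# environment variable holds a non-empty value.
has_api_key <- function(var = "openalexPro.apikey") {
  nzchar(Sys.getenv(var, unset = ""))
}

# Demonstration with a throwaway variable so a real key is never touched
Sys.setenv(DEMO_OPENALEX_KEY = "your-api-key-here")
has_api_key("DEMO_OPENALEX_KEY") # TRUE
Sys.unsetenv("DEMO_OPENALEX_KEY")
has_api_key("DEMO_OPENALEX_KEY") # FALSE
```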

Basic Usage

Simple Download

The most basic usage requires a query URL (from pro_query()) and an output directory:

library(openalexPro)

# Build query
url <- pro_query(
  entity = "works",
  search = "climate change",
  from_publication_date = "2023-01-01"
)

# Download data
pro_request(
  query_url = url,
  output = "data/json"
)

Get Count Only

To check how many results a query returns without downloading:

# Get count for a single query
count <- pro_request(
  query_url = url,
  count_only = TRUE
)
count$count
# [1] 45632

# Or use pro_count() directly
meta <- pro_count(query_url = url)
meta
#   count db_response_time_ms page per_page error
# 1 45632                  42    1        1    NA

Download with Progress Bar

Progress bars are enabled by default:

pro_request(
  query_url = url,
  output = "data/json",
  progress = TRUE # Default
)
# ℹ Fetching query counts...
# ℹ Total pages to download: 229
# Downloading ████████████████░░░░░░░░░░░░░░░░ 120/229 [1m 23s]

Limiting Pages

By default, pro_request() downloads up to 10,000 pages (2 million records at 200 per page). You can adjust this:

# Download only first 100 pages (20,000 records)
pro_request(
  query_url = url,
  output = "data/json",
  pages = 100
)

# Download all pages (use with caution for large queries!)
pro_request(
  query_url = url,
  output = "data/json",
  pages = NULL
)
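To choose a sensible pages value, the count from a count-only request can be converted into a page estimate. This is a small arithmetic sketch that assumes the 200 records per page mentioned above; pages_needed() and max_records() are hypothetical helper names.

```r
# Hypothetical helpers: convert between record counts and page counts,
# assuming 200 records per page.
pages_needed <- function(count, per_page = 200) ceiling(count / per_page)
max_records <- function(pages, per_page = 200) pages * per_page

pages_needed(45632) # 229 pages for the count shown earlier
max_records(100)    # 20000 records for pages = 100
max_records(10000)  # 2e+06 records for the default limit
```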

Handling Chunked Queries

When pro_query() returns a list of URLs (due to automatic chunking of large ID lists), pro_request() accepts the list directly and writes each chunk to its own subdirectory:

# Query with many DOIs triggers automatic chunking
dois <- paste0("10.1234/example", 1:150)
urls <- pro_query(entity = "works", doi = dois)

length(urls) # 3 chunks
names(urls) # "chunk_1", "chunk_2", "chunk_3"

# pro_request handles the list automatically
pro_request(
  query_url = urls,
  output = "data/json"
)
# Creates: data/json/chunk_1/, data/json/chunk_2/, data/json/chunk_3/
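The chunking idea itself can be sketched in base R. This is an illustration only, not the pro_query() implementation: chunk_ids() is a hypothetical helper, and the chunk size of 50 is an assumption inferred from the example above (150 DOIs producing 3 chunks).

```r
# Hypothetical helper: split a vector of IDs into named chunks.
# chunk_size = 50 is an assumption, not a documented package default.
chunk_ids <- function(ids, chunk_size = 50) {
  groups <- ceiling(seq_along(ids) / chunk_size)
  chunks <- split(ids, groups)
  names(chunks) <- paste0("chunk_", seq_along(chunks))
  chunks
}

dois <- paste0("10.1234/example", 1:150)
chunks <- chunk_ids(dois)
length(chunks)  # 3
names(chunks)   # "chunk_1" "chunk_2" "chunk_3"
lengths(chunks) # 50 IDs in each chunk
```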

Parallel Downloads

For chunked queries, enable parallel processing:

pro_request(
  query_url = urls,
  output = "data/json",
  workers = 4 # Use 4 parallel workers
)

Function Parameters Reference

| Parameter | Type | Default | Description |
|---|---|---|---|
| query_url | character/list | required | URL or list of URLs from pro_query() |
| pages | integer/NULL | 10000 | Max pages to download. NULL = all pages |
| output | character | required unless count_only = TRUE | Directory for JSON output files |
| overwrite | logical | FALSE | Delete existing output directory if it exists |
| api_key | character/NULL | env var | Optional API key. If NULL or "", requests are sent without a key |
| workers | integer | 1 | Parallel workers for list queries |
| verbose | logical | FALSE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| count_only | logical | FALSE | Return count without downloading |
| error_log | character/NULL | NULL | Path for error log file |

pro_request() Architecture

High-Level Flow

flowchart TD
    Start([pro_request called]) --> CheckList{query_url<br/>is list?}

    CheckList -->|Yes| ListMethod[LIST METHOD:<br/>Multiple queries]
    CheckList -->|No| ScalarMethod[SCALAR METHOD:<br/>Single query]

    %% List Method Branch
    ListMethod --> SetupFutures[Setup future plan<br/>based on workers]
    SetupFutures --> CountOnly1{count_only?}

    CountOnly1 -->|Yes| ParallelCount["Parallel pro_count()<br/>for each URL"]
    ParallelCount --> ReturnCounts([Return named vector])

    CountOnly1 -->|No| FetchCounts[Fetch counts for<br/>progress bar]
    FetchCounts --> CalcPages[Calculate total<br/>pages to download]
    CalcPages --> ParallelDownload[future_lapply:<br/>fetch_query_pages<br/>for each URL]
    ParallelDownload --> ReturnOutput([Return output path])

    %% Scalar Method Branch
    ScalarMethod --> CountOnly2{count_only?}
    CountOnly2 -->|Yes| SingleCount[pro_count]
    SingleCount --> ReturnMeta([Return metadata])

    CountOnly2 -->|No| FetchPages[fetch_query_pages<br/>with CLI progress]
    FetchPages --> ReturnPath([Return output path])

    style Start fill:#e1f5e1
    style ReturnCounts fill:#e1f5e1
    style ReturnOutput fill:#e1f5e1
    style ReturnMeta fill:#e1f5e1
    style ReturnPath fill:#e1f5e1
    style ListMethod fill:#cce5ff
    style ScalarMethod fill:#fff3cd

fetch_query_pages() Internal Helper

The fetch_query_pages() function handles the actual page-by-page download:

flowchart TD
    Start([fetch_query_pages called]) --> CheckOutput{output<br/>exists?}

    CheckOutput -->|Yes, overwrite=FALSE| ErrorExists[Stop: Directory exists]
    CheckOutput -->|Yes, overwrite=TRUE| DeleteDir[Delete existing<br/>directory]
    CheckOutput -->|No| CreateDir[Create output<br/>directory]

    DeleteDir --> CreateDir
    CreateDir --> CreateProgress[Create<br/>00_in.progress file]
    CreateProgress --> BuildRequest[Build httr2 request<br/>api_key optional]

    BuildRequest --> FirstRequest[First API call<br/>to inspect meta]
    FirstRequest --> CheckSingle{Single record<br/>response?}

    CheckSingle -->|Yes| SaveSingle[Save single_1.json]
    SaveSingle --> Success1([Success: Return path])

    CheckSingle -->|No| CalcMaxPages[Calculate max_pages<br/>from meta]
    CalcMaxPages --> StartProgress[Start CLI<br/>progress bar]
    StartProgress --> SaveFirst[Save first page:<br/>results_page_1.json]

    SaveFirst --> PageLoop{More pages?<br/>next_cursor exists?}

    PageLoop -->|Yes| CheckLimit{page > pages<br/>limit?}
    CheckLimit -->|Yes| Done[Exit loop]
    CheckLimit -->|No| NextRequest[Request next page<br/>with cursor]
    NextRequest --> SavePage[Save<br/>results_page_N.json]
    SavePage --> UpdateProgress[Update<br/>progress bar]
    UpdateProgress --> PageLoop

    PageLoop -->|No| Done
    Done --> DeleteProgressFile[Delete<br/>00_in.progress]
    DeleteProgressFile --> Success2([Success: Return path])

    style Start fill:#e1f5e1
    style Success1 fill:#e1f5e1
    style Success2 fill:#e1f5e1
    style ErrorExists fill:#f8d7da
    style PageLoop fill:#fff3cd

api_call() Retry Logic

All HTTP requests go through api_call(), which retries transient failures with exponential backoff, up to 10 attempts:

flowchart TD
    Start([api_call]) --> ConfigRetry[Configure retry:<br/>max 10 attempts<br/>exponential backoff]

    ConfigRetry --> Perform[Perform request]
    Perform --> CheckStatus{HTTP Status?}

    CheckStatus -->|200 OK| ReturnSuccess([Return response])

    CheckStatus -->|429, 500-504| Transient[Transient error:<br/>wait and retry]
    Transient --> RetryCount{Retries<br/>exhausted?}
    RetryCount -->|No| Perform
    RetryCount -->|Yes| LogError[Log error]
    LogError --> Abort1([Abort with error])

    CheckStatus -->|400| Check400[Check if HTML<br/>response requested]
    Check400 -->|Yes + HTML| Return400([Return for inspection])
    Check400 -->|No| Abort2([Abort with error])

    CheckStatus -->|Other 4xx/5xx| LogOther[Log unexpected status]
    LogOther --> Abort3([Abort with error])

    style Start fill:#e1f5e1
    style ReturnSuccess fill:#e1f5e1
    style Return400 fill:#fff3cd
    style Abort1 fill:#f8d7da
    style Abort2 fill:#f8d7da
    style Abort3 fill:#f8d7da
    style Transient fill:#cce5ff

Transient Error Handling

The following HTTP status codes are considered transient and trigger automatic retry:

| Status | Meaning | Action |
|---|---|---|
| 429 | Too Many Requests | Wait with exponential backoff |
| 500 | Internal Server Error | Retry up to 10 times |
| 502 | Bad Gateway | Retry up to 10 times |
| 503 | Service Unavailable | Retry up to 10 times |
| 504 | Gateway Timeout | Retry up to 10 times |
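The retry policy in the table can be sketched as follows. This is not the api_call() source: is_transient_status() and backoff_seconds() are hypothetical helpers, and the 1-second base delay and 60-second cap are assumptions chosen for illustration. The final part shows how such helpers could be attached to an httr2 request with httr2::req_retry(); it only builds the request and does not send it.

```r
# Status codes listed as transient in the table above
transient_statuses <- c(429L, 500L, 502L, 503L, 504L)

# Hypothetical helper: should this HTTP status trigger a retry?
is_transient_status <- function(status) status %in% transient_statuses

# Hypothetical helper: exponential backoff with a cap.
# base = 1 second and cap = 60 seconds are assumptions.
backoff_seconds <- function(attempt, base = 1, cap = 60) {
  min(cap, base * 2^(attempt - 1))
}

# Wait times before retries 1 to 10
waits <- vapply(1:10, backoff_seconds, numeric(1))
waits # 1 2 4 8 16 32 60 60 60 60

# Attaching the same policy to an httr2 request (built, not performed)
if (requireNamespace("httr2", quietly = TRUE)) {
  req <- httr2::request("https://api.openalex.org/works")
  req <- httr2::req_retry(
    req,
    max_tries = 10,
    is_transient = function(resp) is_transient_status(httr2::resp_status(resp)),
    backoff = backoff_seconds
  )
}
```

Capping the delay keeps a long outage from pushing individual waits into many minutes while still spacing out requests after a 429.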

Output File Structure

After a successful download, the output directory contains:

data/json/
├── results_page_1.json
├── results_page_2.json
├── results_page_3.json
└── ...

For chunked queries (list of URLs):

data/json/
├── chunk_1/
│   ├── results_page_1.json
│   ├── results_page_2.json
│   └── ...
├── chunk_2/
│   ├── results_page_1.json
│   └── ...
└── chunk_3/
    └── ...
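When inspecting the output directory by hand, note that a plain alphabetical listing places results_page_10.json before results_page_2.json. The sketch below lists page files in numeric page order within each directory; list_result_pages() is a hypothetical helper, and the demonstration uses empty placeholder files in a temporary directory.

```r
# Hypothetical helper: list results_page_N.json files (recursively),
# ordered by directory and then by numeric page number.
list_result_pages <- function(output_dir) {
  files <- list.files(
    output_dir,
    pattern = "^results_page_[0-9]+\\.json$",
    recursive = TRUE,
    full.names = TRUE
  )
  page <- as.integer(sub("^results_page_([0-9]+)\\.json$", "\\1", basename(files)))
  files[order(dirname(files), page)]
}

# Demonstration with empty placeholder files
demo_dir <- file.path(tempdir(), "json_demo")
dir.create(demo_dir, showWarnings = FALSE)
invisible(file.create(file.path(demo_dir, paste0("results_page_", c(10, 2, 1), ".json"))))
page_files <- basename(list_result_pages(demo_dir))
page_files
```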

Progress Tracking

During download, a 00_in.progress file exists in the output directory. This file is automatically deleted upon successful completion. If present, it indicates:

  • Download is still in progress, OR
  • Download was interrupted and is incomplete
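A script can use this marker to decide whether an output directory is safe to process. The following is a minimal sketch; download_complete() is a hypothetical helper, and it searches subdirectories as well, on the assumption that chunked downloads place the marker inside each chunk directory.

```r
# Hypothetical helper: TRUE if the directory exists and contains no
# 00_in.progress marker anywhere below it.
download_complete <- function(output_dir) {
  if (!dir.exists(output_dir)) {
    return(FALSE)
  }
  markers <- list.files(output_dir, pattern = "^00_in\\.progress$", recursive = TRUE)
  length(markers) == 0
}

# Demonstration in a temporary directory
demo_out <- file.path(tempdir(), "progress_demo")
dir.create(demo_out, showWarnings = FALSE)
marker <- file.path(demo_out, "00_in.progress")

invisible(file.create(marker))
during <- download_complete(demo_out) # FALSE while the marker exists
invisible(file.remove(marker))
after <- download_complete(demo_out)  # TRUE once the marker is gone
```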

The Complete Data Pipeline

Pipeline Overview

flowchart LR
    subgraph Step1[Step 1: Query]
        PQ[pro_query]
    end

    subgraph Step2[Step 2: Download]
        PR[pro_request]
    end

    subgraph Step3[Step 3: Transform]
        PRJL[pro_request_jsonl]
    end

    subgraph Step4[Step 4: Convert]
        PRJLP[pro_request_jsonl_parquet]
    end

    subgraph Step5[Step 5: Analyze]
        DB[(DuckDB)]
    end

    PQ -->|URL/URLs| PR
    PR -->|JSON files| PRJL
    PRJL -->|JSONL files| PRJLP
    PRJLP -->|Parquet dataset| DB

    style Step1 fill:#e1f5e1
    style Step2 fill:#cce5ff
    style Step3 fill:#fff3cd
    style Step4 fill:#f8d7da
    style Step5 fill:#e1f5e1

Complete Example

library(openalexPro)

# Step 1: Build query
urls <- pro_query(
  entity = "works",
  search = "machine learning healthcare",
  from_publication_date = "2020-01-01",
  type = "article",
  select = c(
    "ids", "title", "abstract", "publication_year", "authorships",
    "cited_by_count" # needed by the DuckDB query in Step 5
  )
)

# Step 2: Download JSON from API
pro_request(
  query_url = urls,
  output = "data/json",
  pages = 1000,
  progress = TRUE
)

# Step 3: Convert to JSONL (with abstract reconstruction)
pro_request_jsonl(
  input_json = "data/json",
  output = "data/jsonl",
  progress = TRUE,
  workers = 4
)

# Step 4: Convert to Parquet (with schema harmonization)
pro_request_jsonl_parquet(
  input_jsonl = "data/jsonl",
  output = "data/parquet",
  progress = TRUE
)

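# Step 5: Query with DuckDB
library(duckdb) # also attaches DBI, which provides dbConnect() and dbGetQuery()
con <- dbConnect(duckdb())
dbGetQuery(
  con,
  "
  SELECT title, publication_year, cited_by_count
  FROM read_parquet('data/parquet/**/*.parquet')
  ORDER BY cited_by_count DESC
  LIMIT 10
  "
)
dbDisconnect(con, shutdown = TRUE) # release the in-memory database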

pro_request_jsonl()

Purpose

Converts the raw JSON files from OpenAlex into JSON Lines (JSONL) format, which is more efficient for processing with DuckDB. During conversion, it also:

  1. Reconstructs abstracts from the abstract_inverted_index field
  2. Generates citation strings (e.g., “Smith et al. (2023)”)
  3. Adds a page field for tracking data provenance
  4. Optionally adds custom columns

Function Signature

pro_request_jsonl(
  input_json = NULL,
  output = NULL,
  add_columns = list(),
  overwrite = FALSE,
  verbose = TRUE,
  progress = TRUE,
  delete_input = FALSE,
  workers = 1
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| input_json | character | required | Directory of JSON files from pro_request() |
| output | character | required | Output directory for JSONL files |
| add_columns | list | list() | Named list of additional columns to add |
| overwrite | logical | FALSE | Overwrite existing output |
| verbose | logical | TRUE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| delete_input | logical | FALSE | Delete input JSON after conversion |
| workers | integer | 1 | Number of parallel workers |

Internal Flow

flowchart TD
    Start([pro_request_jsonl]) --> ValidateInputs[Validate inputs]
    ValidateInputs --> CheckOverwrite{Output exists?}

    CheckOverwrite -->|Yes, overwrite=FALSE| ErrorExists[Stop: Output exists]
    CheckOverwrite -->|Yes, overwrite=TRUE| DeleteOutput[Delete output dir]
    CheckOverwrite -->|No| CreateOutput[Create output dir]

    DeleteOutput --> CreateOutput
    CreateOutput --> CreateProgress[Create 00_in.progress]
    CreateProgress --> ListJSONs[List all JSON files<br/>recursively]

    ListJSONs --> DetectType[Detect type:<br/>results, group_by, single]
    DetectType --> SetupFutures[Setup parallel plan<br/>if workers > 1]

    SetupFutures --> ProcessLoop[future_lapply over<br/>JSON files]

    ProcessLoop --> ForEachJSON[For each JSON file]
    ForEachJSON --> ExtractPage[Extract page number<br/>from filename]
    ExtractPage --> CallJQ[jq_execute:<br/>transform JSON]
    CallJQ --> CheckSize{Output > 5 bytes?}
    CheckSize -->|No| DeleteEmpty[Delete empty file]
    CheckSize -->|Yes| UpdateProgress[Update progress]
    DeleteEmpty --> UpdateProgress
    UpdateProgress --> MoreFiles{More files?}
    MoreFiles -->|Yes| ForEachJSON
    MoreFiles -->|No| Cleanup[Delete input if<br/>delete_input=TRUE]

    Cleanup --> Success([Return output path])

    style Start fill:#e1f5e1
    style Success fill:#e1f5e1
    style ErrorExists fill:#f8d7da
    style ProcessLoop fill:#cce5ff

jq_execute() Transformation

The jq_execute() function performs the actual JSON transformation using the jqr package:

flowchart TD
    Start([jq_execute]) --> SelectRoot{Type?}

    SelectRoot -->|results| ExtractResults[".results[] |"]
    SelectRoot -->|group_by| ExtractGroups[".group_by[] |"]
    SelectRoot -->|single| NoExtract[Direct processing]

    ExtractResults --> Transform
    ExtractGroups --> Transform
    NoExtract --> Transform

    Transform[Apply jq filter] --> AbstractCheck{Has abstract_<br/>inverted_index?}

    AbstractCheck -->|Yes| ReconstructAbstract[Reconstruct abstract<br/>from inverted index]
    AbstractCheck -->|No| SkipAbstract[Keep as-is]

    ReconstructAbstract --> GenerateCitation
    SkipAbstract --> GenerateCitation

    GenerateCitation[Generate citation string<br/>based on author count]
    GenerateCitation --> AddColumns{Custom columns?}

    AddColumns -->|Yes| InsertColumns[Add custom columns]
    AddColumns -->|No| SkipColumns[Skip]

    InsertColumns --> DeleteInverted
    SkipColumns --> DeleteInverted

    DeleteInverted[Delete abstract_inverted_index]
    DeleteInverted --> AddPage{Page specified?}

    AddPage -->|Yes| InsertPage[Add page field]
    AddPage -->|No| SkipPage[Skip]

    InsertPage --> WriteJSONL[Write to JSONL file]
    SkipPage --> WriteJSONL

    WriteJSONL --> Done([Return output path])

    style Start fill:#e1f5e1
    style Done fill:#e1f5e1
    style Transform fill:#cce5ff
    style GenerateCitation fill:#fff3cd

Abstract Reconstruction

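OpenAlex does not return abstracts as plain text. Instead, it provides an inverted index that maps each word to the positions where it occurs. The transformation reconstructs readable text by placing every word back at its positions: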

Input (inverted index):

{
  "abstract_inverted_index": {
    "Climate": [0],
    "change": [1, 5],
    "affects": [2],
    "biodiversity": [3],
    "and": [4]
  }
}

Output (reconstructed):

{
  "abstract": "Climate change affects biodiversity and change"
}
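The same reconstruction can be sketched in base R. The package performs this step with a jq filter, so the code below is only an illustration of the logic; reconstruct_abstract() is a hypothetical helper.

```r
# Hypothetical helper: rebuild text from an inverted index given as a
# named list (word -> vector of zero-based positions).
reconstruct_abstract <- function(inverted_index) {
  if (length(inverted_index) == 0) {
    return(NA_character_)
  }
  # Repeat each word once per position, then sort words by position
  words <- rep(names(inverted_index), lengths(inverted_index))
  positions <- unlist(inverted_index, use.names = FALSE)
  paste(words[order(positions)], collapse = " ")
}

index <- list(
  Climate = 0,
  change = c(1, 5),
  affects = 2,
  biodiversity = 3,
  and = 4
)
abstract <- reconstruct_abstract(index)
abstract # "Climate change affects biodiversity and change"
```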

Citation String Generation

The function generates citation strings based on author count:

| Authors | Citation Format |
|---|---|
| 1 | “Smith (2023)” |
| 2 | “Smith & Jones (2023)” |
| 3+ | “Smith et al. (2023)” |
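The rule in the table can be written as a short R function. This is a sketch of the rule only, not the jq code used by the package; citation_string() is a hypothetical helper that takes author last names and a publication year, and it assumes at least one author.

```r
# Hypothetical helper: build a citation string from last names and a year.
citation_string <- function(last_names, year) {
  n <- length(last_names)
  stopifnot(n >= 1)
  authors <- if (n == 1) {
    last_names[1]
  } else if (n == 2) {
    paste(last_names, collapse = " & ")
  } else {
    paste(last_names[1], "et al.")
  }
  sprintf("%s (%s)", authors, year)
}

citation_string("Smith", 2023)                     # "Smith (2023)"
citation_string(c("Smith", "Jones"), 2023)         # "Smith & Jones (2023)"
citation_string(c("Smith", "Jones", "Lee"), 2023)  # "Smith et al. (2023)"
```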

pro_request_jsonl_parquet()

Purpose

Converts JSONL files to Apache Parquet format for efficient analytical queries. Key features:

  1. Automatic schema harmonization across all files
  2. Partitioning by page for efficient filtering
  3. SNAPPY compression for good compression/speed balance
  4. Streaming conversion - no memory limit on total dataset size

Function Signature

pro_request_jsonl_parquet(
  input_jsonl = NULL,
  output = NULL,
  overwrite = FALSE,
  verbose = TRUE,
  progress = TRUE,
  delete_input = FALSE,
  sample_size = 1000
)

Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| input_jsonl | character | required | Directory of JSONL files |
| output | character | required | Output directory for Parquet dataset |
| overwrite | logical | FALSE | Overwrite existing output |
| verbose | logical | TRUE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| delete_input | logical | FALSE | Delete input JSONL after conversion |
| sample_size | integer | 1000 | Records to sample for schema inference |

Internal Flow

flowchart TD
    Start([pro_request_jsonl_parquet]) --> ValidateInputs[Validate inputs]
    ValidateInputs --> SetupDuckDB[Create in-memory<br/>DuckDB connection]
    SetupDuckDB --> LoadJSON["INSTALL json;<br/>LOAD json;"]

    LoadJSON --> CheckOverwrite{Output exists?}
    CheckOverwrite -->|Yes, overwrite=FALSE| ErrorExists[Stop: Output exists]
    CheckOverwrite -->|Yes, overwrite=TRUE| DeleteOutput[Delete output]
    CheckOverwrite -->|No| CreateOutput[Create output dir]

    DeleteOutput --> CreateOutput
    CreateOutput --> CreateProgress[Create 00_in.progress]
    CreateProgress --> ListJSONL[List all JSONL files]

    ListJSONL --> InferSchema[Infer unified schema<br/>from all files]

    InferSchema --> SchemaSQL["DESCRIBE SELECT *<br/>FROM read_json_auto<br/>(union_by_name=true)"]
    SchemaSQL --> SchemaSuccess{Schema<br/>inferred?}

    SchemaSuccess -->|Yes| BuildColumns[Build columns clause<br/>for explicit schema]
    SchemaSuccess -->|No| FallbackAuto[Fall back to<br/>per-file auto-detect]

    BuildColumns --> ConvertLoop
    FallbackAuto --> ConvertLoop

    ConvertLoop[For each JSONL file]
    ConvertLoop --> CopySQL["COPY (...) TO output<br/>(FORMAT PARQUET,<br/>PARTITION_BY 'page',<br/>APPEND)"]
    CopySQL --> UpdateProgress[Update progress]
    UpdateProgress --> MoreFiles{More files?}
    MoreFiles -->|Yes| ConvertLoop
    MoreFiles -->|No| Cleanup

    Cleanup[Delete input if requested]
    Cleanup --> CloseDB[Close DuckDB connection]
    CloseDB --> Success([Return output path])

    style Start fill:#e1f5e1
    style Success fill:#e1f5e1
    style ErrorExists fill:#f8d7da
    style InferSchema fill:#cce5ff
    style ConvertLoop fill:#fff3cd

Schema Harmonization

Different OpenAlex records can have different field structures. For example:

  • Some works have primary_location as a struct, others as a string
  • Some have nested arrays, others have null values

The function samples records from all files to infer a unified schema that accommodates all variations:

flowchart LR
    subgraph Input[Input JSONL Files]
        F1[File 1:<br/>location: struct]
        F2[File 2:<br/>location: string]
        F3[File 3:<br/>location: null]
    end

    subgraph Schema[Schema Inference]
        Sample[Sample from<br/>all files]
        Union[union_by_name=true]
        Describe[DESCRIBE query]
    end

    subgraph Output[Unified Schema]
        Unified["location: struct<br/>(accommodates all)"]
    end

    F1 --> Sample
    F2 --> Sample
    F3 --> Sample
    Sample --> Union
    Union --> Describe
    Describe --> Unified

    style Input fill:#f8d7da
    style Schema fill:#cce5ff
    style Output fill:#e1f5e1
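The schema inference query from the flowchart can be assembled as a plain SQL string. The sketch below only builds and prints the statement; schema_query() is a hypothetical helper, the file glob is an example path, and passing sample_size through to read_json_auto() is an assumption about how the package uses that argument. The resulting string could be run with DBI::dbGetQuery() on a DuckDB connection that has the json extension loaded.

```r
# Hypothetical helper: build the DESCRIBE statement used to infer a
# unified schema across JSONL files. Paths containing single quotes are
# not handled in this sketch.
schema_query <- function(jsonl_glob, sample_size = 1000) {
  sprintf(
    paste(
      "DESCRIBE SELECT * FROM read_json_auto('%s',",
      "union_by_name = true, sample_size = %d)"
    ),
    jsonl_glob,
    as.integer(sample_size)
  )
}

sql <- schema_query("data/jsonl/*.jsonl")
cat(sql, "\n")
```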

Output Structure

The Parquet dataset is partitioned by the page field:

data/parquet/
├── page=chunk_1_1/
│   └── data_0.parquet
├── page=chunk_1_2/
│   └── data_0.parquet
├── page=chunk_2_1/
│   └── data_0.parquet
└── ...

This partitioning enables efficient filtering when querying subsets of the data.
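Because each partition is a directory, a subset can also be selected by path before any data is read. The sketch below builds a glob for all pages of one chunk and checks it against placeholder files; partition_glob() is a hypothetical helper, and the page=chunk_<chunk>_<page> naming is assumed from the directory tree shown above. The same glob string could be passed to read_parquet() in a DuckDB query.

```r
# Hypothetical helper: glob matching every Parquet file of one chunk,
# assuming partition directories are named page=chunk_<chunk>_<page>.
partition_glob <- function(parquet_dir, chunk) {
  file.path(parquet_dir, sprintf("page=chunk_%d_*", as.integer(chunk)), "*.parquet")
}

# Demonstration with empty placeholder files in a temporary directory
demo_parquet <- file.path(tempdir(), "parquet_demo")
for (partition in c("page=chunk_1_1", "page=chunk_1_2", "page=chunk_2_1")) {
  dir.create(file.path(demo_parquet, partition), recursive = TRUE, showWarnings = FALSE)
  invisible(file.create(file.path(demo_parquet, partition, "data_0.parquet")))
}

chunk1_files <- Sys.glob(partition_glob(demo_parquet, 1))
length(chunk1_files) # 2: only the two partitions belonging to chunk 1
```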


Error Handling

Common Errors

Output Directory Exists

pro_request(query_url = url, output = "data/json")
# Error: Directory data/json exists.
# Either specify `overwrite = TRUE` or delete it.

# Solution:
pro_request(query_url = url, output = "data/json", overwrite = TRUE)

No Output Specified

pro_request(query_url = url)
# Error: No `output` specified!

API Rate Limiting

If you hit rate limits (HTTP 429), the function automatically retries with exponential backoff. To reduce rate limit issues:

  1. Use an API key (free from OpenAlex)
  2. Reduce parallel workers
  3. Add delays between large queries

You can check your current budget and remaining allowance at any time with:

This queries the /rate-limit endpoint and prints your daily budget, amount used, remaining balance, and seconds until the daily reset.

Request Size Exceeded

OpenAlex has a maximum request size of 4094 characters. If exceeded:

meta <- pro_count(query_url = very_long_url)
meta$count
# [1] -4195  # Negative indicates error

meta$error
# [1] "ERROR: Request size exceeds the maximum limit of 4094."

Solution: Use pro_query() with chunking (it handles this automatically).
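For hand-built URLs, the limit can be checked before a request is sent, and a negative count can be turned into an explicit error. This is a minimal sketch: exceeds_request_limit() and count_or_stop() are hypothetical helpers, the sketch assumes the 4094-character limit applies to the full URL, and the meta object is a mocked stand-in for what pro_count() returns.

```r
# Hypothetical helper: does the URL exceed the request size limit?
# Assumes the limit applies to the length of the full URL.
exceeds_request_limit <- function(url, limit = 4094) nchar(url) > limit

# Hypothetical helper: return the count, or stop if it signals an error
count_or_stop <- function(meta) {
  if (is.na(meta$count) || meta$count < 0) {
    stop("Count query failed: ", meta$error, call. = FALSE)
  }
  meta$count
}

short_url <- "https://api.openalex.org/works?search=climate"
long_url <- paste0(
  "https://api.openalex.org/works?filter=doi:",
  paste(rep("10.1234/example", 400), collapse = "|")
)
exceeds_request_limit(short_url) # FALSE
exceeds_request_limit(long_url)  # TRUE

# Mocked metadata mirroring the error case shown above
bad_meta <- list(
  count = -4195,
  error = "ERROR: Request size exceeds the maximum limit of 4094."
)
result <- tryCatch(count_or_stop(bad_meta), error = function(e) conditionMessage(e))
result
```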

Best Practices

1. Always Use Progress Bars

Progress bars help monitor long downloads and estimate completion:

pro_request(url, output = "data", progress = TRUE)

2. Use Appropriate Page Limits

Start with smaller page limits during development:

# Development: small sample
pro_request(url, output = "test", pages = 10)

# Production: full download
pro_request(url, output = "data", pages = 10000)

3. Enable Error Logging for Large Jobs

pro_request(
  query_url = urls,
  output = "data/json",
  error_log = "download_errors.log"
)

4. Clean Up Intermediate Files

Use delete_input = TRUE to save disk space:

# Convert and delete JSON
pro_request_jsonl(
  input_json = "data/json",
  output = "data/jsonl",
  delete_input = TRUE
)

# Convert and delete JSONL
pro_request_jsonl_parquet(
  input_jsonl = "data/jsonl",
  output = "data/parquet",
  delete_input = TRUE
)

5. Use Parallel Processing Wisely

More workers are not always better:

# Good for many small chunks
pro_request(urls, output = "data", workers = 4)

# For JSONL conversion (CPU-bound): leave one core free, but never go below 1
pro_request_jsonl(
  input_json = "data/json",
  output = "data/jsonl",
  workers = max(1, parallel::detectCores() - 1, na.rm = TRUE)
)
