# Set credentials (typically in your .Renviron file)
Sys.setenv(openalexPro.apikey = "your-api-key-here")
# Validate your credentials
pro_validate_credentials()

Introduction
The pro_request() function is the core data retrieval component of the openalexPro workflow. It downloads data from the OpenAlex API and saves it as JSON files to disk, enabling processing of datasets too large to fit in memory.
This vignette provides a comprehensive guide to using pro_request() and related functions, including:
- Basic usage patterns
- Progress bars and parallel processing
- The complete data pipeline (pro_request() → pro_request_jsonl() → pro_request_jsonl_parquet())
- Internal architecture and flow diagrams
- Error handling and retry logic
Authentication
OpenAlex offers limited API access without credentials. Free API keys with substantially higher rate limits can be obtained from the OpenAlex website. Premium API access with even higher limits can also be purchased.
openalexPro uses environment variables for credentials (recommended), as shown in the Sys.setenv() call at the start of this vignette.
You can also pass credentials directly to functions:
pro_request(
query_url = url,
output = "data/json",
api_key = "your-api-key"
)

To force unauthenticated mode for a call, pass api_key = NULL (or ""):
pro_request(
query_url = url,
output = "data/json",
api_key = NULL
)

Basic Usage
Simple Download
The most basic usage requires a query URL (from pro_query()) and an output directory:
library(openalexPro)
# Build query
url <- pro_query(
entity = "works",
search = "climate change",
from_publication_date = "2023-01-01"
)
# Download data
pro_request(
query_url = url,
output = "data/json"
)

Get Count Only
To check how many results a query returns without downloading:
# Get count for a single query
count <- pro_request(
query_url = url,
count_only = TRUE
)
count$count
# [1] 45632
# Or use pro_count() directly
meta <- pro_count(query_url = url)
meta
# count db_response_time_ms page per_page error
# 1 45632                  42    1        1    NA

Download with Progress Bar
Progress bars are enabled by default:
pro_request(
query_url = url,
output = "data/json",
progress = TRUE # Default
)
# ℹ Fetching query counts...
# ℹ Total pages to download: 229
# Downloading ████████████████░░░░░░░░░░░░░░░░ 120/229 [1m 23s]

Limiting Pages
By default, pro_request() downloads up to 10,000 pages (2 million records at 200 per page). You can adjust this:
# Download only first 100 pages (20,000 records)
pro_request(
query_url = url,
output = "data/json",
pages = 100
)
# Download all pages (use with caution for large queries!)
pro_request(
query_url = url,
output = "data/json",
pages = NULL
)

Handling Chunked Queries
When pro_query() returns a list of URLs (due to automatic chunking of large ID lists), pro_request() handles them seamlessly:
# Query with many DOIs triggers automatic chunking
dois <- paste0("10.1234/example", 1:150)
urls <- pro_query(entity = "works", doi = dois)
length(urls) # 3 chunks
names(urls) # "chunk_1", "chunk_2", "chunk_3"
# pro_request handles the list automatically
pro_request(
query_url = urls,
output = "data/json"
)
# Creates: data/json/chunk_1/, data/json/chunk_2/, data/json/chunk_3/

Parallel Downloads
For chunked queries, enable parallel processing:
pro_request(
query_url = urls,
output = "data/json",
workers = 4 # Use 4 parallel workers
)

Function Parameters Reference
| Parameter | Type | Default | Description |
|---|---|---|---|
| query_url | character/list | required | URL or list of URLs from pro_query() |
| pages | integer/NULL | 10000 | Max pages to download; NULL = all pages |
| output | character | required | Directory for JSON output files |
| overwrite | logical | FALSE | Delete existing output directory if it exists |
| api_key | character/NULL | env var | Optional API key; if NULL or "", requests are sent without a key |
| workers | integer | 1 | Parallel workers for list queries |
| verbose | logical | FALSE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| count_only | logical | FALSE | Return count without downloading |
| error_log | character/NULL | NULL | Path for error log file |
pro_request() Architecture
High-Level Flow
flowchart TD
Start([pro_request called]) --> CheckList{query_url<br/>is list?}
CheckList -->|Yes| ListMethod[LIST METHOD:<br/>Multiple queries]
CheckList -->|No| ScalarMethod[SCALAR METHOD:<br/>Single query]
%% List Method Branch
ListMethod --> SetupFutures[Setup future plan<br/>based on workers]
SetupFutures --> CountOnly1{count_only?}
CountOnly1 -->|Yes| ParallelCount[Parallel pro_count()<br/>for each URL]
ParallelCount --> ReturnCounts([Return named vector])
CountOnly1 -->|No| FetchCounts[Fetch counts for<br/>progress bar]
FetchCounts --> CalcPages[Calculate total<br/>pages to download]
CalcPages --> ParallelDownload[future_lapply:<br/>fetch_query_pages<br/>for each URL]
ParallelDownload --> ReturnOutput([Return output path])
%% Scalar Method Branch
ScalarMethod --> CountOnly2{count_only?}
CountOnly2 -->|Yes| SingleCount[pro_count]
SingleCount --> ReturnMeta([Return metadata])
CountOnly2 -->|No| FetchPages[fetch_query_pages<br/>with CLI progress]
FetchPages --> ReturnPath([Return output path])
style Start fill:#e1f5e1
style ReturnCounts fill:#e1f5e1
style ReturnOutput fill:#e1f5e1
style ReturnMeta fill:#e1f5e1
style ReturnPath fill:#e1f5e1
style ListMethod fill:#cce5ff
style ScalarMethod fill:#fff3cd
fetch_query_pages() Internal Helper
The fetch_query_pages() function handles the actual page-by-page download:
flowchart TD
Start([fetch_query_pages called]) --> CheckOutput{output<br/>exists?}
CheckOutput -->|Yes, overwrite=FALSE| ErrorExists[Stop: Directory exists]
CheckOutput -->|Yes, overwrite=TRUE| DeleteDir[Delete existing<br/>directory]
CheckOutput -->|No| CreateDir[Create output<br/>directory]
DeleteDir --> CreateDir
CreateDir --> CreateProgress[Create<br/>00_in.progress file]
CreateProgress --> BuildRequest[Build httr2 request<br/>api_key optional]
BuildRequest --> FirstRequest[First API call<br/>to inspect meta]
FirstRequest --> CheckSingle{Single record<br/>response?}
CheckSingle -->|Yes| SaveSingle[Save single_1.json]
SaveSingle --> Success1([Success: Return path])
CheckSingle -->|No| CalcMaxPages[Calculate max_pages<br/>from meta]
CalcMaxPages --> StartProgress[Start CLI<br/>progress bar]
StartProgress --> SaveFirst[Save first page:<br/>results_page_1.json]
SaveFirst --> PageLoop{More pages?<br/>next_cursor exists?}
PageLoop -->|Yes| CheckLimit{page > pages<br/>limit?}
CheckLimit -->|Yes| Done[Exit loop]
CheckLimit -->|No| NextRequest[Request next page<br/>with cursor]
NextRequest --> SavePage[Save<br/>results_page_N.json]
SavePage --> UpdateProgress[Update<br/>progress bar]
UpdateProgress --> PageLoop
PageLoop -->|No| Done
Done --> DeleteProgressFile[Delete<br/>00_in.progress]
DeleteProgressFile --> Success2([Success: Return path])
style Start fill:#e1f5e1
style Success1 fill:#e1f5e1
style Success2 fill:#e1f5e1
style ErrorExists fill:#f8d7da
style PageLoop fill:#fff3cd
api_call() Retry Logic
All HTTP requests go through api_call() which provides robust retry logic:
flowchart TD
Start([api_call]) --> ConfigRetry[Configure retry:<br/>max 10 attempts<br/>exponential backoff]
ConfigRetry --> Perform[Perform request]
Perform --> CheckStatus{HTTP Status?}
CheckStatus -->|200 OK| ReturnSuccess([Return response])
CheckStatus -->|429, 500-504| Transient[Transient error:<br/>wait and retry]
Transient --> RetryCount{Retries<br/>exhausted?}
RetryCount -->|No| Perform
RetryCount -->|Yes| LogError[Log error]
LogError --> Abort1([Abort with error])
CheckStatus -->|400| Check400[Check if HTML<br/>response requested]
Check400 -->|Yes + HTML| Return400([Return for inspection])
Check400 -->|No| Abort2([Abort with error])
CheckStatus -->|Other 4xx/5xx| LogOther[Log unexpected status]
LogOther --> Abort3([Abort with error])
style Start fill:#e1f5e1
style ReturnSuccess fill:#e1f5e1
style Return400 fill:#fff3cd
style Abort1 fill:#f8d7da
style Abort2 fill:#f8d7da
style Abort3 fill:#f8d7da
style Transient fill:#cce5ff
Transient Error Handling
The following HTTP status codes are considered transient and trigger automatic retry:
| Status | Meaning | Action |
|---|---|---|
| 429 | Too Many Requests | Wait with exponential backoff |
| 500 | Internal Server Error | Retry up to 10 times |
| 502 | Bad Gateway | Retry up to 10 times |
| 503 | Service Unavailable | Retry up to 10 times |
| 504 | Gateway Timeout | Retry up to 10 times |
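The same policy can be sketched in base R, independent of the package's internal implementation (with_retry() is a hypothetical helper written for illustration; the status codes mirror the table above):

```r
# Retry a request function with exponential backoff on transient HTTP errors.
# `fetch` should return a list with at least a `status` element.
with_retry <- function(fetch, max_tries = 10, base_delay = 1) {
  transient <- c(429L, 500L, 502L, 503L, 504L)
  for (attempt in seq_len(max_tries)) {
    resp <- fetch()
    if (!resp$status %in% transient) return(resp)
    if (attempt < max_tries) {
      Sys.sleep(base_delay * 2^(attempt - 1))  # 1s, 2s, 4s, ...
    }
  }
  stop("Retries exhausted after ", max_tries, " attempts")
}
```

For example, a fetcher that returns 503 twice and then 200 succeeds on the third attempt without raising an error.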
Output File Structure
After a successful download, the output directory contains:
data/json/
├── results_page_1.json
├── results_page_2.json
├── results_page_3.json
└── ...
For chunked queries (list of URLs):
data/json/
├── chunk_1/
│ ├── results_page_1.json
│ ├── results_page_2.json
│ └── ...
├── chunk_2/
│ ├── results_page_1.json
│ └── ...
└── chunk_3/
└── ...
Progress Tracking
During download, a 00_in.progress file exists in the output directory. This file is automatically deleted upon successful completion. If present, it indicates:
- Download is still in progress, OR
- Download was interrupted and is incomplete
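This makes a simple resume-safety check possible (a sketch; only the 00_in.progress filename and the JSON page files come from this vignette, the helper itself is ours):

```r
# Returns TRUE if a pro_request() output directory finished cleanly:
# it exists, contains JSON pages, and the 00_in.progress marker is gone.
download_complete <- function(output_dir) {
  if (!dir.exists(output_dir)) return(FALSE)
  marker <- file.path(output_dir, "00_in.progress")
  pages  <- list.files(output_dir, pattern = "\\.json$", recursive = TRUE)
  !file.exists(marker) && length(pages) > 0
}
```

Run this before re-using a download directory; if it returns FALSE, re-download with overwrite = TRUE.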
The Complete Data Pipeline
Pipeline Overview
flowchart LR
subgraph Step1[Step 1: Query]
PQ[pro_query]
end
subgraph Step2[Step 2: Download]
PR[pro_request]
end
subgraph Step3[Step 3: Transform]
PRJL[pro_request_jsonl]
end
subgraph Step4[Step 4: Convert]
PRJLP[pro_request_jsonl_parquet]
end
subgraph Step5[Step 5: Analyze]
DB[(DuckDB)]
end
PQ -->|URL/URLs| PR
PR -->|JSON files| PRJL
PRJL -->|JSONL files| PRJLP
PRJLP -->|Parquet dataset| DB
style Step1 fill:#e1f5e1
style Step2 fill:#cce5ff
style Step3 fill:#fff3cd
style Step4 fill:#f8d7da
style Step5 fill:#e1f5e1
Complete Example
library(openalexPro)
# Step 1: Build query
urls <- pro_query(
entity = "works",
search = "machine learning healthcare",
from_publication_date = "2020-01-01",
type = "article",
select = c("ids", "title", "abstract", "publication_year", "authorships")
)
# Step 2: Download JSON from API
pro_request(
query_url = urls,
output = "data/json",
pages = 1000,
progress = TRUE
)
# Step 3: Convert to JSONL (with abstract reconstruction)
pro_request_jsonl(
input_json = "data/json",
output = "data/jsonl",
progress = TRUE,
workers = 4
)
# Step 4: Convert to Parquet (with schema harmonization)
pro_request_jsonl_parquet(
input_jsonl = "data/jsonl",
output = "data/parquet",
progress = TRUE
)
# Step 5: Query with DuckDB
library(duckdb)
con <- dbConnect(duckdb())
dbGetQuery(
con,
"
SELECT title, publication_year, cited_by_count
FROM read_parquet('data/parquet/**/*.parquet')
ORDER BY cited_by_count DESC
LIMIT 10
"
)

pro_request_jsonl()
Purpose
Converts the raw JSON files from OpenAlex into JSON Lines (JSONL) format, which is more efficient for processing with DuckDB. During conversion, it also:
- Reconstructs abstracts from the abstract_inverted_index field
- Generates citation strings (e.g., “Smith et al. (2023)”)
- Adds a page field for tracking data provenance
- Optionally adds custom columns
Function Signature
pro_request_jsonl(
input_json = NULL,
output = NULL,
add_columns = list(),
overwrite = FALSE,
verbose = TRUE,
progress = TRUE,
delete_input = FALSE,
workers = 1
)

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| input_json | character | required | Directory of JSON files from pro_request() |
| output | character | required | Output directory for JSONL files |
| add_columns | list | list() | Named list of additional columns to add |
| overwrite | logical | FALSE | Overwrite existing output |
| verbose | logical | TRUE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| delete_input | logical | FALSE | Delete input JSON after conversion |
| workers | integer | 1 | Number of parallel workers |
Internal Flow
flowchart TD
Start([pro_request_jsonl]) --> ValidateInputs[Validate inputs]
ValidateInputs --> CheckOverwrite{Output exists?}
CheckOverwrite -->|Yes, overwrite=FALSE| ErrorExists[Stop: Output exists]
CheckOverwrite -->|Yes, overwrite=TRUE| DeleteOutput[Delete output dir]
CheckOverwrite -->|No| CreateOutput[Create output dir]
DeleteOutput --> CreateOutput
CreateOutput --> CreateProgress[Create 00_in.progress]
CreateProgress --> ListJSONs[List all JSON files<br/>recursively]
ListJSONs --> DetectType[Detect type:<br/>results, group_by, single]
DetectType --> SetupFutures[Setup parallel plan<br/>if workers > 1]
SetupFutures --> ProcessLoop[future_lapply over<br/>JSON files]
ProcessLoop --> ForEachJSON[For each JSON file]
ForEachJSON --> ExtractPage[Extract page number<br/>from filename]
ExtractPage --> CallJQ[jq_execute:<br/>transform JSON]
CallJQ --> CheckSize{Output > 5 bytes?}
CheckSize -->|No| DeleteEmpty[Delete empty file]
CheckSize -->|Yes| UpdateProgress[Update progress]
DeleteEmpty --> UpdateProgress
UpdateProgress --> MoreFiles{More files?}
MoreFiles -->|Yes| ForEachJSON
MoreFiles -->|No| Cleanup[Delete input if<br/>delete_input=TRUE]
Cleanup --> Success([Return output path])
style Start fill:#e1f5e1
style Success fill:#e1f5e1
style ErrorExists fill:#f8d7da
style ProcessLoop fill:#cce5ff
jq_execute() Transformation
The jq_execute() function performs the actual JSON transformation using the jqr package:
flowchart TD
Start([jq_execute]) --> SelectRoot{Type?}
SelectRoot -->|results| ExtractResults[".results[] |"]
SelectRoot -->|group_by| ExtractGroups[".group_by[] |"]
SelectRoot -->|single| NoExtract[Direct processing]
ExtractResults --> Transform
ExtractGroups --> Transform
NoExtract --> Transform
Transform[Apply jq filter] --> AbstractCheck{Has abstract_<br/>inverted_index?}
AbstractCheck -->|Yes| ReconstructAbstract[Reconstruct abstract<br/>from inverted index]
AbstractCheck -->|No| SkipAbstract[Keep as-is]
ReconstructAbstract --> GenerateCitation
SkipAbstract --> GenerateCitation
GenerateCitation[Generate citation string<br/>based on author count]
GenerateCitation --> AddColumns{Custom columns?}
AddColumns -->|Yes| InsertColumns[Add custom columns]
AddColumns -->|No| SkipColumns[Skip]
InsertColumns --> DeleteInverted
SkipColumns --> DeleteInverted
DeleteInverted[Delete abstract_inverted_index]
DeleteInverted --> AddPage{Page specified?}
AddPage -->|Yes| InsertPage[Add page field]
AddPage -->|No| SkipPage[Skip]
InsertPage --> WriteJSONL[Write to JSONL file]
SkipPage --> WriteJSONL
WriteJSONL --> Done([Return output path])
style Start fill:#e1f5e1
style Done fill:#e1f5e1
style Transform fill:#cce5ff
style GenerateCitation fill:#fff3cd
Abstract Reconstruction
OpenAlex stores abstracts as inverted indices for efficiency. The transformation reconstructs readable text:
Input (inverted index):
{
"abstract_inverted_index": {
"Climate": [0],
"change": [1, 5],
"affects": [2],
"biodiversity": [3],
"and": [4]
}
}

Output (reconstructed):
{
"abstract": "Climate change affects biodiversity and change"
}

Citation String Generation
The function generates citation strings based on author count:
| Authors | Citation Format |
|---|---|
| 1 | “Smith (2023)” |
| 2 | “Smith & Jones (2023)” |
| 3+ | “Smith et al. (2023)” |
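Both transformations can be sketched in plain R (for illustration only; the package itself performs them inside the jq filter):

```r
# Rebuild an abstract from an OpenAlex inverted index:
# each word maps to the list of positions where it occurs.
reconstruct_abstract <- function(inv) {
  positions <- unlist(inv, use.names = FALSE)
  words     <- rep(names(inv), lengths(inv))
  paste(words[order(positions)], collapse = " ")
}

# Build a citation string from author family names and a year,
# following the author-count rules in the table above.
citation_string <- function(surnames, year) {
  n <- length(surnames)
  if (n == 1)      sprintf("%s (%s)", surnames, year)
  else if (n == 2) sprintf("%s & %s (%s)", surnames[1], surnames[2], year)
  else             sprintf("%s et al. (%s)", surnames[1], year)
}

inv <- list(Climate = 0, change = c(1, 5), affects = 2, biodiversity = 3, and = 4)
reconstruct_abstract(inv)
# [1] "Climate change affects biodiversity and change"
```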
pro_request_jsonl_parquet()
Purpose
Converts JSONL files to Apache Parquet format for efficient analytical queries. Key features:
- Automatic schema harmonization across all files
- Partitioning by page for efficient filtering
- SNAPPY compression for good compression/speed balance
- Streaming conversion - no memory limit on total dataset size
Function Signature
pro_request_jsonl_parquet(
input_jsonl = NULL,
output = NULL,
overwrite = FALSE,
verbose = TRUE,
progress = TRUE,
delete_input = FALSE,
sample_size = 1000
)

Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| input_jsonl | character | required | Directory of JSONL files |
| output | character | required | Output directory for Parquet dataset |
| overwrite | logical | FALSE | Overwrite existing output |
| verbose | logical | TRUE | Show detailed messages |
| progress | logical | TRUE | Show progress bar |
| delete_input | logical | FALSE | Delete input JSONL after conversion |
| sample_size | integer | 1000 | Records to sample for schema inference |
Internal Flow
flowchart TD
Start([pro_request_jsonl_parquet]) --> ValidateInputs[Validate inputs]
ValidateInputs --> SetupDuckDB[Create in-memory<br/>DuckDB connection]
SetupDuckDB --> LoadJSON[INSTALL json;<br/>LOAD json;]
LoadJSON --> CheckOverwrite{Output exists?}
CheckOverwrite -->|Yes, overwrite=FALSE| ErrorExists[Stop: Output exists]
CheckOverwrite -->|Yes, overwrite=TRUE| DeleteOutput[Delete output]
CheckOverwrite -->|No| CreateOutput[Create output dir]
DeleteOutput --> CreateOutput
CreateOutput --> CreateProgress[Create 00_in.progress]
CreateProgress --> ListJSONL[List all JSONL files]
ListJSONL --> InferSchema[Infer unified schema<br/>from all files]
InferSchema --> SchemaSQL["DESCRIBE SELECT *<br/>FROM read_json_auto<br/>(union_by_name=true)"]
SchemaSQL --> SchemaSuccess{Schema<br/>inferred?}
SchemaSuccess -->|Yes| BuildColumns[Build columns clause<br/>for explicit schema]
SchemaSuccess -->|No| FallbackAuto[Fall back to<br/>per-file auto-detect]
BuildColumns --> ConvertLoop
FallbackAuto --> ConvertLoop
ConvertLoop[For each JSONL file]
ConvertLoop --> CopySQL["COPY (...) TO output<br/>(FORMAT PARQUET,<br/>PARTITION_BY 'page',<br/>APPEND)"]
CopySQL --> UpdateProgress[Update progress]
UpdateProgress --> MoreFiles{More files?}
MoreFiles -->|Yes| ConvertLoop
MoreFiles -->|No| Cleanup
Cleanup[Delete input if requested]
Cleanup --> CloseDB[Close DuckDB connection]
CloseDB --> Success([Return output path])
style Start fill:#e1f5e1
style Success fill:#e1f5e1
style ErrorExists fill:#f8d7da
style InferSchema fill:#cce5ff
style ConvertLoop fill:#fff3cd
Schema Harmonization
Different OpenAlex records can have different field structures. For example:
- Some works have primary_location as a struct, others as a string
- Some have nested arrays, others have null values
The function samples records from all files to infer a unified schema that accommodates all variations:
flowchart LR
subgraph Input[Input JSONL Files]
F1[File 1:<br/>location: struct]
F2[File 2:<br/>location: string]
F3[File 3:<br/>location: null]
end
subgraph Schema[Schema Inference]
Sample[Sample from<br/>all files]
Union[union_by_name=true]
Describe[DESCRIBE query]
end
subgraph Output[Unified Schema]
Unified[location: struct<br/>(accommodates all)]
end
F1 --> Sample
F2 --> Sample
F3 --> Sample
Sample --> Union
Union --> Describe
Describe --> Unified
style Input fill:#f8d7da
style Schema fill:#cce5ff
style Output fill:#e1f5e1
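The same inference can be reproduced interactively with DuckDB's json extension (a sketch; the input path is a placeholder, and read_json_auto with union_by_name = true is the call named in the flow above):

```r
library(duckdb)

con <- dbConnect(duckdb())
dbExecute(con, "INSTALL json; LOAD json;")

# One DESCRIBE over all files yields a schema that merges fields by name
schema <- dbGetQuery(con, "
  DESCRIBE SELECT *
  FROM read_json_auto('data/jsonl/**/*.jsonl', union_by_name = true)
")
schema[, c("column_name", "column_type")]

dbDisconnect(con, shutdown = TRUE)
```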
Output Structure
The Parquet dataset is partitioned by the page field:
data/parquet/
├── page=chunk_1_1/
│ └── data_0.parquet
├── page=chunk_1_2/
│ └── data_0.parquet
├── page=chunk_2_1/
│ └── data_0.parquet
└── ...
This partitioning enables efficient filtering when querying subsets of the data.
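For example, restricting a DuckDB query to one partition value lets the engine skip every other directory (a sketch; the partition value 'chunk_1_1' is illustrative):

```r
library(duckdb)

con <- dbConnect(duckdb())

# hive_partitioning = true exposes the page directory as a column,
# so the WHERE clause prunes all non-matching partition directories
dbGetQuery(con, "
  SELECT title, publication_year
  FROM read_parquet('data/parquet/**/*.parquet', hive_partitioning = true)
  WHERE page = 'chunk_1_1'
  LIMIT 5
")

dbDisconnect(con, shutdown = TRUE)
```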
Error Handling
Common Errors
Output Directory Exists
pro_request(query_url = url, output = "data/json")
# Error: Directory data/json exists.
# Either specify `overwrite = TRUE` or delete it.
# Solution:
pro_request(query_url = url, output = "data/json", overwrite = TRUE)

No Output Specified
pro_request(query_url = url)
# Error: No `output` specified!

API Rate Limiting
If you hit rate limits (HTTP 429), the function automatically retries with exponential backoff. To reduce rate limit issues:
- Use an API key (free from OpenAlex)
- Reduce parallel workers
- Add delays between large queries
You can check your current budget and remaining allowance at any time with pro_rate_limit_status(). This queries the /rate-limit endpoint and prints your daily budget, amount used, remaining balance, and seconds until the daily reset.
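For example (a sketch; the exact shape of what is printed is not shown in this vignette):

```r
# Prints your daily budget, amount used, remaining balance,
# and seconds until the daily reset
pro_rate_limit_status()
```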
Request Size Exceeded
OpenAlex has a maximum request size of 4094 characters. If exceeded:
meta <- pro_count(query_url = very_long_url)
meta$count
# [1] -4195 # Negative indicates error
meta$error
# [1] "ERROR: Request size exceeds the maximum limit of 4094."

Solution: Use pro_query() with chunking (it handles this automatically).
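A quick pre-flight length check can catch this before a request is sent (a sketch; url_ok() is our own helper, and pro_query()'s chunking normally makes it unnecessary):

```r
# OpenAlex rejects requests longer than 4094 characters
max_request_chars <- 4094

url_ok <- function(url) nchar(url) <= max_request_chars

url_ok("https://api.openalex.org/works?search=climate%20change")
# [1] TRUE
```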
Best Practices
1. Always Use Progress Bars
Progress bars help monitor long downloads and estimate completion:
pro_request(url, output = "data", progress = TRUE)

2. Use Appropriate Page Limits
Start with smaller page limits during development:
# Development: small sample
pro_request(url, output = "test", pages = 10)
# Production: full download
pro_request(url, output = "data", pages = 10000)

3. Enable Error Logging for Large Jobs
pro_request(
query_url = urls,
output = "data/json",
error_log = "download_errors.log"
)

4. Clean Up Intermediate Files
Use delete_input = TRUE to save disk space:
# Convert and delete JSON
pro_request_jsonl(
input_json = "data/json",
output = "data/jsonl",
delete_input = TRUE
)
# Convert and delete JSONL
pro_request_jsonl_parquet(
input_jsonl = "data/jsonl",
output = "data/parquet",
delete_input = TRUE
)

5. Use Parallel Processing Wisely
More workers isn’t always better:
# Good for many small chunks
pro_request(urls, output = "data", workers = 4)
# For JSONL conversion (CPU-bound)
pro_request_jsonl(input, output, workers = parallel::detectCores() - 1)

See Also
- pro_query(): Build OpenAlex API query URLs
- pro_count(): Get result counts without downloading
- pro_validate_credentials(): Validate API credentials (optional helper)
- pro_rate_limit_status(): Check current rate limit usage and remaining budget
- opt_filter_names(): List available filter names
- opt_select_fields(): List available select fields