Spark or Databricks

How to use the PySpark bridge for distributed token generation on Spark clusters or Databricks.


When to Use PySpark

  • Large datasets: Millions of records across multiple files
  • Distributed processing: Leverage cluster computing
  • Overlap analysis: Find matching records across datasets at scale
  • Cost-effective: Process on cloud infrastructure (AWS, GCP, Azure)

Installation

Version Compatibility

Spark Version   PySpark Version   Java Version   Installation Extra
4.0.x           >=4.0.1, <5.0     21             [spark40] (Recommended)
3.5.x           >=3.5.0, <3.6     8-17           [spark35]
3.4.x           >=3.4.0, <3.5     8-17           [spark34]

Important: PySpark 3.5.x and earlier are NOT compatible with Java 21. If using Java 21, you must use PySpark 4.0.1+.

Local Development

source /workspaces/OpenLinkToken/.venv/bin/activate

cd lib/python/opentoken-pyspark
pip install -e .[spark40]  # For Java 21
# or
pip install -e .[spark35]  # For Java 8-17

Managed Clusters (Databricks, EMR, Azure Synapse)

For environments where PySpark is pre-installed:

pip install opentoken-pyspark

Basic Usage

Standard Spark Cluster

import sys
import os
from pyspark.sql import SparkSession
from opentoken_pyspark import OpenTokenProcessor

# Create Spark session
spark = SparkSession.builder \
    .appName("OpenToken") \
    .master("local[*]") \
    .config("spark.executorEnv.PYTHONPATH", os.pathsep.join(sys.path)) \
    .getOrCreate()

# Load your data
df = spark.read.csv("data.csv", header=True)

# Initialize processor with your secrets
processor = OpenTokenProcessor(
    hashing_secret="your-hashing-secret",
    encryption_key="your-encryption-key-32-chars!!!!"  # must be 32 characters
)

# Generate tokens
tokens_df = processor.process_dataframe(df)

# View results
tokens_df.show()

# Write output
tokens_df.write.mode("overwrite").csv("output/tokens")

Databricks Example

from opentoken_pyspark import OpenTokenProcessor

# Load data from Delta table or CSV
df = spark.read.table("my_database.person_records")

# Initialize processor using Databricks secrets
processor = OpenTokenProcessor(
    hashing_secret=dbutils.secrets.get("opentoken", "hashing_secret"),
    encryption_key=dbutils.secrets.get("opentoken", "encryption_key")
)

# Generate tokens
tokens_df = processor.process_dataframe(df)

# Save results to Delta table
tokens_df.write.mode("overwrite").saveAsTable("my_database.person_tokens")

Input DataFrame Requirements

Your DataFrame must contain these columns (alternative names supported):

Standard Name          Alternatives                   Required
FirstName              GivenName                      Yes
LastName               Surname                        Yes
BirthDate              DateOfBirth                    Yes
Sex                    Gender                         Yes
PostalCode             ZipCode                        Yes
SocialSecurityNumber   NationalIdentificationNumber   Yes
RecordId               Id                             Optional
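If your source data uses the alternative names, you can standardize the columns up front. The `ALIASES` mapping below mirrors the table above; the `rename_plan` helper is illustrative, not part of the library:

```python
# Map alternative column names to the standard names expected by the processor.
ALIASES = {
    "GivenName": "FirstName",
    "Surname": "LastName",
    "DateOfBirth": "BirthDate",
    "Gender": "Sex",
    "ZipCode": "PostalCode",
    "NationalIdentificationNumber": "SocialSecurityNumber",
    "Id": "RecordId",
}

def rename_plan(columns):
    """Return the {old: new} renames needed to standardize the given column names."""
    return {c: ALIASES[c] for c in columns if c in ALIASES}

# Apply to a Spark DataFrame:
# for old, new in rename_plan(df.columns).items():
#     df = df.withColumnRenamed(old, new)
```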

Output Format

Each input record produces multiple output rows (one per token rule):

Column     Description
RecordId   Original record identifier
RuleId     Token rule (T1, T2, T3, T4, T5)
Token      Generated cryptographic token

Dataset Overlap Analysis

Find matching records between two tokenized datasets:

from opentoken_pyspark import OpenTokenOverlapAnalyzer

# Initialize with encryption key (same key used for token generation)
analyzer = OpenTokenOverlapAnalyzer("your-encryption-key-32-chars!!!!")

# Analyze overlap - match on T1 and T2 (both must match)
results = analyzer.analyze_overlap(
    tokens_df1,
    tokens_df2,
    matching_rules=["T1", "T2"],
    dataset1_name="Hospital_A",
    dataset2_name="Hospital_B"
)

# Print summary
analyzer.print_summary(results)

# Access results
print(f"Total records in dataset 1: {results['total_records_dataset1']}")
print(f"Matching records: {results['matching_records_dataset1']}")
print(f"Overlap percentage: {results['overlap_percentage']:.2f}%")

# Get matched pairs
matches_df = results['matches']
matches_df.show()

Compare with Multiple Rule Sets

rule_sets = [
    ["T1"],              # Match on T1 only
    ["T1", "T2"],        # Match on T1 AND T2
    ["T1", "T2", "T3"]   # Match on T1 AND T2 AND T3
]

results = analyzer.compare_with_multiple_rules(
    tokens_df1, tokens_df2, rule_sets
)

for result in results:
    print(f"Rules {result['matching_rules']}: {result['overlap_percentage']:.2f}% overlap")

Custom Token Definitions

from opentoken_pyspark import OpenTokenProcessor
from opentoken_pyspark.notebook_helpers import TokenBuilder, CustomTokenDefinition

# Define custom token rule
custom_token = TokenBuilder("T6") \
    .add("last_name", "T|U") \
    .add("first_name", "T|U") \
    .add("birth_date", "T|D") \
    .add("postal_code", "T|S(0,3)") \
    .add("sex", "T|U") \
    .build()

custom_definition = CustomTokenDefinition().add_token(custom_token)

# Create processor with custom definition
processor = OpenTokenProcessor(
    hashing_secret="your-hashing-secret",
    encryption_key="your-encryption-key-32-chars!!!!",
    token_definition=custom_definition
)

tokens_df = processor.process_dataframe(df)

Performance Considerations

Partitioning

Adjust spark.sql.shuffle.partitions for your cluster size:

spark.conf.set("spark.sql.shuffle.partitions", "200")
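A common rule of thumb (not a library recommendation) is 2-3 shuffle partitions per available core; the helper below is illustrative:

```python
# Rule of thumb: roughly 2-3 shuffle partitions per available core.
def suggested_shuffle_partitions(total_cores, factor=3):
    return max(total_cores * factor, 1)

# e.g. for a 16-core cluster:
# spark.conf.set("spark.sql.shuffle.partitions", str(suggested_shuffle_partitions(16)))
```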

Memory

Token generation is memory-efficient, but make sure executors have adequate memory:

spark = SparkSession.builder \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

Output File Format

Use Parquet for large outputs:

tokens_df.write.mode("overwrite").parquet("output/tokens.parquet")

Databricks-Specific Configuration

Secrets Management

Store secrets in Databricks secrets:

# Create secret scope
databricks secrets create-scope --scope opentoken

# Add secrets
databricks secrets put --scope opentoken --key hashing_secret
databricks secrets put --scope opentoken --key encryption_key

Cluster Libraries

Install via cluster UI or init script:

pip install opentoken-pyspark

Unity Catalog (UC) is the right place to govern data access (tables, volumes, external locations). For secrets (hashing/encryption keys), you typically still use Databricks Secret Scopes (optionally backed by a cloud secret manager) and lock down access with ACLs and cluster policies.

Recommended pattern:

  • Store secrets in a secret scope (Databricks-backed, or backed by AWS Secrets Manager / Azure Key Vault / GCP Secret Manager depending on your workspace configuration).
  • Restrict who can read secrets (scope ACLs) and which clusters can access them (cluster policies).
  • Store token outputs in UC-managed Delta tables (or UC volumes) and grant access via UC privileges.

Example: read secrets + write UC table

from opentoken_pyspark import OpenTokenProcessor

hashing_secret = dbutils.secrets.get("opentoken", "hashing_secret")
encryption_key = dbutils.secrets.get("opentoken", "encryption_key")

processor = OpenTokenProcessor(
    hashing_secret=hashing_secret,
    encryption_key=encryption_key,
)

df = spark.read.table("main.pprl.person_records")
tokens_df = processor.process_dataframe(df)

# Write to a UC-managed Delta table
tokens_df.write.mode("overwrite").format("delta").saveAsTable("main.pprl.person_tokens")

Notes:

  • Prefer a dedicated UC schema (e.g., main.pprl) for tokenized outputs.
  • Consider writing to separate tables per dataset/run to support auditing and reproducibility.
  • For production: use cluster policies to prevent printing secrets, and avoid collecting token data to the driver.

Example Notebooks

See the example notebooks in lib/python/opentoken-pyspark/notebooks/.


Security Notes

  • Secrets Management: Use secure secrets management (Databricks Secrets, AWS Secrets Manager, Azure Key Vault)
  • Network Security: Ensure secure communication between Spark nodes
  • Secrets Serialization: Secrets are serialized to worker nodes; ensure secure cluster configuration

Next Steps