
I have a Docker Compose pipeline with containers for Airflow and for Spark. I want to schedule a job with the SparkSubmitOperator, but it fails with the error java.lang.IllegalArgumentException: Too large frame: 5211883372140375593. The Spark application only creates a Spark session (I have already commented out everything else). When I run the app manually (by opening a bash shell in the Spark container and executing spark-submit), everything works fine! It also works when I create only a SparkContext instead of a SparkSession.
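For reference, the manual run that works is presumably something along these lines (the /app path is an assumption based on the ./:/app volume mount in the compose file; note that spark-submit without an explicit --master defaults to local mode):

# inside the Spark master container, e.g. via: docker-compose exec spark bash
spark-submit /app/application.py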

Here is my docker-compose.yml:

version: '3' 


x-airflow-common:
  &airflow-common
  build: ./airflow/
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Europe/Berlin' 
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  networks:
    - app-tier
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy



services:

  postgres:
    container_name: airflowPostgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    networks:
      - app-tier

  redis:
    container_name: airflowRedis
    image: redis:latest
    ports:
      - 6380:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    networks:
      - app-tier

  airflow-webserver:
    <<: *airflow-common
    container_name: airflowWebserver
    command: webserver
    ports:
      - 8081:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflowScheduler
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    container_name: airflowWorker
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    container_name: airflowInit
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  spark:
    image: docker.io/bitnami/spark:3
    user: root
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
    volumes: 
      - ./:/app
    networks:
      - app-tier

  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier
    
volumes:
  postgres-db-volume:

networks:
  app-tier:
    driver: bridge
    name: app-tier

My Airflow DAG:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from functions import send_to_kafka, send_to_mongo


# * AIRFLOW ################################

# default arguments
default_args = {
    'owner': 'daniel',
    'start_date': datetime(2021, 5, 9),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    "retries": 3,
    "retry_delay": timedelta(minutes = 1)
}    


# * spark DAG

from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag_spark = DAG('spark', 
                description = '', catchup = False, schedule_interval = "@once", default_args = default_args)

s1 = SparkSubmitOperator(
    task_id = "spark-job",
    application = "/opt/airflow/dags/application.py",
    conn_id = "spark_default", # defined under Admin/Connections in Airflow webserver
    packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,postgresql:postgresql:9.1-901-1.jdbc4",
    dag = dag_spark
)
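Under the hood, the SparkSubmitOperator assembles a spark-submit call on the Airflow worker from the task arguments and the spark_default connection. With the arguments above it should look roughly like this (the master URL shown is an assumption; it comes from whatever host/port is stored in the connection):

spark-submit --master spark://spark:7077 \
    --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,postgresql:postgresql:9.1-901-1.jdbc4 \
    /opt/airflow/dags/application.py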

My application (application.py) which does NOT work:

from pyspark.sql import SparkSession    

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()

The application which DOES work:

from pyspark import SparkContext
sc = SparkContext("local", "First App")

The connection defined in the Admin menu of Airflow:

[screenshot of the spark_default connection settings]

And here is the log created by the DAG: https://pastebin.com/FMW3kJ9g

Any ideas why this fails?

Answers

The problem was solved by adding .master("local") to the SparkSession builder. (The "Too large frame" error usually means the Spark driver connected to a port that does not speak Spark's RPC protocol, for example the master's web UI on 8080 instead of the RPC port 7077; with an explicit local master the driver never talks to the standalone cluster at all.)
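For completeness, the working application.py then becomes:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("myApp") \
    .getOrCreate()

Note that .master("local") runs the job on the Airflow worker itself rather than on the standalone cluster; to actually use the cluster, the master URL stored in the spark_default connection would presumably need to point at the Spark master's RPC endpoint (spark://spark:7077), not its web UI port.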
