Airflow triggering Spark application results in error "Too large frame"
I have a Docker Compose pipeline with containers for Airflow and for Spark. I want to schedule a SparkSubmitOperator job, but it fails with the error java.lang.IllegalArgumentException: Too large frame: 5211883372140375593. The Spark application consists only of creating a Spark session (I have already commented out everything else). When I run the Spark app manually (by opening a shell in the Spark container and executing spark-submit), everything works fine! Also, when I don't create a Spark session but only a SparkContext, it works!
Here is my docker-compose.yml:
version: '3'
x-airflow-common:
  &airflow-common
  build: ./airflow/
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'false'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    AIRFLOW__CORE__DEFAULT_TIMEZONE: 'Europe/Berlin'
  volumes:
    - ./airflow/dags:/opt/airflow/dags
    - ./airflow/logs:/opt/airflow/logs
    - ./airflow/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  networks:
    - app-tier
  depends_on:
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    container_name: airflowPostgres
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always
    networks:
      - app-tier

  redis:
    container_name: airflowRedis
    image: redis:latest
    ports:
      - 6380:6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    networks:
      - app-tier

  airflow-webserver:
    <<: *airflow-common
    container_name: airflowWebserver
    command: webserver
    ports:
      - 8081:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    container_name: airflowScheduler
    command: scheduler
    restart: always

  airflow-worker:
    <<: *airflow-common
    container_name: airflowWorker
    command: celery worker
    restart: always

  airflow-init:
    <<: *airflow-common
    container_name: airflowInit
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}

  spark:
    image: docker.io/bitnami/spark:3
    user: root
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    ports:
      - '8080:8080'
    volumes:
      - ./:/app
    networks:
      - app-tier

  spark-worker-1:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

  spark-worker-2:
    image: docker.io/bitnami/spark:3
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_MEMORY=1G
      - SPARK_WORKER_CORES=1
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
    networks:
      - app-tier

volumes:
  postgres-db-volume:

networks:
  app-tier:
    driver: bridge
    name: app-tier
My Airflow DAG:
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from functions import send_to_kafka, send_to_mongo

# * AIRFLOW ################################
# default arguments
default_args = {
    'owner': 'daniel',
    'start_date': datetime(2021, 5, 9),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    "retries": 3,
    "retry_delay": timedelta(minutes = 1)
}

# * spark DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

dag_spark = DAG('spark',
    description = '', catchup = False, schedule_interval = "@once", default_args = default_args)

s1 = SparkSubmitOperator(
    task_id = "spark-job",
    application = "/opt/airflow/dags/application.py",
    conn_id = "spark_default",  # defined under Admin/Connections in the Airflow webserver
    packages = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2,postgresql:postgresql:9.1-901-1.jdbc4",
    dag = dag_spark
)
My application (application.py) which does NOT work:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .getOrCreate()
The application which DOES work:
from pyspark import SparkContext
sc = SparkContext("local", "First App")
The connection is defined under Admin/Connections in the Airflow webserver (the screenshot is not included here).
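Since the screenshot is missing, here is a hedged sketch of what a spark_default connection could look like if it were created programmatically instead of through the UI. The host and port below are assumptions taken from the Spark master service in the docker-compose file (spark://spark:7077), not the actual values from the original connection.
from airflow import settings
from airflow.models import Connection

# Hypothetical sketch only: a spark_default connection matching the
# compose file's Spark master. The real connection in this question was
# created through the web UI; host and port here are assumptions.
spark_conn = Connection(
    conn_id="spark_default",
    conn_type="spark",
    host="spark://spark",  # assumed: the "spark" service name from docker-compose
    port=7077,             # assumed: the standalone master's RPC port
)

session = settings.Session()
session.add(spark_conn)
session.commit()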
And here is the log created by the DAG: https://pastebin.com/FMW3kJ9g
Any ideas why this fails?
Answers
The problem was solved by adding .master("local") to the SparkSession builder.
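For reference, the fixed application.py would then look roughly like this. The .master("local") call is exactly what the answer describes; the comment about pointing at the standalone cluster is an assumption drawn from the compose file, not something stated in the original post.
from pyspark.sql import SparkSession

# Fixed version per the answer: give the session an explicit master.
spark = SparkSession \
    .builder \
    .master("local") \
    .appName("myApp") \
    .getOrCreate()

# Assumption, not from the original post: to actually run on the standalone
# cluster defined in docker-compose, the master would presumably be
# "spark://spark:7077" instead of "local".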