
Airflow DAGs timeout #573

Closed
2 tasks done
ac5tin opened this issue May 2, 2022 · 6 comments
Labels
kind/bug kind - things not working properly

Comments

ac5tin commented May 2, 2022

Checks

Chart Version

8.6.0

Kubernetes Version

Client Version: version.Info{Major:"1", Minor:"23", GitVersion:"v1.23.5+k3s1", GitCommit:"313aaca547f030752788dce696fdf8c9568bc035", GitTreeState:"clean", BuildDate:"2022-03-31T01:02:40Z", GoVersion:"go1.17.5", Compiler:"gc", Platform:"linux/amd64"}
Server Version: version.Info{Major:"1", Minor:"23", GitVersion:"v1.23.5+k3s1", GitCommit:"313aaca547f030752788dce696fdf8c9568bc035", GitTreeState:"clean", BuildDate:"2022-03-31T01:02:40Z", GoVersion:"go1.17.5", Compiler:"gc", Platform:"linux/amd64"}

Helm Version

version.BuildInfo{Version:"v3.8.2", GitCommit:"6e3701edea09e5d55a8ca2aae03a68917630e91b", GitTreeState:"clean", GoVersion:"go1.17.5"}

Description

DAGs time out and fail to run.

When I checked the logs of the airflow-scheduler deployment, I saw: {scheduler_job.py:1111} INFO - Run manual__2022-05-02T15:22:34.690790+00:00 of test has timed-out
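For context, the deadline being hit here is the scheduler's limit for publishing a task to the Celery broker (`celery.operation_timeout`, which defaults to 1 second in Airflow 2.x). A possible mitigation, sketched below and not part of my deployed values, would be to raise it via the chart's `extraEnv` (the value `"10"` is illustrative):

```yaml
airflow:
  extraEnv:
    # Illustrative only: raise the scheduler's Celery publish deadline
    # (default 1 second in Airflow 2.x) to ride out slow Redis connects.
    - name: AIRFLOW__CELERY__OPERATION_TIMEOUT
      value: "10"
```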

Relevant Logs

From pod airflow-scheduler-56d94794c9-8wsbx (container airflow-scheduler):

[2022-05-02 15:22:43,460] {timeout.py:36} ERROR - Process timed out, PID: 8
[2022-05-02 15:22:43,461] {celery_executor.py:299} ERROR - Error sending Celery task: Timeout, PID: 8
Celery Task ID: TaskInstanceKey(dag_id='aastocks_chan', task_id='get_aastocks_chan', run_id='manual__2022-05-02T15:22:34.690790+00:00', try_number=1)
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 30, in __call__
    return self.__value__
AttributeError: 'ChannelPromise' object has no attribute '__value__'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 925, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/executors/celery_executor.py", line 172, in send_task_to_executor
    result = task_to_run.apply_async(args=[command], queue=queue)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/task.py", line 575, in apply_async
    return app.send_task(
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/base.py", line 788, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/home/airflow/.local/lib/python3.8/site-packages/celery/app/amqp.py", line 510, in send_task_message
    ret = producer.publish(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 177, in publish
    return _publish(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 523, in _ensured
    return fun(*args, **kwargs)
    channel = self.channel
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 209, in _get_channel
    channel = self._channel = channel()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 32, in __call__
    value = self.__value__ = self.__contract__()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/messaging.py", line 225, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 895, in default_channel
    self._ensure_connection(**conn_opts)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 433, in _ensure_connection
    return retry_over_time(
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/utils/functional.py", line 312, in retry_over_time
    return fun(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 877, in _connection_factory
    self._connection = self._establish_connection()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/connection.py", line 812, in _establish_connection
    conn = self.transport.establish_connection()
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 949, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 927, in create_channel
    channel = self.Channel(connection)
  File "/home/airflow/.local/lib/python3.8/site-packages/kombu/transport/redis.py", line 737, in __init__
    self.client.ping()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 1378, in ping
    return self.execute_command('PING')
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/client.py", line 898, in execute_command
    conn = self.connection or pool.get_connection(command_name, **options)
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 1192, in get_connection
    connection.connect()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 559, in connect
    sock = self._connect()
  File "/home/airflow/.local/lib/python3.8/site-packages/redis/connection.py", line 603, in _connect
    sock.connect(socket_address)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout, PID: 8
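The final frame shows what the traceback means mechanically: the Redis socket connect was still blocking when Airflow's alarm fired. Below is a minimal, self-contained sketch of the SIGALRM-based timeout pattern that `airflow/utils/timeout.py` uses (not Airflow's actual source; `AirflowTaskTimeout` is stubbed here rather than imported from `airflow.exceptions`):

```python
import signal
import time

class AirflowTaskTimeout(Exception):
    """Stub for airflow.exceptions.AirflowTaskTimeout."""

class timeout:
    """SIGALRM-based context manager, in the style of airflow/utils/timeout.py."""

    def __init__(self, seconds=1, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        # Called when the alarm fires; interrupts whatever the process was
        # doing (in the traceback above: the blocking sock.connect call).
        raise AirflowTaskTimeout(self.error_message)

    def __enter__(self):
        signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.alarm(self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, tb):
        signal.alarm(0)  # cancel the pending alarm

# Usage: any call that blocks longer than `seconds` is aborted.
try:
    with timeout(seconds=1, error_message="Timeout, PID: 8"):
        time.sleep(2)  # stands in for the blocking Redis connect
except AirflowTaskTimeout as e:
    print(e)  # -> Timeout, PID: 8
```

This is why the error surfaces as `AirflowTaskTimeout` even though the root cause is a slow or unreachable Redis broker: the alarm handler raises from inside whatever frame happens to be executing.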

Custom Helm Values

########################################
## CONFIG | Airflow Configs
########################################
airflow:
  # node arch
  defaultNodeSelector:
    kubernetes.io/arch: amd64
  # image
  image:
    repository: apache/airflow
    tag: 2.2.5-python3.8
  # web sec
  webserverSecretKey: "Z4tJvL6sVufqjG872RwWQDzcKA5kN9Up"
  users:
    - username: admin
      password: qnjTpsazbSE62YxVFNJu7Av8QC9mgWZR
      role: Admin
      email: admin@example.com
      firstName: admin
      lastName: admin

  extraEnv:
    - name: AIRFLOW__CORE__FERNET_KEY
      valueFrom:
        secretKeyRef:
          name: airflow-secrets
          key: FERNET_KEY
    - name: KAFKA_BROKER
      value: kafka0-kafka-bootstrap.datacrawl:9092
    - name: KAFKA_TOPIC
      value: scraper-req
  # install other packages
  extraPipPackages:
    - "confluent_kafka==1.8.2"


###################################
## COMPONENT | Airflow Workers
###################################
workers:
  ## if the airflow workers StatefulSet should be deployed
  ##
  enabled: true

  ## the number of worker Pods to run
  ## - if you set this >1 we recommend defining a `workers.podDisruptionBudget`
  ## - this is the minimum when `workers.autoscaling.enabled` is true
  ##
  replicas: 1

  autoscaling:
    enabled: true
    maxReplicas: 4
    metrics:
    - type: Resource
      resource:
        name: memory
        target:
          type: Utilization
          averageUtilization: 80

###################################
## CONFIG | Airflow DAGs
###################################
dags:
  ## the airflow dags folder
  ##
  path: /opt/airflow/dags

  ## configs for the dags PVC
  ##
  persistence:
    ## if a persistent volume is mounted at `dags.path`
    ##
    enabled: false

    ## the name of an existing PVC to use
    ##
    existingClaim: "airflow-data"

    ## sub-path under `dags.persistence.existingClaim` to use
    ##
    subPath: ""

    ## the name of the StorageClass used by the PVC
    ## - if set to "", then `PersistentVolumeClaim/spec.storageClassName` is omitted
    ## - if set to "-", then `PersistentVolumeClaim/spec.storageClassName` is set to ""
    ##
    storageClass: longhorn

    ## the access mode of the PVC
    ## - [WARNING] must be "ReadOnlyMany" or "ReadWriteMany" otherwise airflow pods will fail to start
    ##
    accessMode: ReadWriteMany

    ## the size of PVC to request
    ##
    size: 8Gi

  ## configs for the git-sync sidecar (https://github.com/kubernetes/git-sync)
  ##
  gitSync:
    ## if the git-sync sidecar container is enabled
    ##
    enabled: true

    ## the git-sync container image
    ##
    image:
      repository: k8s.gcr.io/git-sync/git-sync
      tag: v3.5.0
      pullPolicy: IfNotPresent
      uid: 65533
      gid: 65533

    ## resource requests/limits for the git-sync container
    ## - spec for ResourceRequirements:
    ##   https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.20/#resourcerequirements-v1-core
    ##
    resources: {}

    ## the url of the git repo
    ##
    ## ____ EXAMPLE _______________
    ##   # https git repo
    ##   repo: "https://github.com/USERNAME/REPOSITORY.git"
    ##
    ## ____ EXAMPLE _______________
    ##   # ssh git repo
    ##   repo: "git@github.com:USERNAME/REPOSITORY.git"
    ##
    repo: "https://github.com/ThinkCol/crawl-dags.git"

    ## the sub-path within your repo where dags are located
    ## - only dags under this path within your repo will be seen by airflow,
    ##   (note, the full repo will still be cloned)
    ##
    repoSubPath: "dags"

    ## the git branch to check out
    ##
    branch: staging

    ## the git revision (tag or hash) to check out
    ##
    revision: HEAD

    ## shallow clone with a history truncated to the specified number of commits
    ##
    depth: 1

    ## the number of seconds between syncs
    ##
    syncWait: 60

    ## the max number of seconds allowed for a complete sync
    ##
    syncTimeout: 120

    ## the name of a pre-created Secret with git http credentials
    ##
    httpSecret: "airflow-secrets"

    ## the key in `dags.gitSync.httpSecret` with your git username
    ##
    httpSecretUsernameKey: GITHUB_USER

    ## the key in `dags.gitSync.httpSecret` with your git password/token
    ##
    httpSecretPasswordKey: GITHUB_TOKEN

    ## the name of a pre-created Secret with git ssh credentials
    ##
    sshSecret: ""

    ## the key in `dags.gitSync.sshSecret` with your ssh-key file
    ##
    sshSecretKey: id_rsa

    ## the string value of a "known_hosts" file (for SSH only)
    ## - [WARNING] known_hosts verification will be disabled if left empty, making you more
    ##   vulnerable to repo spoofing attacks
    ##
    ## ____ EXAMPLE _______________
    ##   sshKnownHosts: |-
    ##     <HOST_NAME> ssh-rsa <HOST_KEY>
    ##
    sshKnownHosts: ""

    ## the number of consecutive failures allowed before aborting
    ##  - the first sync must succeed
    ##  - a value of -1 will retry forever after the initial sync
    ##
    maxFailures: 0

###################################
## CONFIG | Kubernetes RBAC
###################################
rbac:
  ## if Kubernetes RBAC resources are created
  ## - these allow the service account to create/delete Pods in the airflow namespace,
  ##   which is required for the KubernetesPodOperator() to function
  ##
  create: true

  ## if the created RBAC Role has GET/LIST on Event resources
  ## - this is needed for KubernetesPodOperator() to use `log_events_on_failure=True`
  ##
  events: true

###################################
## CONFIG | Kubernetes ServiceAccount
###################################
serviceAccount:
  ## if a Kubernetes ServiceAccount is created
  ## - if `false`, you must create the service account outside this chart with name: `serviceAccount.name`
  ##
  create: true

  ## the name of the ServiceAccount
  ## - by default the name is generated using the `airflow.serviceAccountName` template in `_helpers/common.tpl`
  ##
  name: ""

  ## annotations for the ServiceAccount
  ##
  ## ____ EXAMPLE _______________
  ##   # EKS - IAM Roles for Service Accounts
  ##   annotations:
  ##     eks.amazonaws.com/role-arn: "arn:aws:iam::XXXXXXXXXX:role/<<MY-ROLE-NAME>>"
  ##
  ## ____ EXAMPLE _______________
  ##   # GKE - WorkloadIdentity
  ##   annotations:
  ##     iam.gke.io/gcp-service-account: "<<GCP_SERVICE>>@<<GCP_PROJECT>>.iam.gserviceaccount.com"
  ##
  annotations: {}


###################################
## DATABASE | PgBouncer
###################################
pgbouncer:
  ## if the pgbouncer Deployment is created
  ##
  enabled: true # set to true when chart version 8.6.0 gets released
  authType: scram-sha-256

###################################
## DATABASE | Embedded Postgres
###################################
postgresql:
  ## if the `stable/postgresql` chart is used
  ## - [WARNING] the embedded Postgres is NOT SUITABLE for production deployments of Airflow
  ## - [WARNING] consider using an external database with `externalDatabase.*`
  ## - set to `false` if using `externalDatabase.*`
  ##
  enabled: false

###################################
## DATABASE | External Database
###################################
externalDatabase:
  type: postgres
  host: dc-pg.datacrawl
  port: 5432
  database: airflow
  user: user0
  passwordSecret: "airflow-secrets"
  passwordSecretKey: "POSTGRES_PASSWORD"
  properties: ""

###################################
## DATABASE | Embedded Redis
###################################
redis:
  enabled: true
  password: airflow
  existingSecret: ""
  existingSecretPasswordKey: "redis-password"
  cluster:
    enabled: false
    slaveCount: 1
  master:
    podAnnotations:
      cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    persistence:
      enabled: false

###################################
## CONFIG | ServiceMonitor (Prometheus Operator)
###################################
serviceMonitor:
  enabled: false
  selector:
    prometheus: kube-prometheus

  ## the ServiceMonitor web endpoint path
  ##
  path: /admin/metrics

  ## the ServiceMonitor web endpoint interval
  ##
  interval: "30s"

prometheusRule:
  enabled: false
@ac5tin added the kind/bug (kind - things not working properly) label on May 2, 2022
@thesuperzapper (Member)

@ac5tin this really looks like a Celery issue, going by what I find when I Google AttributeError: 'ChannelPromise' object has no attribute '__value__'.

Is this still happening or has it resolved itself?

ac5tin commented May 21, 2022

I ended up switching to 2.2.5-python3.9, and the error is gone.
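In chart-values terms, the fix amounts to bumping the image tag (sketch below, mirroring the values posted above):

```yaml
airflow:
  image:
    repository: apache/airflow
    tag: 2.2.5-python3.9  # was 2.2.5-python3.8
```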

@thesuperzapper (Member)

@ac5tin thanks for letting us know!


agconti commented Oct 5, 2022

@ac5tin did you change anything else when you upgraded your Airflow/Python version? I'm running into this issue, and the fix isn't working for me.

@thesuperzapper (Member)

@agconti it's almost certainly a pip package issue. Can you confirm:

  1. What version of the chart are you using?
  2. Are you using extraPipPackages values?
  3. Are you using the default airflow image, or a custom one?

agconti commented Oct 6, 2022

@thesuperzapper thanks for asking! My details are in #654.
