Skip to content

Commit

Permalink
[AIRFLOW-2763] Add check to validate worker connectivity to metadata …
Browse files Browse the repository at this point in the history
…Database
  • Loading branch information
aimohan7 authored and r39132 committed Aug 10, 2018
1 parent 1e991b1 commit c7551f6
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
5 changes: 5 additions & 0 deletions airflow/bin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,11 @@ def worker(args):
env = os.environ.copy()
env['AIRFLOW_HOME'] = settings.AIRFLOW_HOME

if not settings.validate_session():
log = LoggingMixin().log
log.error("Worker exiting... database connection precheck failed! ")
sys.exit(1)

# Celery worker
from airflow.executors.celery_executor import app as celery_app
from celery.bin import worker
Expand Down
3 changes: 3 additions & 0 deletions airflow/config_templates/default_airflow.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ killed_task_cleanup_time = 60
# `airflow trigger_dag -c`, the key-value pairs will override the existing ones in params.
dag_run_conf_overrides_params = False

# Worker initialisation check to validate Metadata Database connection
worker_precheck = False

[cli]
# In what way should the cli access the API. The LocalClient will use the
# database directly, while the json_client will use the api running on the
Expand Down
1 change: 1 addition & 0 deletions airflow/config_templates/default_test.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enable_xcom_pickling = False
killed_task_cleanup_time = 5
secure_mode = False
hostname_callable = socket:getfqdn
worker_precheck = False

[cli]
api_client = airflow.api.client.local_client
Expand Down
22 changes: 21 additions & 1 deletion airflow/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import pendulum
import socket

from sqlalchemy import create_engine
from sqlalchemy import create_engine, exc
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import NullPool

Expand Down Expand Up @@ -216,6 +216,26 @@ def configure_adapters():
pass


def validate_session():
try:
worker_precheck = conf.getboolean('core', 'worker_precheck')
except conf.AirflowConfigException:
worker_precheck = False
if not worker_precheck:
return True
else:
check_session = sessionmaker(bind=engine)
session = check_session()
try:
session.execute("select 1")
conn_status = True
except exc.DBAPIError as err:
log.error(err)
conn_status = False
session.close()
return conn_status


def configure_action_logging():
"""
Any additional configuration (register callback) for airflow.utils.action_loggers
Expand Down
73 changes: 73 additions & 0 deletions tests/cli/test_worker_initialisation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import unittest
import sqlalchemy
import airflow
from argparse import Namespace

try:
from unittest import mock
except ImportError:
try:
import mock
except ImportError:
mock = None
from mock import patch

patch('airflow.utils.cli.action_logging', lambda x: x).start()
from airflow.bin import cli # noqa
mock_args = Namespace(queues=1, concurrency=1)


class TestWorkerPrecheck(unittest.TestCase):

def setUp(self):
airflow.configuration.load_test_config()

@mock.patch('airflow.settings.validate_session')
def test_error(self, mock_validate_session):
"""
Test to verify the exit mechanism of airflow-worker cli
by mocking validate_session method
"""
mock_validate_session.return_value = False
with self.assertRaises(SystemExit) as cm:
# airflow.bin.cli.worker(mock_args)
cli.worker(mock_args)
self.assertEqual(cm.exception.code, 1)

@mock.patch('airflow.configuration.getboolean')
def test_worker_precheck_exception(self, mock_getboolean):
"""
Test to check the behaviour of validate_session method
when worker_precheck is absent in airflow configuration
"""
mock_getboolean.side_effect = airflow.configuration.AirflowConfigException
self.assertEqual(airflow.settings.validate_session(), True)

@mock.patch('sqlalchemy.orm.session.Session.execute')
@mock.patch('airflow.configuration.getboolean')
def test_validate_session_dbapi_exception(self, mock_getboolean, mock_session):
"""
Test to validate connection failure scenario on SELECT 1 query
"""
mock_getboolean.return_value = True
mock_session.side_effect = sqlalchemy.exc.OperationalError("m1", "m2", "m3", "m4")
self.assertEquals(airflow.settings.validate_session(), False)

0 comments on commit c7551f6

Please sign in to comment.