Integration with orchestration tools
Integration with Azure Data Factory
To integrate with Azure Data Factory, follow the steps below.
Please ensure that a pipeline has been created and published in Azure Data Factory.
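If you have the Azure CLI with its datafactory extension installed, you can sanity-check this prerequisite from the command line before reflecting. A minimal sketch, using the same placeholder resource names as the commands below:

# Errors out if the pipeline does not exist in the factory yet
az datafactory pipeline show \
  --resource-group some-group \
  --factory-name some-factory \
  --name some-pipeline-name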
Deploy from the CLI
Deploy the latest job version and write the deployment result to a file:
dbx deploy --write-specs-to-file=.dbx/deployment-result.json --files-only
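The deploy step reads job definitions from your dbx deployment file (typically conf/deployment.yml, or whatever you pass via --deployment-file). Below is a minimal single-task sketch of such a file; the job name, cluster settings and paths are placeholders to adapt for your workspace, and the exact top-level layout may differ slightly between dbx versions:

environments:
  default:
    jobs:
      - name: "some-job-name"
        new_cluster:
          spark_version: "10.4.x-scala2.12"
          node_type_id: "Standard_E8_v3"
          num_workers: 1
        spark_python_task:
          python_file: "file://some_project/entrypoint.py"
          parameters: ["--some-parameter", "some-value"]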
Reflect job definitions to Azure Data Factory activities:
dbx datafactory reflect \
  --specs-file=.dbx/deployment-result.json \
  --subscription-name some-subscription \
  --resource-group some-group \
  --factory-name some-factory \
  --name some-pipeline-name
Deploy from CI/CD pipeline
Inside your CI/CD pipeline, add an Azure login step before the dbx datafactory reflect
step. Below is an example using GitHub Actions:
- name: Deploy and write deployment result into a file
  run: |
    dbx deploy --deployment-file ./conf/deployment_single_task.yml --write-specs-to-file=./.dbx/deployment-result.json --files-only

- name: Azure Login
  uses: azure/login@v1
  with:
    creds: ${{ secrets.AZURE_CREDENTIALS }}

- name: Reflect job definitions to ADF
  run: |
    dbx datafactory reflect \
      --specs-file=.dbx/deployment-result.json \
      --subscription-name some-subscription \
      --resource-group some-group \
      --factory-name some-factory \
      --name some-pipeline-name
See the azure/login action documentation for more details on Azure login.
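The AZURE_CREDENTIALS secret referenced above is usually a service principal JSON object (clientId, clientSecret, subscriptionId, tenantId). One way to generate it, sketched here with placeholder names and scope:

# Create a service principal scoped to the resource group hosting the factory and
# store the JSON output as the AZURE_CREDENTIALS secret in your repository.
az ad sp create-for-rbac \
  --name "dbx-adf-deployer" \
  --role Contributor \
  --scopes "/subscriptions/<subscription-id>/resourceGroups/some-group" \
  --sdk-auth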
The reflect command will create or update linked services and pipeline activities. Each job will be configured as a separate activity.
Warning
Please note the following limitations of this approach:
- runs triggered from Azure Data Factory won't be mentioned in the Databricks job runs
- changing a job definition manually in the Databricks UI won't change the properties of ADF-defined activities
- only Python-based activities are supported at the moment
- MSI authentication is not yet supported
- the policy_id argument is not yet supported (it will be ignored during deployment to ADF)
- manual changes to the parameters of Databricks tasks will be nullified during the next reflect; all parameters for Databricks tasks shall be provided in the deployment file (as in the sketch above)
- multi-task jobs are not supported by ADF
Integration with Apache Airflow
To trigger job execution from Apache Airflow, please do the following:
Deploy jobs to Databricks:
dbx deploy
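If your deployment file lives outside the default location, point dbx at it explicitly, mirroring the ADF example above:

dbx deploy --deployment-file ./conf/deployment_single_task.yml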
Add this helper function, which looks up a job id by its name, to your Airflow setup:
from airflow.contrib.hooks.databricks_hook import DatabricksHook
# On Airflow 2.x, import the hook from the provider package instead:
# from airflow.providers.databricks.hooks.databricks import DatabricksHook


def get_job_id_by_name(job_name: str, databricks_conn_id: str) -> str:
    # List all jobs via the Databricks Jobs API and find the one with a matching name
    list_endpoint = ('GET', 'api/2.0/jobs/list')
    hook = DatabricksHook(databricks_conn_id=databricks_conn_id)
    response_payload = hook._do_api_call(list_endpoint, {})
    all_jobs = response_payload.get("jobs", [])
    matching_jobs = [j for j in all_jobs if j["settings"]["name"] == job_name]

    if not matching_jobs:
        raise Exception(f"Job with name {job_name} not found")

    if len(matching_jobs) > 1:
        raise Exception(f"Job with name {job_name} is duplicated. Please make job name unique in Databricks UI.")

    job_id = matching_jobs[0]["job_id"]
    return job_id
Use this function from your DAG:
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

job_id = get_job_id_by_name("some-job-name", "some-databricks-conn-id")

operator = DatabricksRunNowOperator(
    job_id=job_id,
    # add your arguments
)
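Putting the pieces together, a minimal DAG could look like the sketch below. The DAG id, schedule, task id and connection id are illustrative, and get_job_id_by_name is the helper defined above (note that it is called at DAG parse time, so each parse issues one Jobs API request):

from datetime import datetime

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# assumes get_job_id_by_name from the snippet above is available in this module

with DAG(
    dag_id="trigger_dbx_deployed_job",
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Resolve the job id once per DAG parse using the helper defined above
    job_id = get_job_id_by_name("some-job-name", "some-databricks-conn-id")

    run_job = DatabricksRunNowOperator(
        task_id="run_some_job",
        databricks_conn_id="some-databricks-conn-id",
        job_id=job_id,
    )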