Pipeline Creation and Triggers

Data Ingestion Using HTTP Connector in Azure Data Factory (ADF)

Scenario: Ingest the order dataset files (orders.csv, order_items.csv, products.csv) from an HTTP source into Azure Data Lake Storage Gen2 (ADLS Gen2).


Steps:

1. Create a Resource Group

  • Go to the Azure Portal.
  • Create a Resource Group and pin it to the dashboard for easy access.

2. Set Up Blob Storage

  • Create a Storage Account with Blob Storage.
  • Upload the dataset files (products.csv, orders.csv, order_items.csv).
  • Enable "Allow Blob anonymous access" for the storage account under its Configuration settings.
  • Set the container's access level to "Container (anonymous read access for containers and blobs)".
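
If the storage account is deployed as code rather than through the portal, the same two settings can be expressed in an ARM template. The following is a minimal sketch under that assumption; the account name, container name, and region are placeholders.

json

{
  "$schema": "https://schema.management.azure.com/schemas/2019-04-01/deploymentTemplate.json#",
  "contentVersion": "1.0.0.0",
  "resources": [
    {
      "type": "Microsoft.Storage/storageAccounts",
      "apiVersion": "2023-01-01",
      "name": "examplestorageacct",
      "location": "eastus",
      "sku": { "name": "Standard_LRS" },
      "kind": "StorageV2",
      "properties": {
        "allowBlobPublicAccess": true
      }
    },
    {
      "type": "Microsoft.Storage/storageAccounts/blobServices/containers",
      "apiVersion": "2023-01-01",
      "name": "examplestorageacct/default/raw-data",
      "dependsOn": [
        "[resourceId('Microsoft.Storage/storageAccounts', 'examplestorageacct')]"
      ],
      "properties": {
        "publicAccess": "Container"
      }
    }
  ]
}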

3. Create a Data Factory

  • Deploy a new Azure Data Factory (ADF) in the Resource Group.

4. Create Linked Services

  • Source (HTTP):

    • Add a Linked Service pointing to HTTP with the Base URL (e.g., https://example-source-url/).
    • Parameterize the base URL to allow dynamic configuration.
      Example:
      • Add a parameter baseUrl in the Linked Service.
      • Use dynamic content: @{linkedService().baseUrl}.
  • Sink (ADLS Gen2):

    • Add a Linked Service pointing to Azure Data Lake Storage Gen2.
    • Use parameters for dynamic file paths.
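
For reference, a parameterized HTTP linked service might look roughly like the following sketch; the name LS_HTTP_Source is a placeholder and anonymous authentication is assumed.

json

{
  "name": "LS_HTTP_Source",
  "properties": {
    "type": "HttpServer",
    "parameters": {
      "baseUrl": { "type": "String" }
    },
    "typeProperties": {
      "url": "@{linkedService().baseUrl}",
      "enableServerCertificateValidation": true,
      "authenticationType": "Anonymous"
    }
  }
}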

5. Create Datasets

  • Source Dataset:

    • Fetch data from HTTP using the relative URL (e.g., /data/products.csv).
    • Parameterize the relative URL to enable reusability.
      Example: @{dataset().relativeUrl}.
  • Sink Dataset:

    • Specify the ADLS Gen2 path where the data should be copied.
    • Parameterize the file path dynamically.
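
As a rough sketch, the parameterized source dataset could be a DelimitedText dataset over the HTTP linked service; the names DS_HTTP_Csv and LS_HTTP_Source are placeholders.

json

{
  "name": "DS_HTTP_Csv",
  "properties": {
    "type": "DelimitedText",
    "linkedServiceName": {
      "referenceName": "LS_HTTP_Source",
      "type": "LinkedServiceReference",
      "parameters": { "baseUrl": "@dataset().baseUrl" }
    },
    "parameters": {
      "baseUrl": { "type": "String" },
      "relativeUrl": { "type": "String" }
    },
    "typeProperties": {
      "location": {
        "type": "HttpServerLocation",
        "relativeUrl": "@dataset().relativeUrl"
      },
      "columnDelimiter": ",",
      "firstRowAsHeader": true
    }
  }
}

The sink dataset can be parameterized the same way, using an AzureBlobFSLocation whose file name (and, if needed, folder path and file system) comes from dataset parameters.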

6. Create a Pipeline

  • Add a Copy Data Activity to transfer data from the HTTP source to ADLS Gen2 sink.
  • Use parameters in the Copy Data Activity to dynamically pass the base URL, relative URL, and target file path.
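
A minimal sketch of such a Copy Data Activity, assuming the placeholder dataset names above, a sink dataset parameter filePath, and pipeline parameters baseUrl, relativeUrl, and sinkPath:

json

{
  "name": "CopyHttpToAdls",
  "type": "Copy",
  "inputs": [
    {
      "referenceName": "DS_HTTP_Csv",
      "type": "DatasetReference",
      "parameters": {
        "baseUrl": "@pipeline().parameters.baseUrl",
        "relativeUrl": "@pipeline().parameters.relativeUrl"
      }
    }
  ],
  "outputs": [
    {
      "referenceName": "DS_ADLS_Csv",
      "type": "DatasetReference",
      "parameters": { "filePath": "@pipeline().parameters.sinkPath" }
    }
  ],
  "typeProperties": {
    "source": {
      "type": "DelimitedTextSource",
      "storeSettings": { "type": "HttpReadSettings" }
    },
    "sink": {
      "type": "DelimitedTextSink",
      "storeSettings": { "type": "AzureBlobFSWriteSettings" }
    }
  }
}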

7. Debug and Run

  • Use the Debug option to test the pipeline with dynamic values.
  • Once successful, publish the changes.

Example with Parameterization:

HTTP Linked Service:

  • Parameter: baseUrl
  • Base URL:
    json

    "@{linkedService().baseUrl}"

Dataset (Source):

  • Parameter: relativeUrl
  • Relative URL:
    json

    "@{dataset().relativeUrl}"

Pipeline:

  • Pass values dynamically:
    • baseUrl: https://example-source-url/
    • relativeUrl: /data/products.csv
    • sinkPath: adls-container/processed/products.csv
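
These three values can be declared as pipeline parameters, shown here as a sketch with the sample values above as defaults, so they can be overridden at debug or trigger time:

json

"parameters": {
  "baseUrl": { "type": "String", "defaultValue": "https://example-source-url/" },
  "relativeUrl": { "type": "String", "defaultValue": "/data/products.csv" },
  "sinkPath": { "type": "String", "defaultValue": "adls-container/processed/products.csv" }
}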

Scalable Solution Using Lookup Activity

When ingesting multiple files from different HTTP URLs, use a Lookup Activity together with a ForEach Activity to read the URLs from a configuration file instead of hard-coding a pipeline per file.

Steps:

  1. Create a Lookup File (e.g., urls.json) with the following content:

    json

    [ {"baseUrl": "https://example-source-url", "relativeUrl": "/data/products.csv", "targetPath": "adls-container/processed/products.csv"}, {"baseUrl": "https://example-source-url", "relativeUrl": "/data/orders.csv", "targetPath": "adls-container/processed/orders.csv"} ]
  2. Add a Lookup Activity in the pipeline to read urls.json.

  3. Use a ForEach Activity to iterate over each entry in the lookup file.

  4. Inside the ForEach, add a Copy Data Activity:

    • Use @item().baseUrl for the Source Linked Service.
    • Use @item().relativeUrl for the Source Dataset.
    • Use @item().targetPath for the Sink Dataset.
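
Put together, the Lookup and ForEach activities might look roughly like this sketch; the activity and dataset names are placeholders, and the Lookup dataset is assumed to be a JSON dataset pointing at urls.json with "First row only" disabled.

json

[
  {
    "name": "LookupUrls",
    "type": "Lookup",
    "typeProperties": {
      "source": { "type": "JsonSource" },
      "dataset": { "referenceName": "DS_UrlsJson", "type": "DatasetReference" },
      "firstRowOnly": false
    }
  },
  {
    "name": "ForEachFile",
    "type": "ForEach",
    "dependsOn": [ { "activity": "LookupUrls", "dependencyConditions": [ "Succeeded" ] } ],
    "typeProperties": {
      "items": { "value": "@activity('LookupUrls').output.value", "type": "Expression" },
      "activities": [
        {
          "name": "CopyOneFile",
          "type": "Copy",
          "inputs": [
            {
              "referenceName": "DS_HTTP_Csv",
              "type": "DatasetReference",
              "parameters": { "baseUrl": "@item().baseUrl", "relativeUrl": "@item().relativeUrl" }
            }
          ],
          "outputs": [
            {
              "referenceName": "DS_ADLS_Csv",
              "type": "DatasetReference",
              "parameters": { "filePath": "@item().targetPath" }
            }
          ],
          "typeProperties": {
            "source": { "type": "DelimitedTextSource", "storeSettings": { "type": "HttpReadSettings" } },
            "sink": { "type": "DelimitedTextSink", "storeSettings": { "type": "AzureBlobFSWriteSettings" } }
          }
        }
      ]
    }
  }
]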

Example Use Case: Amazon S3 to ADLS Gen2

Scenario: Ingest order_items.csv from Amazon S3 to ADLS Gen2.

  1. Create Resources:

    • Amazon S3 Bucket with order_items.csv.
    • Azure Storage Account with ADLS Gen2.
  2. Secure Access with Azure Key Vault:

    • Store Amazon S3 Access Key ID and Secret Key in Azure Key Vault.
    • Create a Linked Service for Azure Key Vault in ADF.
    • Use Key Vault secrets in the Amazon S3 Linked Service (see the sketch after these steps).
  3. Create Linked Services:

    • Source (Amazon S3): Use the Access Key ID and Secret Key from Key Vault.
    • Sink (ADLS Gen2): Specify the target container and path.
  4. Create Datasets:

    • Source: Amazon S3 path for order_items.csv.
    • Sink: ADLS Gen2 folder.
  5. Create a Pipeline:

    • Add a Copy Activity to move data from S3 to ADLS Gen2.
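
A hedged sketch of the Amazon S3 linked service from steps 2-3, pulling both credentials from Key Vault; the linked service and secret names are placeholders. (If the access key ID field does not accept a Key Vault reference in your ADF version, paste it directly and keep only the secret access key in Key Vault.)

json

{
  "name": "LS_AmazonS3",
  "properties": {
    "type": "AmazonS3",
    "typeProperties": {
      "accessKeyId": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "LS_KeyVault", "type": "LinkedServiceReference" },
        "secretName": "s3-access-key-id"
      },
      "secretAccessKey": {
        "type": "AzureKeyVaultSecret",
        "store": { "referenceName": "LS_KeyVault", "type": "LinkedServiceReference" },
        "secretName": "s3-secret-access-key"
      }
    }
  }
}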

Output Validation:

  • Validate data ingestion by checking the ADLS Gen2 container.

Organizing Pipelines and Datasets

  1. Create folders for Datasets:
    • orders, order_items, and SQL.
  2. Organize Pipelines:
    • Separate folders for ingestion, processing, and SQL ingestion activities.
  3. Chain Pipelines:
    • Use Execute Pipeline Activity to ensure sequential execution (sketched below).
  4. Add Triggers:
    • Storage Event Trigger for automatic execution when new data arrives in Blob storage.
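
As an illustration of item 3, an Execute Pipeline Activity in a parent pipeline can start the processing pipeline only after the ingestion step succeeds; the pipeline and activity names below are placeholders.

json

{
  "name": "RunProcessing",
  "type": "ExecutePipeline",
  "dependsOn": [ { "activity": "RunIngestion", "dependencyConditions": [ "Succeeded" ] } ],
  "typeProperties": {
    "pipeline": { "referenceName": "pl_process_orders", "type": "PipelineReference" },
    "waitOnCompletion": true
  }
}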

Trigger Types in ADF

  1. Scheduled Trigger: Runs pipelines on a wall-clock schedule (for example, hourly or daily).
  2. Tumbling Window Trigger:
    • Fires for fixed-size, non-overlapping time windows and processes the data slice for each window.
    • Supports running against past intervals (backfill).
  3. Storage Event Trigger:
    • Fires when blobs are created or deleted in a storage account (see the sketch below).
  4. Custom Event Trigger: Fires on custom events published to an Azure Event Grid topic.
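
For example, a Storage Event Trigger (type BlobEventsTrigger) that starts the ingestion pipeline whenever a new .csv file lands in a container might look roughly like this sketch; the scope, blob paths, and pipeline name are placeholders.

json

{
  "name": "trg_new_order_file",
  "properties": {
    "type": "BlobEventsTrigger",
    "typeProperties": {
      "scope": "/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/<storage-account>",
      "blobPathBeginsWith": "/landing/blobs/",
      "blobPathEndsWith": ".csv",
      "ignoreEmptyBlobs": true,
      "events": [ "Microsoft.Storage.BlobCreated" ]
    },
    "pipelines": [
      {
        "pipelineReference": { "referenceName": "pl_ingest_orders", "type": "PipelineReference" }
      }
    ]
  }
}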

Key Points

  • Parameterization avoids creating a separate linked service and dataset for every file.
  • Lookup and ForEach activities automate dynamic URL-based data ingestion.
  • Secure sensitive credentials with Azure Key Vault.
  • Triggers and their relationships to pipelines:
    • Scheduled Trigger: many-to-many; a single trigger can start multiple pipelines, and a pipeline can be started by multiple triggers.
    • Tumbling Window Trigger: one-to-one; each trigger is bound to exactly one pipeline.
