Azure Data Factory: Preventing Concurrent Pipeline Runs

Introduction

This is a bit of a departure from my usual Logic Apps and Power Automate blog posts, but as a part-time amateurish Data Factory user I could have done with a guide like this before I got started, so I thought I’d post this for anyone else in the same situation.

This blog isn’t intended as an introduction to ADF (although I do feel like I should write one from the perspective of a Logic Apps/Flow developer), so apologies if I gloss over some concepts that aren’t familiar to you. Feel free to use the comments at the bottom of this article to ask questions.

Scenario

You have a Data Factory pipeline that triggers on a schedule. Within the pipeline you invoke a data flow that pulls a bunch of data from CSV files in Azure blob storage, does some transformation, sinks it into Azure SQL, then deletes the source blobs.

You want the pipeline to run fairly frequently because new data is being dropped into the blob storage all the time and it’s important for the users that the latest data is available relatively soon after.

Usually the amount of data that appears in blob storage between runs is small enough for the ETL process to finish before the next scheduled trigger. Sometimes, however, it doesn’t.

As a safeguard against failures or duplicated data, you want to ensure that the pipeline never invokes the data flow while another pipeline run is still in progress.

Method

To achieve the desired outcome, we need to do a few things:

  • Run a PowerShell script to get past pipeline runs
  • Invoke the script and return the output of the script to the pipeline
  • Perform conditional logic on that output

In ADF, one of the activities you can add to a pipeline is the Web activity.

This activity performs an HTTP request (the GET, POST, PUT, PATCH and DELETE methods are supported), and the response is included in the activity’s output.

We can use this Web activity to invoke a PowerShell script and return the result. My first thought was to use an Azure Automation runbook for this, wrap a Logic App around it and invoke the Logic App with an HTTP request, but my esteemed colleague, Microsoft MVP and PowerShell guru @WindosNZ, guided me towards using a PowerShell Azure Function.

The Azure Function

I used this guide to get me going with a PowerShell Azure function without using a local IDE.

First I’ll deal with the permissions then get onto the code.

Once you’ve created the Function App, you need to enable a managed identity for it and give that identity permission to read the Data Factory pipeline runs.

Within the new Function App, open Identity and, on the System assigned tab, set Status to On.

Now click the Azure role assignments button, then + Add role assignment.

I feel like the role I chose grants too much permission, but I couldn’t find a Data Factory Reader role. Someone with more AAD security nous than me could advise, but anyway, that’s what I did.
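If you’d rather script the identity and role assignment than click through the portal, something along these lines should be roughly equivalent. Treat it as a sketch rather than what I actually ran: Grant-FactoryRunReadAccess is a made-up helper name, and defaulting to the built-in Data Factory Contributor role is my assumption (it can read pipeline runs, but it’s broader than a read-only role would be). It assumes the Az.Functions, Az.Websites, Az.DataFactory and Az.Resources modules are installed and that you’ve already signed in with Connect-AzAccount.

```powershell
# Hypothetical helper (not from the original post): enables the Function App's
# system-assigned managed identity and grants that identity a role scoped to one
# data factory. Needs Az.Functions, Az.Websites, Az.DataFactory and Az.Resources,
# plus a signed-in session (Connect-AzAccount).
function Grant-FactoryRunReadAccess {
    [CmdletBinding()]
    param(
        [Parameter(Mandatory)] [string] $ResourceGroupName,
        [Parameter(Mandatory)] [string] $FunctionAppName,
        [Parameter(Mandatory)] [string] $DataFactoryName,
        # Assumption: a built-in role broad enough to query pipeline runs
        [string] $RoleName = 'Data Factory Contributor'
    )

    # Portal equivalent: Identity > System assigned > Status: On
    Update-AzFunctionApp -ResourceGroupName $ResourceGroupName -Name $FunctionAppName -IdentityType SystemAssigned | Out-Null

    # A Function App is a Microsoft.Web/sites resource, so Get-AzWebApp can read its identity
    $principalId = (Get-AzWebApp -ResourceGroupName $ResourceGroupName -Name $FunctionAppName).Identity.PrincipalId

    # Scope the assignment to the data factory rather than the whole resource group
    $factory = Get-AzDataFactoryV2 -ResourceGroupName $ResourceGroupName -Name $DataFactoryName

    # A brand-new identity can take a minute to replicate; if this reports that the
    # principal doesn't exist, wait and run it again
    New-AzRoleAssignment -ObjectId $principalId -RoleDefinitionName $RoleName -Scope $factory.DataFactoryId
}

# Example call with placeholder names:
# Grant-FactoryRunReadAccess -ResourceGroupName 'myResourceGroup' -FunctionAppName '<your-function-app>' -DataFactoryName 'My-data-factory'
```

Note that the function code never signs in to Azure itself. As far as I know, that’s handled by the default profile.ps1 a new PowerShell Function App gets, which runs Connect-AzAccount -Identity on cold start when a managed identity is enabled; that’s why the identity and its role assignment are needed.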

Now that we have that out of the way, we can get on to the code.

Create a new HTTP trigger function within your Function App using Function as the Authorization level.

Use the code below, substituting your own resource group and data factory names in the $rg and $df variables.

You could also customise the time window. This code gets all the pipeline runs last updated in the past 24 hours; your mileage may vary.

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

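# Put the value of the runid query parameter into a variable
$runid = $Request.Query.runid

# Get any pipeline runs that are in progress and are not the run ID provided in the URL query.
# This covers every pipeline in the factory; add a $_.PipelineName check to the
# where() filter if only one pipeline matters to you.
$rg = 'myResourceGroup'
$df = 'My-data-factory'
$now = (Get-Date).ToUniversalTime()
$dayago = $now.AddDays(-1)

# @() guarantees an array even if the cmdlet returns no runs or a single run
$pipelineruns = @(Get-AzDataFactoryV2PipelineRun -ResourceGroupName $rg -DataFactoryName $df -LastUpdatedAfter $dayago -LastUpdatedBefore $now)
$filteredrun = $pipelineruns.where({ $_.Status -eq 'InProgress' -and $_.RunId -ne $runid })

# Passing the collection as -InputObject (rather than piping it) keeps the body a
# JSON array, even when it holds zero runs ("[]") or exactly one
$body = ConvertTo-Json -InputObject $filteredrun

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = $body
})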

Call the function with the GET method. The code expects a URL query parameter called runid containing a pipeline run ID, because we want to filter that specific run out using PowerShell’s where() method.

When we trigger this function, we want to pass in the Run ID of the current run (as a system variable from the Data Factory), and use it to filter that out of the results returned, leaving only pipeline runs with a status of ‘InProgress’ that are not the actual run we’re invoking this function from.

In other words: are there any other runs in progress that we should know about?
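To check the filtering logic without waiting for real pipeline runs, the where() filter and the JSON conversion can be lifted into a small helper and fed mock objects. This is a sketch for local testing only: ConvertTo-OtherInProgressRunJson is a made-up name, and the mock runs only model the RunId and Status properties the filter looks at (real pipeline run objects carry many more).

```powershell
# Hypothetical helper (not part of the Function App): applies the same filter and
# JSON conversion as the HTTP function, so the logic can be tested with mock data.
function ConvertTo-OtherInProgressRunJson {
    param(
        [object[]] $PipelineRuns,
        [string] $RunId
    )
    # Keep runs that are in progress and are not the caller's own run.
    # @() copes with a single run or $null being passed in.
    $filtered = @($PipelineRuns).Where({ $_.Status -eq 'InProgress' -and $_.RunId -ne $RunId })

    # -InputObject serialises the whole collection, so the result is always a JSON array
    ConvertTo-Json -InputObject $filtered -Compress
}

# Mock runs with only the two properties the filter uses
$runs = @(
    [pscustomobject]@{ RunId = 'run-a'; Status = 'InProgress' }
    [pscustomobject]@{ RunId = 'run-b'; Status = 'InProgress' }
    [pscustomobject]@{ RunId = 'run-c'; Status = 'Succeeded' }
)

# Called from run-a while run-b is still going: expect an array holding run-b only
ConvertTo-OtherInProgressRunJson -PipelineRuns $runs -RunId 'run-a'

# Called from run-a when it's the only run: expect an empty array
ConvertTo-OtherInProgressRunJson -PipelineRuns $runs[0] -RunId 'run-a'
```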

Use the Test/Run pane to verify your script. It might pay to have a pipeline running when you do this, or to temporarily modify the condition in the where() call that builds $filteredrun so you’re not filtering everything out. For testing purposes, paste the GUID of an existing past run into a query parameter called runid.

It’ll take a while to run the first time. I also got a cryptic error message about whitespace, which I ignored, and it worked on the second try.

If all goes well, you should get a JSON array back, which may be empty ([]) or may contain some objects. At first, just make sure you get an HTTP 200 status code and an array, empty or otherwise, back from the function. To test it properly you need to simulate the real conditions it’ll work in. Start a pipeline run, then, while you’re waiting for the compute cluster to warm up, start another. Grab the Run ID from the second one, use it in the query, and make sure your response contains one object. Then do the same with just one pipeline running: put its Run ID in the query and make sure you get an empty array.

When you’re happy with the script, click Get function URL at the top, select the default function key and copy the URL to the clipboard.

Now you can test your function outside the Azure portal. I use Postman for this. You could too, or you could use Invoke-WebRequest in PowerShell, or, because it’s a GET request, simply your web browser. Don’t forget to add the runid query parameter to the URL.
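If you go the Invoke-WebRequest route, the only fiddly part is appending runid to a URL that already carries the function key as ?code=. A small helper keeps that tidy. Get-RunCheckUrl is a hypothetical name, and the URL and run IDs are placeholders rather than real values. The commented lines sketch how it might be combined with Invoke-AzDataFactoryV2Pipeline, which returns the new run’s ID, to recreate the two-run test described above.

```powershell
# Hypothetical helper: append the runid query parameter to the URL copied from
# "Get function URL" (which normally already ends in ?code=<function key>).
function Get-RunCheckUrl {
    param(
        [Parameter(Mandatory)] [string] $FunctionUrl,
        [Parameter(Mandatory)] [string] $RunId
    )
    # Use & if the URL already has a query string, otherwise start one with ?
    $separator = if ($FunctionUrl.Contains('?')) { '&' } else { '?' }
    '{0}{1}runid={2}' -f $FunctionUrl, $separator, [uri]::EscapeDataString($RunId)
}

# Placeholder URL and run ID: substitute your own
$url = Get-RunCheckUrl -FunctionUrl 'https://<your-app>.azurewebsites.net/api/<your-function>?code=<your-key>' -RunId '00000000-0000-0000-0000-000000000000'
$url

# Against a real deployment (names are placeholders). While the first run is still
# in progress, the response for the second run's ID should contain one object:
# $first  = Invoke-AzDataFactoryV2Pipeline -ResourceGroupName 'myResourceGroup' -DataFactoryName 'My-data-factory' -PipelineName '<your-pipeline>'
# $second = Invoke-AzDataFactoryV2Pipeline -ResourceGroupName 'myResourceGroup' -DataFactoryName 'My-data-factory' -PipelineName '<your-pipeline>'
# (Invoke-WebRequest -Uri (Get-RunCheckUrl -FunctionUrl '<your function URL>' -RunId $second) -UseBasicParsing).Content
```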

In my test the function returned an empty array, because no pipeline runs other than the one named in the runid query parameter were in progress.

It responded in under a second, which isn’t bad really.

Now that we have that nailed, let’s go back to Data Factory.

The Pipeline

So, we want to invoke this function in the pipeline and use its output in a condition.

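Add a Web activity to your pipeline and give it a sensible name (mine is called Get-PipelineRuns). In the Settings tab, set the method to GET and use dynamic content for the URL.

Here you should concatenate the static part of the URL (the function URL you copied earlier, which already contains the ?code= function key) with the pipeline run ID system variable, pipeline().RunId, for example: @concat('<your function URL>&runid=', pipeline().RunId).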

If you’re familiar with the expression language in Logic Apps and Power Automate, you will already recognise how this works, because it’s essentially the same.

In my pipeline I have also added a Get Metadata step that gets the child items in the storage account. I want to use this to decide whether to run the data flow. Obviously if the storage account is empty and there are no CSV files to process, then we don’t invoke the data flow.

Armed with the outputs of these two activities, we move on to an If Condition activity.

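Again, you may recognise the Logic Apps expression language. We need to convert the Response property of the Get-PipelineRuns activity to JSON with the json() function, because it comes back as a string. Once converted, we have an array whose items we can count with the length() function and compare with 0 (we want exactly 0).

We also compare the length of the childItems property of the Get Metadata activity with 0 (we want more than 0). If both comparisons return true, and() returns true and execution moves to the True branch of the condition, which contains, among other things, the data flow activity. Put together, the expression looks something like this, with your own Get Metadata activity name substituted: @and(equals(length(json(activity('Get-PipelineRuns').output.Response)), 0), greater(length(activity('<your Get Metadata activity>').output.childItems), 0))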

In the False branch, we do nothing.
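To convince yourself the condition does what you expect before wiring it into the pipeline, the two checks described above can be mirrored in PowerShell. This is only a local sketch: Test-ShouldRunDataFlow is a made-up name, and the sample inputs stand in for the Web activity’s Response string and the Get Metadata activity’s childItems array (entries with name and type).

```powershell
# Hypothetical local mirror of the If Condition: proceed only when no other run is
# in progress AND the Get Metadata activity found at least one child item.
function Test-ShouldRunDataFlow {
    param(
        # The Web activity's Response property: a JSON array serialised as a string
        [Parameter(Mandatory)] [string] $WebResponse,
        # The Get Metadata activity's childItems array (may be empty)
        [object[]] $ChildItems
    )
    # Equivalent of length(json(...)): parse the string and count the runs.
    # .Count works whether ConvertFrom-Json yields nothing, one object or an array.
    $otherRunCount = (ConvertFrom-Json -InputObject $WebResponse).Count

    # Equivalent of length(childItems); .Count on $null is 0
    $fileCount = $ChildItems.Count

    # Equivalent of and(equals(..., 0), greater(..., 0))
    ($otherRunCount -eq 0) -and ($fileCount -gt 0)
}

$files = @([pscustomobject]@{ name = 'sample.csv'; type = 'File' })

Test-ShouldRunDataFlow -WebResponse '[]' -ChildItems $files    # nothing else running, files waiting
Test-ShouldRunDataFlow -WebResponse '[{"RunId":"other-run","Status":"InProgress"}]' -ChildItems $files    # another run in progress
Test-ShouldRunDataFlow -WebResponse '[]' -ChildItems @()    # nothing to process
```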

Conclusion

That’s pretty much it. Now, whenever your scheduled trigger fires off a pipeline run, the pipeline will invoke the Azure Function to check for any pipeline runs that are in progress (other than this one), and if there are any, the pipeline ends without invoking the data flow.

There are a few ways you could do this. I’ve sort of made a REST endpoint out of the function by converting the PowerShell script output to JSON, but you could just as easily return a string like 1 or 0 from the function and use that in your condition; it’s totally up to you. I’m not sure whether there’s a best-practice way, but that’s what I’ve done.
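If you’d rather return the 1-or-0 string, the end of the function could look something like the sketch below. Get-BusyFlag is a hypothetical helper name, and I haven’t checked exactly how the Web activity surfaces a bare 1 or 0 in its output, so it’s worth looking at the activity output in a debug run before writing the matching condition.

```powershell
# Hypothetical alternative to returning the runs as JSON: reduce them to a flag.
# '1' means another run is in progress, '0' means it's safe to carry on.
function Get-BusyFlag {
    param([object[]] $OtherInProgressRuns)
    # .Count on $null is 0, so an empty or missing result yields '0'
    if ($OtherInProgressRuns.Count -gt 0) { '1' } else { '0' }
}

# With the helper defined in run.ps1, this would replace the ConvertTo-Json line:
# $body = Get-BusyFlag -OtherInProgressRuns $filteredrun

Get-BusyFlag -OtherInProgressRuns @()
Get-BusyFlag -OtherInProgressRuns @([pscustomobject]@{ RunId = 'other-run'; Status = 'InProgress' })
```

The trade-off is visibility: the JSON version shows which runs were in progress when you inspect the activity output, which is handy when debugging, whereas the flag keeps the condition expression simpler.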
