Dans le cadre d’un récent projet de mise en place d’Azure Data Lake, j’ai dû réaliser un programme C# permettant de générer ma fabrique de données (Azure Data Factory V2). Or, la documentation officielle de Microsoft est pour l’heure peu fournie en exemples ; voici donc quelques scripts qui pourront peut-être vous faire gagner du temps !
Créer une Azure Data Factory & Ajouter un Integration Runtime
[csharp]
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Rest;
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest.Serialization;
namespace AzureDataFactoryLoadingLake
{
    /// <summary>
    /// Provisions an Azure Data Factory (V2) and a self-hosted integration runtime inside it.
    /// </summary>
    /// <remarks>
    /// The constructor does all the work. It authenticates as a service principal
    /// (application id + key) against the given tenant and builds a
    /// <see cref="DataFactoryManagementClient"/>. It then calls <see cref="CreateFactory()"/>
    /// and <see cref="CreateIntegrationRuntimes"/>. All calls block the calling thread.
    /// </remarks>
    class ADF
    {
        /// <summary>
        /// Default upper bound for <see cref="CreateFactory()"/> while it waits for the factory
        /// to leave the "PendingCreation" provisioning state.
        /// </summary>
        private static readonly TimeSpan DefaultProvisioningTimeout = TimeSpan.FromMinutes(10);

        /// <summary>Management client authenticated for <see cref="subscriptionId"/>.</summary>
        public DataFactoryManagementClient client;
        /// <summary>Name of the data factory to create or update.</summary>
        public string dataFactoryName;
        /// <summary>Resource group that contains the factory.</summary>
        public string resourceGroup;
        /// <summary>Azure location given to the factory.</summary>
        public string region;
        /// <summary>Name of the self-hosted integration runtime created in the factory.</summary>
        public string integrationRuntimeName;
        /// <summary>Azure AD tenant used for authentication.</summary>
        public string tenantID;
        /// <summary>Application (client) id of the service principal.</summary>
        public string applicationId;
        /// <summary>Subscription in which the factory is created.</summary>
        public string subscriptionId;
        /// <summary>Secret key of the service principal.</summary>
        public string authenticationKey;

        /// <summary>
        /// Authenticates, then creates or updates the data factory and its self-hosted integration runtime.
        /// </summary>
        /// <param name="tenantID">Azure AD tenant id.</param>
        /// <param name="resourceGroup">Resource group that holds the factory.</param>
        /// <param name="applicationId">Service principal application (client) id.</param>
        /// <param name="authenticationKey">Service principal secret.</param>
        /// <param name="subscriptionId">Target subscription id.</param>
        /// <param name="dataFactoryName">Name of the factory to create or update.</param>
        /// <param name="region">Azure location of the factory.</param>
        /// <param name="integrationRuntimeName">Name of the self-hosted integration runtime.</param>
        /// <exception cref="AggregateException">Token acquisition failed (the async call is blocked on with <c>.Result</c>).</exception>
        /// <exception cref="TimeoutException">The factory stayed in "PendingCreation" longer than the default timeout.</exception>
        public ADF(string tenantID, string resourceGroup, string applicationId, string authenticationKey, string subscriptionId, string dataFactoryName, string region, string integrationRuntimeName)
        {
            this.resourceGroup = resourceGroup;
            this.dataFactoryName = dataFactoryName;
            this.region = region;
            this.tenantID = tenantID;
            this.applicationId = applicationId;
            this.subscriptionId = subscriptionId;
            this.integrationRuntimeName = integrationRuntimeName;
            this.authenticationKey = authenticationKey;

            // Authenticate with the service principal's client credentials and create the management client.
            var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
            ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
            // Blocks on the async call; a failure surfaces as an AggregateException wrapping the ADAL error.
            AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
            ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
            this.client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

            Console.WriteLine("Creating or updating data factory [" + dataFactoryName + "] …");
            this.CreateFactory();
            Console.WriteLine("Creating or updating integration runtime (Gateway) [" + integrationRuntimeName + "] …");
            this.CreateIntegrationRuntimes();
        }

        /// <summary>
        /// Creates or updates the data factory and waits, up to a default of 10 minutes, for it
        /// to leave the "PendingCreation" provisioning state.
        /// </summary>
        /// <exception cref="TimeoutException">The factory is still pending creation after the default timeout.</exception>
        public void CreateFactory()
        {
            CreateFactory(DefaultProvisioningTimeout);
        }

        /// <summary>
        /// Creates or updates the data factory in <see cref="region"/> with a <see cref="FactoryIdentity"/>.
        /// Prints its JSON definition, then polls once a second until it leaves the
        /// "PendingCreation" provisioning state.
        /// </summary>
        /// <param name="timeout">Maximum time to keep polling before giving up.</param>
        /// <exception cref="TimeoutException">The factory is still pending creation after <paramref name="timeout"/>.</exception>
        public void CreateFactory(TimeSpan timeout)
        {
            Factory dataFactory = new Factory
            {
                Location = region,
                Identity = new FactoryIdentity()
            };
            client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
            Console.WriteLine(SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

            // Bound the wait so that a provisioning that never completes fails loudly
            // instead of hanging the deployment forever. A Stopwatch avoids DateTime overflow
            // when a very large timeout is passed.
            System.Diagnostics.Stopwatch waited = System.Diagnostics.Stopwatch.StartNew();
            while (string.Equals(client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState, "PendingCreation", StringComparison.Ordinal))
            {
                if (waited.Elapsed >= timeout)
                {
                    throw new TimeoutException("Data factory [" + dataFactoryName + "] is still pending creation after " + timeout + ".");
                }
                System.Threading.Thread.Sleep(1000);
                Console.WriteLine("*");
            }
        }

        /// <summary>
        /// Creates or updates the self-hosted integration runtime named <see cref="integrationRuntimeName"/>.
        /// Prints its JSON definition and its first authentication key. The key is needed to
        /// register the runtime on the machine that hosts it.
        /// </summary>
        /// <remarks>The key is written to the console in clear text; treat that output as a secret.</remarks>
        public void CreateIntegrationRuntimes()
        {
            IntegrationRuntimeResource integrationRuntimeResource = new IntegrationRuntimeResource(
                new SelfHostedIntegrationRuntime
                {
                    Description = "Integration Runtime for project …"
                }
            );
            client.IntegrationRuntimes.CreateOrUpdate(resourceGroup, dataFactoryName, integrationRuntimeName, integrationRuntimeResource);
            Console.WriteLine(SafeJsonConvert.SerializeObject(integrationRuntimeResource, client.SerializationSettings));
            Console.WriteLine("Authkey : " + client.IntegrationRuntimes.ListAuthKeys(resourceGroup, dataFactoryName, integrationRuntimeName).AuthKey1);
        }
    }
}
[/csharp]
À noter : la création de l’Integration Runtime au sein de l’Azure Data Factory retourne et affiche la clé d’authentification nécessaire à l’enregistrement de celui-ci sur la machine qui l’héberge.
Service lié
Créer un service lié Azure Data Lake Storage
[csharp]
// Registers the Azure Data Lake Store account ADLSName as a linked service.
// It authenticates with a service principal (id + key) in the given tenant,
// subscription and resource group, then prints the resulting JSON definition.
Console.WriteLine("Creating or updating linked service ADLS [" + ADLSName + "] …");
var adlsStore = new AzureDataLakeStoreLinkedService
{
    AccountName = ADLSName,
    DataLakeStoreUri = DataLakeStoreUri,
    Tenant = tenantId,
    SubscriptionId = subscriptionId,
    ResourceGroupName = resourceGroupName,
    ServicePrincipalId = ServicePrincipalId,
    // ADF model SecureString (built from a plain string), not System.Security.SecureString.
    ServicePrincipalKey = new SecureString(servicePrincipalKey)
};
LinkedServiceResource adlsLinkedService = new LinkedServiceResource(adlsStore);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLSName, adlsLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(adlsLinkedService, client.SerializationSettings));
[/csharp]
Créer un service lié Azure Data Lake Analytics
[csharp]
// Registers the Azure Data Lake Analytics account ADLAName as a linked service.
// It authenticates with a service principal (id + key), then prints the resulting JSON definition.
Console.WriteLine("Creating or updating linked service ADLA [" + ADLAName + "] …");
var adlaAccount = new AzureDataLakeAnalyticsLinkedService
{
    AccountName = ADLAName,
    Tenant = tenantId,
    SubscriptionId = subscriptionId,
    ResourceGroupName = resourceGroupName,
    ServicePrincipalId = ServicePrincipalId,
    // ADF model SecureString (built from a plain string), not System.Security.SecureString.
    ServicePrincipalKey = new SecureString(servicePrincipalKey)
};
LinkedServiceResource analyticsLinkedService = new LinkedServiceResource(adlaAccount);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLAName, analyticsLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(analyticsLinkedService, client.SerializationSettings));
[/csharp]
Créer un service lié Azure Blob Storage
[csharp]
// Registers the Blob Storage account that holds the U-SQL scripts as a linked service
// named uSQLScriptBlobName, then prints the resulting JSON definition.
// NOTE(review): uSQLScriptBlobStorage is used as the storage connection string;
// presumably a full "DefaultEndpointsProtocol=...;AccountName=...;AccountKey=..." string. Confirm.
Console.WriteLine("Creating or updating linked service BLOB [" + uSQLScriptBlobName + "] …");
SecureString secureString = new SecureString(uSQLScriptBlobStorage);
LinkedServiceResource blobLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = secureString
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, uSQLScriptBlobName, blobLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(blobLinkedService, client.SerializationSettings));
[/csharp]
Créer un service lié SQL Server on-premises
[csharp]
// Registers an on-premises SQL Server (SQLConnectionString) as the linked service SQLName.
// The connection goes through the self-hosted integration runtime, then the
// resulting JSON definition is printed.
Console.WriteLine("Creating or updating linked service SQL On Prem [" + SQLName + "] …");
SecureString secureString = new SecureString(SQLConnectionString);
// An IntegrationRuntimeReference is built from the runtime's *name* (the name it was
// created under in the factory), not from the IntegrationRuntimeResource object.
IntegrationRuntimeReference integrationRuntimeReference = new IntegrationRuntimeReference(integrationRuntimeName);
LinkedServiceResource sqlLinkedService = new LinkedServiceResource(
    new SqlServerLinkedService
    {
        ConnectionString = secureString,
        ConnectVia = integrationRuntimeReference
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, SQLName, sqlLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(sqlLinkedService, client.SerializationSettings));
[/csharp]
Créer un service lié MongoDB
[csharp]
// Registers a MongoDB database as a linked service. It is reached through the self-hosted
// integration runtime named integrationRuntimeName, and the resulting JSON definition is printed.
// (The variable mangoDbServiceName keeps its original spelling; only the log text is corrected.)
Console.WriteLine("Creating or updating linked service MongoDB [" + mangoDbServiceName + "] …");
IntegrationRuntimeReference integrationRuntimeReference = new IntegrationRuntimeReference(integrationRuntimeName);
SecureString secureString = new SecureString(password);
LinkedServiceResource mongoLinkedService = new LinkedServiceResource(
    new MongoDbLinkedService
    {
        Server = server,
        DatabaseName = databaseName,
        // NOTE(review): presumably "Basic" or "Anonymous"; confirm against the SDK's accepted values.
        AuthenticationType = authenticationType,
        Username = username,
        Password = secureString,
        ConnectVia = integrationRuntimeReference
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, mangoDbServiceName, mongoLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(mongoLinkedService, client.SerializationSettings));
[/csharp]
DataSet
Créer un DataSet SQL
[csharp]
// Publishes the SQL Server table tableName, reached through the SQLName linked service,
// as the dataset dSName, then prints its JSON definition.
var sqlTable = new SqlServerTableDataset
{
    TableName = tableName,
    LinkedServiceName = new LinkedServiceReference { ReferenceName = SQLName }
};
DatasetResource sqlDataset = new DatasetResource(sqlTable, name: dSName);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dSName, sqlDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings));
[/csharp]
Créer un DataSet Azure Data Lake Storage
[csharp]
// Publishes the file "<cleanFileName>.gz" in ADLSStagingFolder (on the ADLSName linked service)
// as the dataset dsName, then prints its JSON definition. The file is semicolon-delimited text,
// writes NULLs as empty strings, and is gzip-compressed at the "Optimal" level.
Console.WriteLine("Creating or updating DataSet ADLS [" + dsName + "] …");
var gzipTextFile = new AzureDataLakeStoreDataset
{
    LinkedServiceName = new LinkedServiceReference { ReferenceName = ADLSName },
    FolderPath = ADLSStagingFolder,
    FileName = cleanFileName + ".gz",
    Format = new TextFormat { ColumnDelimiter = ";", NullValue = "" },
    Compression = new DatasetGZipCompression { Level = "Optimal" }
};
DatasetResource adlsDataset = new DatasetResource(gzipTextFile, name: dsName);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dsName, adlsDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(adlsDataset, client.SerializationSettings));
[/csharp]
Activités
Créer une activité de copie
[csharp]
// Copy activity: reads rows from the i-th SQL dataset and writes them to the i-th ADLS dataset.
// NOTE(review): DATEDIFF(day, DateModification, GETDATE() + nbDaysReprise) < 0 keeps rows whose
// DateModification is later than "today + nbDaysReprise days". nbDaysReprise is therefore
// presumably passed as a negative number (e.g. -3 for the last three days). Confirm with callers.
// NOTE(review): the table name is concatenated into the query text; assumed to come from trusted configuration.
new CopyActivity
{
    Name = "Copy-" + dataSetSQLRessource[i].Name,
    Inputs = new List<DatasetReference> { new DatasetReference { ReferenceName = dataSetSQLRessource[i].Name } },
    Outputs = new List<DatasetReference> { new DatasetReference { ReferenceName = dataSetADLSRessource[i].Name } },
    // The @{...} fragment is an ADF expression evaluated at run time from the pipeline
    // parameter, not C# interpolation; it must remain literal text.
    Source = new SqlSource
    {
        SqlReaderQuery = new Expression
        {
            Value = "select * from " + dataSetSQLRessourceTable.TableName
                + " WHERE DATEDIFF(day,DateModification,DATEADD(DAY,@{pipeline().parameters.nbDaysReprise},GETDATE())) < 0"
        }
    },
    Sink = new AzureDataLakeStoreSink { WriteBatchSize = 0, WriteBatchTimeout = "00:00:00" }
}
[/csharp]
Créer une activité Http
[csharp]
// JSON request body for the Power BI refresh endpoint. It is produced by the JSON serializer
// rather than by string concatenation, so values containing quotes, backslashes or control
// characters (typically the password) still yield valid JSON.
// NOTE(review): the password ends up in clear text in the pipeline definition; consider a secret store.
string body = SafeJsonConvert.SerializeObject(
    new Dictionary<string, object>
    {
        { "username", powerBIAccount },
        { "password", powerBIPassword },
        { "authorityUrl", authorityUrl },
        { "resourceUrl", resourceUrl },
        { "apiUrl", apiUrl },
        { "clientId", clientId },
        { "groupId", groupId }
    },
    client.SerializationSettings);
// Web activity that POSTs the body above to powerBIRefreshUrl once its dependencies complete.
new WebActivity()
{
    Name = "RefreshPowerBI",
    Url = powerBIRefreshUrl,
    Method = "Post",
    Body = body,
    DependsOn = dependencies
}
[/csharp]
Créer une activité U-SQL
[csharp]
// U-SQL activity running the script "<uSQLScriptPath>/<job>" from the uSQLScriptBlobName linked
// service on the ADLAName Data Lake Analytics account. The pipeline parameters directoryCol and
// directoryDw are forwarded to the script as ADF expressions evaluated at run time.
new DataLakeAnalyticsUSQLActivity()
{
    Name = "Usql-" + job,
    DegreeOfParallelism = DegreeOfParallelism,
    ScriptLinkedService = new LinkedServiceReference
    {
        ReferenceName = uSQLScriptBlobName
    },
    ScriptPath = uSQLScriptPath + "/" + job,
    LinkedServiceName = new LinkedServiceReference
    {
        ReferenceName = ADLAName
    },
    DependsOn = dependencies,
    Parameters = new Dictionary<string, object>
    {
        { "directoryCol", new Expression { Value = "@{pipeline().parameters.directoryCol}" } },
        { "directoryDw", new Expression { Value = "@{pipeline().parameters.directoryDw}" } }
    }
}
[/csharp]
Créer un Pipeline
[csharp]
// Pipeline wrapping AllActivities. It declares three String parameters that activities can
// read through @{pipeline().parameters.<name>} expressions.
var pipelineParameters = new Dictionary<string, ParameterSpecification>();
foreach (string parameterName in new[] { "nbDaysReprise", "directoryCol", "directoryDw" })
{
    pipelineParameters.Add(parameterName, new ParameterSpecification { Type = ParameterType.String });
}
PipelineResource pipeline = new PipelineResource
{
    Activities = AllActivities,
    Parameters = pipelineParameters
};
[/csharp]
Créer un déclencheur
[csharp]
// Creates a schedule trigger named triggerName that runs `pipelines` once per `frequency` unit
// (interval 1), starting at startTime. It publishes the trigger, prints its JSON definition,
// then asks ADF to start it.
Console.WriteLine(string.Concat("Creating scheduled trigger, starting at ", startTime.ToString(), " running each ", frequency));
var recurrence = new ScheduleTriggerRecurrence
{
    Frequency = frequency,
    Interval = 1,
    StartTime = startTime
};
var schedule = new ScheduleTrigger
{
    Pipelines = pipelines,
    Recurrence = recurrence
};
TriggerResource scheduleTrigger = new TriggerResource(schedule, name: triggerName);
client.Triggers.CreateOrUpdate(ressourceGroup, factoryName, triggerName, scheduleTrigger);
Console.WriteLine(SafeJsonConvert.SerializeObject(scheduleTrigger, client.SerializationSettings));
// NOTE(review): per the SDK's Begin* convention this presumably only submits the start
// request and does not wait for the trigger to report "Started".
client.Triggers.BeginStart(ressourceGroup, factoryName, triggerName);
[/csharp]
L’objectif de cet article est principalement de rappeler la syntaxe de création des différents objets Azure Data Factory, et de m’éviter de m’arracher à nouveau les cheveux face au manque d’exemples sur MSDN pendant la preview d’ADF V2.