As part of a recent Azure Data Lake implementation project, I had to build a C# project that generates my data factory (ADF V2). The official Microsoft documentation is, for the time being, rather light on examples, so here are a few scripts that may save you some time!
Create an Azure Data Factory & add an Integration Runtime
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Rest;
using Microsoft.Azure.Management.DataFactory;
using Microsoft.Azure.Management.DataFactory.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
using Microsoft.Rest.Serialization;

namespace AzureDataFactoryLoadingLake
{
    class ADF
    {
        public DataFactoryManagementClient client;
        public string dataFactoryName;
        public string resourceGroup;
        public string region;
        public string integrationRuntimeName;
        public string tenantID;
        public string applicationId;
        public string subscriptionId;
        public string authenticationKey;

        public ADF(string tenantID, string resourceGroup, string applicationId, string authenticationKey,
                   string subscriptionId, string dataFactoryName, string region, string integrationRuntimeName)
        {
            this.resourceGroup = resourceGroup;
            this.dataFactoryName = dataFactoryName;
            this.region = region;
            this.tenantID = tenantID;
            this.applicationId = applicationId;
            this.subscriptionId = subscriptionId;
            this.integrationRuntimeName = integrationRuntimeName;
            this.authenticationKey = authenticationKey;

            // Authenticate and create a data factory management client
            var context = new AuthenticationContext("https://login.windows.net/" + tenantID);
            ClientCredential cc = new ClientCredential(applicationId, authenticationKey);
            AuthenticationResult result = context.AcquireTokenAsync("https://management.azure.com/", cc).Result;
            ServiceClientCredentials cred = new TokenCredentials(result.AccessToken);
            this.client = new DataFactoryManagementClient(cred) { SubscriptionId = subscriptionId };

            Console.WriteLine("Creating or updating data factory [" + dataFactoryName + "] ...");
            this.CreateFactory();
            Console.WriteLine("Creating or updating integration runtime (Gateway) [" + integrationRuntimeName + "] ...");
            this.CreateIntegrationRuntimes();
        }

        public void CreateFactory()
        {
            Factory dataFactory = new Factory
            {
                Location = region,
                Identity = new FactoryIdentity()
            };
            client.Factories.CreateOrUpdate(resourceGroup, dataFactoryName, dataFactory);
            Console.WriteLine(SafeJsonConvert.SerializeObject(dataFactory, client.SerializationSettings));

            while (client.Factories.Get(resourceGroup, dataFactoryName).ProvisioningState == "PendingCreation")
            {
                System.Threading.Thread.Sleep(1000);
                Console.WriteLine("*");
            }
        }

        public void CreateIntegrationRuntimes()
        {
            IntegrationRuntimeResource integrationRuntimeResource = new IntegrationRuntimeResource(
                new SelfHostedIntegrationRuntime
                {
                    Description = "Integration Runtime for project ..."
                }
            );
            client.IntegrationRuntimes.CreateOrUpdate(resourceGroup, dataFactoryName, integrationRuntimeName, integrationRuntimeResource);
            Console.WriteLine(SafeJsonConvert.SerializeObject(integrationRuntimeResource, client.SerializationSettings));
            Console.WriteLine("Authkey : " + client.IntegrationRuntimes.ListAuthKeys(resourceGroup, dataFactoryName, integrationRuntimeName).AuthKey1);
        }
    }
}
Note that creating the Integration Runtime inside the Azure Data Factory returns and displays the key required to register it on the on-premises machine.
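If you later need the second key (for example to register an additional node), here is a minimal sketch reusing the same ListAuthKeys call as above; it simply assumes the client and the field values from the class shown earlier:

// Retrieve both authentication keys of the self-hosted Integration Runtime
var authKeys = client.IntegrationRuntimes.ListAuthKeys(resourceGroup, dataFactoryName, integrationRuntimeName);
Console.WriteLine("AuthKey1 : " + authKeys.AuthKey1);
Console.WriteLine("AuthKey2 : " + authKeys.AuthKey2);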
Linked services
Create an Azure Data Lake Storage linked service
Console.WriteLine("Creating or updating linked service ADLS [" + ADLSName + "] ...");
SecureString secureString = new SecureString(servicePrincipalKey);
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureDataLakeStoreLinkedService
    {
        AccountName = ADLSName,
        DataLakeStoreUri = DataLakeStoreUri,
        ServicePrincipalId = ServicePrincipalId,
        Tenant = tenantId,
        SubscriptionId = subscriptionId,
        ResourceGroupName = resourceGroupName,
        ServicePrincipalKey = secureString
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLSName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
Create an Azure Data Lake Analytics linked service
Console.WriteLine("Creating or updating linked service ADLA [" + ADLAName + "] ...");
SecureString secureString = new SecureString(servicePrincipalKey);
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureDataLakeAnalyticsLinkedService
    {
        AccountName = ADLAName,
        ServicePrincipalId = ServicePrincipalId,
        Tenant = tenantId,
        SubscriptionId = subscriptionId,
        ResourceGroupName = resourceGroupName,
        ServicePrincipalKey = secureString
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, ADLAName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
Create an Azure Blob Storage linked service
Console.WriteLine("Creating or updating linked service BLOB [BLOB] ...");
SecureString secureString = new SecureString(uSQLScriptBlobStorage);
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new AzureStorageLinkedService
    {
        ConnectionString = secureString
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, uSQLScriptBlobName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
Create an on-premises SQL Server linked service
Console.WriteLine("Creating or updating linked service SQL On Prem [" + SQLName + "] ...");
SecureString secureString = new SecureString(SQLConnectionString);
// The reference is built from the Integration Runtime name, not from the resource object
IntegrationRuntimeReference integrationRuntimeReference = new IntegrationRuntimeReference(integrationRuntimeName);
LinkedServiceResource storageLinkedService = new LinkedServiceResource(
    new SqlServerLinkedService
    {
        ConnectionString = secureString,
        ConnectVia = integrationRuntimeReference
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, SQLName, storageLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(storageLinkedService, client.SerializationSettings));
Create a MongoDB linked service
Console.WriteLine("Creating or updating linked service MongoDB [" + mangoDbServiceName + "] ...");
IntegrationRuntimeReference integrationRuntimeReference = new IntegrationRuntimeReference(integrationRuntimeName);
SecureString secureString = new SecureString(password);
LinkedServiceResource mongoLinkedService = new LinkedServiceResource(
    new MongoDbLinkedService
    {
        Server = server,
        DatabaseName = databaseName,
        AuthenticationType = authenticationType,
        Username = username,
        Password = secureString,
        ConnectVia = integrationRuntimeReference
    }
);
client.LinkedServices.CreateOrUpdate(resourceGroupName, dataFactoryName, mangoDbServiceName, mongoLinkedService);
Console.WriteLine(SafeJsonConvert.SerializeObject(mongoLinkedService, client.SerializationSettings));
Datasets
Create a SQL dataset
DatasetResource sqlDataset = new DatasetResource(
    new SqlServerTableDataset
    {
        LinkedServiceName = new LinkedServiceReference { ReferenceName = SQLName },
        TableName = tableName
    },
    name: dSName
);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dSName, sqlDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(sqlDataset, client.SerializationSettings));
Create an Azure Data Lake Storage dataset
Console.WriteLine("Creating or updating DataSet ADLS [" + dsName + "] ...");
DatasetResource AdlsDataset = new DatasetResource(
    new AzureDataLakeStoreDataset
    {
        LinkedServiceName = new LinkedServiceReference { ReferenceName = ADLSName },
        FileName = cleanFileName + ".gz",
        FolderPath = ADLSStagingFolder,
        Format = new TextFormat
        {
            ColumnDelimiter = ";",
            NullValue = ""
        },
        Compression = new DatasetGZipCompression { Level = "Optimal" }
    },
    name: dsName
);
client.Datasets.CreateOrUpdate(resourceGroup, dataFactoryName, dsName, AdlsDataset);
Console.WriteLine(SafeJsonConvert.SerializeObject(AdlsDataset, client.SerializationSettings));
Activities
Create a copy activity
new CopyActivity
{
    Name = "Copy-" + dataSetSQLRessource[i].Name,
    Inputs = new List<DatasetReference>
    {
        new DatasetReference() { ReferenceName = dataSetSQLRessource[i].Name }
    },
    Outputs = new List<DatasetReference>
    {
        new DatasetReference() { ReferenceName = dataSetADLSRessource[i].Name }
    },
    Source = new SqlSource
    {
        SqlReaderQuery = new Expression
        {
            Value = "select * from " + dataSetSQLRessourceTable.TableName
                + " WHERE DATEDIFF(day,DateModification,DATEADD(DAY,@{pipeline().parameters.nbDaysReprise},GETDATE())) < 0"
        }
    },
    Sink = new AzureDataLakeStoreSink
    {
        WriteBatchSize = 0,
        WriteBatchTimeout = "00:00:00"
    }
}
Create an HTTP (Web) activity
string body = "{\"username\": \"" + powerBIAccount + "\",\"password\": \"" + powerBIPassword
    + "\",\"authorityUrl\": \"" + authorityUrl + "\",\"resourceUrl\": \"" + resourceUrl
    + "\",\"apiUrl\": \"" + apiUrl + "\",\"clientId\": \"" + clientId
    + "\",\"groupId\": \"" + groupId + "\"}";

new WebActivity()
{
    Name = "RefreshPowerBI",
    Url = powerBIRefreshUrl,
    Method = "Post",
    Body = body,
    DependsOn = dependencies
}
Create a U-SQL activity
new DataLakeAnalyticsUSQLActivity()
{
    Name = "Usql-" + job,
    DegreeOfParallelism = DegreeOfParallelism,
    ScriptLinkedService = new LinkedServiceReference { ReferenceName = uSQLScriptBlobName },
    ScriptPath = uSQLScriptPath + '/' + job,
    LinkedServiceName = new LinkedServiceReference { ReferenceName = ADLAName },
    DependsOn = dependencies,
    Parameters = new Dictionary<string, object>
    {
        { "directoryCol", new Expression { Value = "@{pipeline().parameters.directoryCol}" } },
        { "directoryDw", new Expression { Value = "@{pipeline().parameters.directoryDw}" } }
    }
}
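The Web and U-SQL activities above reference a dependencies variable that is not shown in the snippets. A minimal sketch, assuming hypothetical copyActivity and usqlActivity variables holding the activity objects built above, of declaring that dependency and gathering everything into the AllActivities list used by the pipeline below:

// copyActivity / usqlActivity are assumed to hold the activity objects defined above
var dependencies = new List<ActivityDependency>
{
    new ActivityDependency
    {
        Activity = copyActivity.Name,                             // run after the copy activity...
        DependencyConditions = new List<string> { "Succeeded" }   // ...and only if it succeeded
    }
};

// The list handed to the pipeline definition below (Activities = AllActivities)
var AllActivities = new List<Activity> { copyActivity, usqlActivity };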
Create a pipeline
PipelineResource pipeline = new PipelineResource
{
    Parameters = new Dictionary<string, ParameterSpecification>
    {
        { "nbDaysReprise", new ParameterSpecification { Type = ParameterType.String } },
        { "directoryCol", new ParameterSpecification { Type = ParameterType.String } },
        { "directoryDw", new ParameterSpecification { Type = ParameterType.String } }
    },
    Activities = AllActivities
};
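The snippet above only builds the PipelineResource object in memory. A minimal sketch of deploying it and launching an ad-hoc run; the pipelineName variable and the parameter values are assumptions, and the run call follows the SDK version available at the time:

// Deploy the pipeline to the factory
client.Pipelines.CreateOrUpdate(resourceGroup, dataFactoryName, pipelineName, pipeline);
Console.WriteLine(SafeJsonConvert.SerializeObject(pipeline, client.SerializationSettings));

// Optionally trigger a run with the expected parameters (example values)
var runParameters = new Dictionary<string, object>
{
    { "nbDaysReprise", "-7" },
    { "directoryCol", "col" },
    { "directoryDw", "dw" }
};
CreateRunResponse runResponse = client.Pipelines
    .CreateRunWithHttpMessagesAsync(resourceGroup, dataFactoryName, pipelineName, parameters: runParameters)
    .Result.Body;
Console.WriteLine("Pipeline run ID : " + runResponse.RunId);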
Create a trigger
Console.WriteLine("Creating scheduled trigger, starting at " + startTime.ToString() + " running each " + frequency);
TriggerResource scheduleTrigger = new TriggerResource(
    new ScheduleTrigger
    {
        Pipelines = pipelines,
        Recurrence = new ScheduleTriggerRecurrence
        {
            StartTime = startTime,
            Frequency = frequency,
            Interval = 1
        }
    },
    name: triggerName
);
client.Triggers.CreateOrUpdate(ressourceGroup, factoryName, triggerName, scheduleTrigger);
Console.WriteLine(SafeJsonConvert.SerializeObject(scheduleTrigger, client.SerializationSettings));
client.Triggers.BeginStart(ressourceGroup, factoryName, triggerName);
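The ScheduleTrigger references a pipelines variable that is not defined in the snippet. A minimal sketch of how it could be built (pipelineName and the parameter values are assumptions), followed by an optional check that the trigger actually started:

// List of pipelines launched by the trigger, with the run parameters they expect
var pipelines = new List<TriggerPipelineReference>
{
    new TriggerPipelineReference
    {
        PipelineReference = new PipelineReference(pipelineName),
        Parameters = new Dictionary<string, object>
        {
            { "nbDaysReprise", "-7" },
            { "directoryCol", "col" },
            { "directoryDw", "dw" }
        }
    }
};

// After BeginStart, the trigger's runtime state can be checked
Console.WriteLine(client.Triggers.Get(ressourceGroup, factoryName, triggerName).Properties.RuntimeState);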
The main purpose of this article is to keep a record of the syntax for creating the various Azure Data Factory objects, and to spare me from tearing my hair out again over the lack of examples in the MSDN documentation during the ADF V2 preview.