C’est un sujet qui revient sans cesse et que je recode chez chaque client, voici donc un exemple de logique afin de partitionner et actualiser vos bases SSAS, je reprends donc les traces d’Arnaud qui avait déjà un excellent article sur le sujet.
Stocker les actions à effectuer en base
Pour ce faire j’ai choisi une approche simple, deux tables, une pour le partitionnement et une pour le processing.
Partitionnement :
CREATE TABLE [dbo].[OLAPPartition]( [ServerName] [varchar](50) NULL, [DatabaseName] [varchar](50) NOT NULL, [CubeName] [varchar](50) NOT NULL, [MeasureGroupName] [varchar](100) NOT NULL, [BaseQuery] [varchar](4000) NOT NULL ) ON [PRIMARY]
Toute la logique de cette table est dans la colonne « BaseQuery » qui devra me retourner pour un groupe de mesure 7 colonnes :
- Nom de la partition
- Query
- Slice
- IsActive
- ProcessPeriodicity
- ProcessType
- QueueId
Voici un exemple de requête que je peux mettre dans cette colonne « BaseQuery » :
WITH TimeCTE AS ( SELECT DATEADD(month,-60,GETDATE()) AS DateT UNION ALL SELECT DATEADD(MONTH,1,DateT) FROM TimeCTE WHERE DateT < DATEADD(MONTH,1,GETDATE()) ) SELECT CONVERT(char(6),DateT,112) + ' - ' + P.MeasureGroupName AS PartitionName ,'SELECT * FROM dmt_vpanalysis.t_fac_participationdeviceattendings WHERE FLOOR(raisedateid/10000) = ' + CONVERT(char(6),DateT,112) AS PartitionQuery ,'[Order date].[Year - Month].[Month].&['+CONVERT(char(6),DateT,112)+']' AS PartitionSlice ,CASE WHEN DateT >= DATEADD(MONTH,-2,GETDATE()) THEN 1 ELSE 0 END AS IsActive ,'Daily' AS ProcessPeriodicity ,'FULL' AS ProcessType ,2 AS QueueId FROM TimeCTE CROSS JOIN dbo.OLAPPartition P WHERE P.MeasureGroupName = 'Attending' OPTION (MAXRECURSION 0)
Si vous êtes à l’aise avec les CTE recursive, vous constatez que la requête suivante me retourne 62 lignes représentant les 62 partitions pour 62 mois. J’ai utilisé une CTE dans mon exemple pour avoir un peu de dynamisme sur la génération de mes partitions mais j’aurais pu aussi faire des UNION avec des granularités différentes dans mes différentes partitions sans utiliser de CTE. L’avantage de variabiliser avec une requête SQL c’est que vous n’êtes pas limité sur vos règles de gestion de génération de partitions.
Processing
Une fois les informations sur le partitionnement renseignées, il sera intéressant de pouvoir indiquer à notre logique de processing les éléments à actualiser.
CREATE TABLE [dbo].[OLAPObject]( [OLAPObjectId] [int] IDENTITY(1,1) NOT NULL, [ServerName] [varchar](50) NULL, [DatabaseName] [varchar](50) NULL, [CubeName] [varchar](50) NULL, [ObjectDescription] [varchar](255) NULL, [ObjectName] [varchar](100) NULL, [ProcessPeriodicity] [varchar](50) NULL, [ProcessType] [varchar](50) NULL, [IsActive] [bit] NULL, [ToBeProc] [varchar](50) NULL, [QueueId] [int] NULL, [LastRefreshDate] [datetime] NULL, CONSTRAINT [PK_OLAPObject] PRIMARY KEY CLUSTERED ( [OLAPObjectId] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON) ON [PRIMARY] ) ON [PRIMARY]
Les deux notions intéressantes sont :
- Le couple ProcessPeriodicity / LastRefreshDate qui va nous permettre de savoir si l’Objet SSAS doit être actualisé.
- Le QueuId qui va nous permettre d’indiquer ce que l’on doit paralléliser (Si deux bases SSAS sont dans le même QueueId, elle seront mises à jour séquentiellement afin d’éviter un overhead de la machine)
Voici pour la partie Base de données, passons maintenant à la partie SSIS / C#
Effectuer les traitements avec SSIS
Voici la structure de mon projet, 3 packages dont un pour la gestion des partitions et deux pour le traitement des objets SSAS.
PartitionOlap
La logique est extrêmement simple :
- Récupérer les groupes de mesures à gérer
- Pour chaque groupe de mesures on exécute la requête présente dans « BaseQuery » (La CTE vue un peu plus haut)
- On passe le dataset retourné par l’étape 2 au SCRIPT C# qui va s’occuper de la création / suppression de nos partitions.
- Les deux boucles suivantes sont là pour populer la table « OlapObject » avec les éléments à actualiser lors du job de traitement
Le script C# de « SRC – AddPartition » est le suivant :
public void Main() { bool fireAgain = true; try { OleDbDataAdapter DaObjects = new OleDbDataAdapter(); DataTable DtOjects = new DataTable(); //Variable SSAS String OLAPServerName; String OLAPDBName; String ConnectionString; List<string> partitionsName = new List<string>(); DaObjects.Fill(DtOjects, Dts.Variables["User::Partitions"].Value); //Initialisation des variables OLAPDBName = Dts.Variables["User::DatabaseName"].Value.ToString(); OLAPServerName = Dts.Variables["User::ServerName"].Value.ToString(); Server OLAPServer = new Server(); Dts.Events.FireInformation(0, "ServerName", OLAPServerName, String.Empty, 0, ref fireAgain); ConnectionString = "Data Source=" + OLAPServerName + ";Timeout=60000"; try { OLAPServer.Connect(ConnectionString); } catch (Exception e) { OLAPServer.Connect(ConnectionString); } Database OLAPDB = OLAPServer.Databases.FindByName(OLAPDBName); Dts.Events.FireInformation(0, "OLAPDB", OLAPDBName, String.Empty, 0, ref fireAgain); Cube OLAPCube = OLAPDB.Cubes.FindByName(Dts.Variables["User::CubeName"].Value.ToString()); MeasureGroup OLAPMeasureGroup = OLAPCube.MeasureGroups.FindByName(Dts.Variables["User::MeasureGroupName"].Value.ToString()); //OLAPServer.CaptureXml = true; foreach (DataRow row in DtOjects.Rows) { partitionsName.Add(row["PartitionName"].ToString()); if (OLAPMeasureGroup.Partitions.Find(row["PartitionName"].ToString()) == null) { Partition partition = OLAPMeasureGroup.Partitions.Add(row["PartitionName"].ToString()); partition.Slice = row["PartitionSlice"].ToString(); //Attention il faudra surement gérer les Datasources multiples mais là j'ai pas envie partition.Source = new QueryBinding(OLAPDB.DataSources[0].ID, row["PartitionQuery"].ToString()); if (OLAPMeasureGroup.AggregationDesigns.Count > 0) { partition.AggregationDesignID = OLAPMeasureGroup.AggregationDesigns[0].ID; } partition.Update(); } } List<string> toDelete = new List<string>(); for (int i = OLAPMeasureGroup.Partitions.Count - 1; i >=0; i--) { if(!partitionsName.Contains(OLAPMeasureGroup.Partitions[i].Name)) { toDelete.Add(OLAPMeasureGroup.Partitions[i].Name); OLAPMeasureGroup.Partitions[i].Drop(); } } Dts.Variables["User::toDelete"].Value = toDelete; /*OLAPServer.CaptureXml = false; XmlaResultCollection results = OLAPServer.ExecuteCaptureLog(false, true); StringCollection CapturedXmla = new StringCollection(); CapturedXmla = OLAPServer.CaptureLog; if (CapturedXmla.Count > 1) { StreamWriter sw = new StreamWriter(@"D:\" + OLAPDBName + ".xmla"); sw.AutoFlush = true; sw.Write(OLAPServer.ConcatenateCaptureLog(true, true)); sw.Close(); }*/ Dts.TaskResult = (int)ScriptResults.Success; } catch (Exception e) { Dts.Events.FireError(0, "PartitionAdd", e.Message, String.Empty, 0); Dts.TaskResult = (int)ScriptResults.Failure; } }
Pour ceux qui se demandent pourquoi une reconnexion dans le try/catch c’est parce que dans le cas d’utilisation de ce script j’ai une impersonnation entre deux domaines et une qualité réseau perfectible alors je laisse deux essais avant d’abdiquer.
ProcessOlap
Nos partitions sont maintenant créées, il est donc temps de mettre à jour nos objets SSAS.
1er partie, mettre à jour notre table « OLAPObject » afin d’indiquer les éléments à planifier, cela est réalisé grâce à la requêtes suivante :
UPDATE dbo.OLAPObject SET ToBeProc = CASE WHEN ProcessPeriodicity = 'Daily' AND (DATEDIFF(day,LastRefreshDate,GETDATE()) >= 1 OR LastRefreshDate IS NULL) THEN 'Planned' ELSE ToBeProc END WHERE ToBeProc = 'Done' AND IsActive = 1
Vous constatez que dans cette requête j’ai uniquement traité le cas d’une periodicité quotidienne, il est tout à fait possible d’ajouter des cas.
Nous récupérons ensuite les QueueId qui vont devoir être actualisés et nous lançons un « Thread » par queueId (Cela est devenu extrêmement simple depuis SSIS 2012) grâce à la requête suivante :
Declare @execution_id bigint EXEC [SSISDB].[catalog].[create_execution] @package_name=N'ProcessOlapThread.dtsx', @execution_id=@execution_id OUTPUT, @folder_name=N'BI', @project_name=N'SSASProcessingTool', @use32bitruntime=False, @reference_id=Null Select @execution_id DECLARE @var0 int = ? EXEC [SSISDB].[catalog].[set_execution_parameter_value] @execution_id, @object_type=30, @parameter_name=N'QueueId', @parameter_value=@var0 DECLARE @var1 smallint = 1 EXEC [SSISDB].[catalog].[set_execution_parameter_value] @execution_id, @object_type=50, @parameter_name=N'LOGGING_LEVEL', @parameter_value=@var1 EXEC [SSISDB].[catalog].[start_execution] @execution_id GO
Mon package ProcessOlapThread va donc être lancé de manière asynchrone autant de fois que j’ai de QueueId différent.
Toute la logique ici va être dans SRC-ProcessObjects que voici :
public void Main() { try { bool fireAgain = true; OleDbDataAdapter DaObjects = new OleDbDataAdapter(); DataTable DtOjects = new DataTable(); //Variable SSAS String OLAPServerName; String OLAPDBName; String ConnectionString; DaObjects.Fill(DtOjects, Dts.Variables["User::ObjectToProcessDS"].Value); //Initialisation des variables OLAPDBName = Dts.Variables["User::DatabaseName"].Value.ToString(); OLAPServerName = Dts.Variables["User::ServerName"].Value.ToString(); Server OLAPServer = new Server(); ErrorConfiguration errConfig = new ErrorConfiguration(); errConfig.KeyErrorLimit = -1; errConfig.KeyNotFound = ErrorOption.IgnoreError; errConfig.NullKeyNotAllowed = ErrorOption.IgnoreError; ConnectionString = "Data Source=" + OLAPServerName + ";Timeout=60000"; try { OLAPServer.Connect(ConnectionString); } catch (Exception e) { Dts.Events.FireWarning(0, "Connexion : ", "Deuxième tentative de connexion", "", 0); OLAPServer.Connect(ConnectionString); } Database OLAPDB = OLAPServer.Databases.FindByName(OLAPDBName); OLAPServer.CaptureXml = true; foreach (DataRow row in DtOjects.Rows) { Cube OLAPCube = OLAPDB.Cubes.FindByName(row["CubeName"].ToString()); Dimension OLAPDim = OLAPDB.Dimensions.FindByName(row["ObjectName"].ToString()); if (row["CubeName"].ToString() == string.Empty) { if (OLAPDim != null) // il s'agit bien d'une dimension { //Utiliser CanProcess OLAPDim.Process(Utils.GetProcessTypeFromString(OLAPDim, row["ProcessType"].ToString())); } else { OLAPDB.Process(Utils.GetProcessTypeFromString(OLAPDB, row["ProcessType"].ToString())); } } else if (OLAPCube != null) { MeasureGroup OLAPMG = OLAPCube.MeasureGroups.FindByName(row["ObjectName"].ToString()); Partition OLAPPartition = Utils.GetPartitionByName(OLAPCube, row["ObjectName"].ToString()); if (OLAPPartition != null) // il s'agit d'une partition { OLAPPartition.Process(Utils.GetProcessTypeFromString(OLAPPartition, row["ProcessType"].ToString())); } else if (OLAPMG != null) // il s'agit d'une partition { OLAPMG.Process(Utils.GetProcessTypeFromString(OLAPMG, row["ProcessType"].ToString())); } else if (row["ObjectName"].ToString() == String.Empty) // il s'agit d'une partition { OLAPCube.Process(Utils.GetProcessTypeFromString(OLAPCube, row["ProcessType"].ToString())); } else { Dts.Events.FireError(0, "ELEMENT INTROUVABLE : ", "Database " + OLAPDBName + "Objet : " + row["ObjectName"].ToString(), "", 0); } } } OLAPServer.CaptureXml = false; StringCollection CapturedXmla = new StringCollection(); CapturedXmla = OLAPServer.CaptureLog; if (CapturedXmla.Count > 1) { string XMLA = OLAPServer.ConcatenateCaptureLog(true, true); XMLA = XMLA.Replace("<Parallel>", @"<ErrorConfiguration xmlns:xsd=""http://www.w3.org/2001/XMLSchema"" xmlns:xsi=""http://www.w3.org/2001/XMLSchema-instance"" xmlns:ddl2=""http://schemas.microsoft.com/analysisservices/2003/engine/2"" xmlns:ddl2_2=""http://schemas.microsoft.com/analysisservices/2003/engine/2/2"" xmlns:ddl100_100=""http://schemas.microsoft.com/analysisservices/2008/engine/100/100"" xmlns:ddl200=""http://schemas.microsoft.com/analysisservices/2010/engine/200"" xmlns:ddl200_200=""http://schemas.microsoft.com/analysisservices/2010/engine/200/200"" xmlns:ddl300=""http://schemas.microsoft.com/analysisservices/2011/engine/300"" xmlns:ddl300_300=""http://schemas.microsoft.com/analysisservices/2011/engine/300/300""> <KeyErrorLimit>-1</KeyErrorLimit> <KeyNotFound>IgnoreError</KeyNotFound> <NullKeyNotAllowed>IgnoreError</NullKeyNotAllowed> </ErrorConfiguration> <Parallel>"); //StreamWriter sw = new StreamWriter(@"D:" + OLAPDBName + ".xmla"); //sw.AutoFlush = true; //sw.Write(XMLA); //sw.Close(); Dts.Events.FireInformation(0, "XMLA : ", XMLA, String.Empty, 0,ref fireAgain); XmlaResultCollection results = OLAPServer.Execute(XMLA); //XmlaResultCollection results = OLAPServer.ExecuteCaptureLog(true, true); foreach (XmlaResult result in results) { foreach (XmlaMessage message in result.Messages) { if (message.GetType().Name == "XmlaError") { Dts.TaskResult = (int)ScriptResults.Failure; Dts.Events.FireError(0, "Process Error", message.Description, "", 0); } } } } Dts.TaskResult = (int)ScriptResults.Success; } catch (Exception e) { Dts.Events.FireError(0, "Process Error", e.Message + "\r" + e.StackTrace, String.Empty, 0); Dts.TaskResult = (int)ScriptResults.Failure; } Dts.TaskResult = (int)ScriptResults.Success; }
Le gros du travail est de pouvoir identifier le type d’objet que nous voulons actualiser, en effet nous pouvons saisir dans ObjectName une partition mais aussi une dimension, un groupe de mesure, un cube… le script va donc tester les différents éléments afin de définir ce qui doit être actualisé.
Pour finir, voici le projet SSIS complet, celui-ci ne doit absolument pas être utilisé tel quel, il s’agit d’une base de réflexion il a déjà beaucoup évolué depuis cette version que je vous livre.
(Attention, les scripts contiennent une partie impersonation pour gérer l’inter domaine je vous invite fortement à l’enlever si vous n’en avez pas besoin)
Amusez vous bien !
2 Comments
Pas mal! Après je me demande dans quelle mesure tu simplifies pas ton deploiement en stockant les métadonnées du groupe de mesure dans les annotations en K/V – plutot que dans une table – mais c’est une question de goût. L’avantage c’est que tu peux te passer de SQL pour stocker ça et deporter tout le code (vérification de l’existence de ces metas et logique de process) dans une lib .net sans dépendance autre que SSAS. T’en penses quoi?
Hello,
J’aime bien l’idée, mais moins facile dans ce cas là de séparer exploitation et développement de la base AS (Je vois mal un DBA exploit me changer les annotations du cube). Comment tu gèrerais « Simplement » un équivalent de la requête SQL qui te permet de définir ton partitionnement de façon 100% flexible (1 partition par années pour les n dernières années puis par mois sur l’année passée puis par jours … ?). Ceci étant pouvoir se débarrasser de toute la partie SQL pour rester 100% cube c’est super intéressant 😀