Una de las prestaciones que ofrece Fabric es la orquestación de notebooks para el tratamiento de ficheros en diferentes formatos. En este post veremos como cargar ficheros CSV en un Lakehouse para posteriormente crear tablas delta mediante el uso de notebooks.
¡Comenzamos!
Gran parte de los proyectos relacionados con Fabric incluyen el uso de pipelines de Synapse – Data Engineering para orquestar diversos procesos ETL que permitan volcar la información contenida en ficheros alojados dentro de un almacén Lakehouse. Mediante un caso de uso práctico, veremos como configurar la lectura dinámica de ficheros CSV y su posterior volcado en tablas delta haciendo uso de parámetros y notebooks de Spark.
¿Qué se entiende por notebook de Synapse?
Un notebook de Synapse es un objeto que nos permite utilizar diferentes tipos de lenguaje (Python, Scala, R, SQL) para elaborar programas que nos ayuden, entre otros a:
Leer ficheros en diversos formatos (CSV, Parquet, Delta…)
Análisis exploratorio
Realizar transformaciones sobre los datos
Elaborar visualizaciones
Crear tablas e ingestar datos
Para llevar a cabo nuestro caso de uso, vamos a dividir la arquitectura de nuestro pipeline en las diferentes fases:
Flujo de solución
Un Notebook encargado de listar los nombres de los ficheros alojados en nuestro Lakehouse y guardar dicho listado en una variable.
Se itera sobre dicha lista mediante el uso de un bucle ForEach.
Para cada elemento de la lista, se leerá su contenido (tratado como CSV) para posteriormente almacenarlo en una tabla Delta de su mismo nombre.
Quedando la arquitectura global de la siguiente forma:
Arquitectura global
Arquitectura global
Caso de uso: lectura y carga dinámica de ficheros CSV
Comencemos por cargar nuestros ficheros de origen en un almacén Lakehouse, para ello, primero debemos haber creado un Lakehouse en nuestro workspace:
Creación de un nuevo Lakehouse
Una vez creado podremos acceder a él y cargar nuestros ficheros CSV:
Nuevo Lakehouse: Semantic model y Endpoint
Al acceder por primera vez a nuestro Lakehouse veremos que existen por defecto dos apartados:
- Tables: Aquí se almacenarán las tablas delta que iremos creando dentro de nuestro Lakehouse.
- Files: Donde cargaremos nuestros ficheros CSV de origen. Tales ficheros pueden ser cargados de forma manual.
Procedemos a cargar los ficheros “Customer.csv” y “Stores.csv” mediante el selector “Get Data”:
Tables: Aquí se almacenarán las tablas delta que iremos creando dentro de nuestro Lakehouse.
Files: Donde cargaremos nuestros ficheros CSV de origen. Tales ficheros pueden ser cargados de forma manual.
Carga de ficheros CSV
Ya disponemos de nuestros ficheros CSV en nuestro almacén Lakehouse, estamos listos para crear nuestro primer notebook de apache Spark.
Llamaremos a nuestro primer notebook “List Files” y tendrá por código:
Listar ficheros alojados en Lakehouse
En este notebook hacemos uso de la librería “mssparkutils”, la cual servirá para leer los nombres de los ficheros contenidos en nuestro Lakehouse (línea 8), así como para devolver los nombres de dichos ficheros (línea 15) como argumento de salida.
A la hora de ubicar la llamada a este notebook en nuestro pipeline, lo haremos en primer lugar mediante la actividad “Notebook”:
Notebook: Listar ficheros
Una vez ejecutado, podemos ver su salida en tiempo de ejecución en formato JSON.
Salida en formato JSON
Nos interesa el atributo “exitValue” que contiene la lista de ficheros alojados en nuestro Lakehouse, podemos proceder a formatear el texto JSON para verlo de forma más clara:
Lista de ficheros por salida
Una vez que tenemos los nombres de los ficheros a leer, nos interesa iterar por cada uno de ellos para proceder a su lectura posterior volcado en formato delta. Para ello, el siguiente elemento de nuestra arquitectura será una actividad “ForEach”:
Actividad ForEach
Los ítems por los que iterará la actividad ForEach serán cada uno de los nombre de los ficheros del listado recuperado anteriormente, en este caso “Customer.csv” y “Stores.csv”.
Para poder acceder correctamente a cada ítem, debemos utilizar el lenguaje de expresiones, mediante el cual separaremos cada elemento de la lista por el carácter comma “,”:
A la hora de iterar, nuestro bucle hará dos iteraciones por ejecución (dado que solo tenemos dos ficheros), realizando en cada una de ellas las siguientes operaciones:
Iteraciones ForEach
- Almacenamiento del nombre del fichero a iterar en una variable temporal previamente creada.
- Ejecución de notebook de lectura de fichero CSV y escritura en formato Delta.
La ejecución del notebook encargado de leer y cargar se realiza en dos bloques principales: uno destinado a inicializar el parámetro vacío (marcado como bloque de parámetros) y otro destinado a la lectura y carga dinámica de los ficheros y tablas delta.
Nota: Como se ha referenciado en el párrafo anterior, es importante marcar el primer bloque de código como parámetro para poder recibir correctamente el valor de la ejecución:
Etiquetado como parámetro
Notebook: Lectura CSV y escritura Delta
A la hora de especificar una ruta podemos hacerlo de dos formas:
- Rutas absolutas: especificadas de forma directa.
- Rutas relativas: especificadas en base al valor de parámetros, como en el caso que nos atañe, donde el valor de la ruta variará en función del fichero a cargar (fileName):
Si la ejecución resulta satisfactoria, podremos comprobar que todas las actividades se han ejecutado con éxito en el apartado “Output”:
Ejecución exitosa
Por último, solo queda comprobar que nuestras tablas Delta han sido creadas y cargadas correctamente, para ello vamos al apartado “Tables” dentro de nuestro Lakehouse:
Tablas Delta
Quiero consultar mis tablas delta en formato T-SQL, ¿Es posible?
Sí, al acceder a nuestros recursos en workspace, veremos que al crear nuestro Lakehouse se ha creado un sub-recurso “SQL analytics endpoint”:
SQL analytics endpoint
Si entramos dentro accederemos a un panel SQL donde podremos ver esquemas, tablas, vistas y procedimientos almacenados, entre otros.
Panel SQL analytics endpoint
Al pulsar sobre “New SQL query” podemos ejecutar una nueva consulta sobre nuestra tabla delta “customer” previamente materializada por el proceso:
Consulta SQL
Conclusión
A lo largo de este post hemos visto un caso de uso práctico que permite la lectura dinámica de ficheros CSV y su posterior carga en formato tabla Delta.
Este alcance resulta muy útil y potente en aquellos escenarios ETL que requieran el tratamiento de un gran conjunto de ficheros, así como el aprovechamiento de la ventaja que supone el uso de tablas en formato Delta, principalmente gracias al versionado que permite mantener un histórico de los datos que nos permita acceder a versiones previas.
Todos estos elementos combinados, dan lugar a la antesala de un correcto modelado de datos para su posterior uso en herramientas analíticas y de reporting como Power BI.