Big Data Management. 2 - Primeras lineas de código.

En el artículo anterior, realizamos una introducción a las bases de datos NoSQL y sus ventajas, casos de uso y justificaciones pertinentes.

Big Data Management. 2 - Primeras lineas de código.

En el artículo anterior, realizamos una introducción a las bases de datos NoSQL y sus ventajas, casos de uso y justificaciones pertinentes. Usualmente en la tecnología existe una premisa máxima: "Si algo funciona, no lo toques". El problema surge cuando llegas a los límites tecnológicos de la arquitectura. Cuando entiendes las limitaciones de las bases de datos tradicionales a la hora de hacer una gestión de datos correcta, eficiente y sostenible, no queda otra solución que acudir a estructuras como ArcticDB que vamos a explicar en este artículo.

En este artículo, vamos a explicar los principios fundamentales de una forma más teórica. Vamos a realizar un pequeño recorrido por las funciones nativas de datalake, intentando aportar más conocimiento teórico, esta vez desde un punto de vista práctico.

Y posteriormente crearemos la primera parte del código de nuestro datalake. Generaremos diferentes módulos que solucionaran ciertas tareas del día a día, y sus contingencias, al mismo tiempo que recopilaremos los primeros datos, realizaremos los procesos oportunos para tener dos outputs distintos, común a todos los datasets:

- Los datos: Los datos que queremos pushear al datalake. Recordemos que al tratarse de un Datalake NoSQL, cuando hablamos de datos, hablamos de combinaciones de 0 y 1, sin importar si son datos temporales, datos fundamentales, etc… Los datalakes consideran los datos como información binaria registrada, y que posteriormente es accedida, al contrario de las databases SQL, que disponen de esquemas fijos y cualquier información no tipeada (adaptada a sus esquemas) produce un error.

- Los Metadatos: Necesitamos generar metadatos, para poder acceder a cierta información relevante como, la última vez que la base datos fue actualizada, cuando fue creada, qué longitud tiene, etc. Los metadatos son valores personalizados en función de la data que se quiera procesar

Una vez finalizado el proceso, nuestro objetivo será storear nuestra información, en nuestro objeto S3 (push data), o hablando en cristiano, enviarla a la nube.

Despleglegar Arquitectura

Paquetes Requeridos

El paquete requerido de forma obligatoria es unicamente arcticdb. Si quieres instalarlo, tienes unicamente que realizar :

pip install arcticdb

Datos Requeridos

Primero de todo, para los que no tengan ni idea de que es un servico S3:

Introducción a AWS S3 (Amazon Simple Storage Service)
Artículo básico sobre servicio S3 AWS. Conceptos, funcionamiento y características principales.

Luego aquí os dejo unos proveedores pythonde S3, que personalmente he probado, y a mí no me han dado errores, y he quedado satisfecho con el desempeño. Los ordeno por orden de importancia. Es decir, el primero es el que utilizaría en proyectos más críticos, y último el que utilizaría para jugar.

Acceso a un S3 :

Llegados a este punto, ya dispondremos de toda la información necesaria para crear nuestro objeto.

Nuestro objeto es un “puntero” que acumula todas las credenciales y direcciones necesarias para conectarse a nuestro datalake. Es como la llave maestra que nos dejara realizar todas las acciones dentro del datalake.

Para poder crear el puntero lo hacemos mediante:

from arcticdb import Arctic

Cargamos Arcticdb para poder crear el puntero

  • endpoint : tu url del s3
  • db : tu datalake dentro de s3
  • access key : clave de acceso
  • secret key: clave de acceso secreta
endpoint = 'tu-localizacion.tuproveedor.com'
db = 'datalake.stable' 
access_key = "soymuyseguro" 
secret_key= "noselodigasanadie" 

definimos el entorno para poder llamar al puntero

Ahora, una vez definidas las variables, creamos el puntero pasandole como parametros todos los valores introducidos de entorno.

datalake = Arctic(f's3s://{endpoint}:{db}?access=
{access_key}&secret={secret_key}')

creamos el puntero mediante Arctic y generamos la direccion del S3s mediante las variables del entorno definidas con anterioridad

El objeto que hemos creado, la llave que nos dara acceso total a nuestro datalake, devuelve esta configuracion.

Funcionalidades básicas del objeto datalake.

Este objeto es utilizado para comunicarnos con la primera capa de nuestro datalake. La primera capa se ocupa de las siguientes tareas:

  • crear: Esta primera capa es capaz de crear dentro del datalake bases de datos donde depositaremos y consultaremos a futuro, lugar donde volcaremos todos los símbolos de nuestra información.
  • eliminar: De forma opuesta a la creación, también nos permite eliminar toda la base datos completa, lo que implica todos los tickers, y toda información que hay en su interior.
  • listar: Muestra todas las bases de datos activas dentro de un datalake
  • acceder: Accede a la segunda capa del datalake, dentro del datalake-apunta a una base de datos concreta.
  • get URI: Nos devuelve la URL del servicio S3s que está apuntando.
Crear

Para poder crear una database dentro del datalake se realiza de la siguiente manera:

datalake.create_library('nombre.de.la.database')

Crearia en el datalake la database nombre.de.la.database

Al crear una base de datos, sin asignar a ninguna variable, obtenemos la respuesta, si se ha asignado a algún valor, únicamente hay que lanzar el valor:

Como resumen podemos concluir que mediante el objeto de datalake accedemos a la primera capa, y cuando ejecutamos la función create_library hacemos que cree una nueva base de datos dentro del datalake. Condición necesaria para posteriormente poder volcar todo tipo de información.

Para verificar que todo ha sido correcto, únicamente habría que realizar el comando de Listar.

Eliminar

De forma opuesta al create_library(), arcticdb tiene la función de delete_library(). Esta llamada elimina toda la base de datos que se ha solicitado eliminar. No hablamos de un ticker en concreto o un día en concreto, hablamos de eliminar todos los días de todos los tickers de una base de datos completa. Para eliminar un ticker, se utilizan otras funciones.

datalake.delete_library('nombre.de.la.database')

Eliminaria del datalake la database 'nombre.de.la.database'

Listar

Al utilizar la función de list_libraries(), el datalake nos devuelve una lista con los nombres únicos de todas las bases de datos que contiene. Con esta función veríamos los conjuntos de datos que actualmente están activos dentro del datalake.

datalake.list_libraries()

Funcion muy util, para verificar y que sera una constante que utilizaremos en la gestion de datos utilizando arcticdb y python.

Acceder

Una de las funciones mas utiles es el acceso a las bases de datos. Una vez tenemos el puntero definido correctamente, podemos acceder de dos formas.

  • mediante []:
datalake['economic.calendar.stable']

acceder mediante []

datalake['economic.calendar.stable'] accederia esa base de datos

  • mediante funcion():
datalake.get_library('economic.calendar.stable')

acceso mediante funcion

ademas, el asignar estas llamadas, facilita a posteor el trabajo con la capa 3 (Los simbolos dentro de las bases de datos, dentro del datalake)

get URI

Esta funcion devuelve la direccion s3 a la cual esta apuntando el objeto. De momento no la vamos a utilizar, pero se puede realizar la llamada tal que:

datalake.get_uri()

Y devuelve de una forma legible para el humano los valores de que esta ejecutando el puntero datalake (los mismos que le hemos introducido anteriormente)

Resumen de la estructura

Como resumen, deberías tener acceso a algun proveedor de espacio S3, en mi recomendación, para labores de producción, es Amazon S3 la mejor opción, y para unos datalakes menos críticos, cualquier proveedor es aceptable, pero recomendaría especialmente wasabisys. ¿Por qué? Porque solo recomiendo lo que conozco, y me ha dado unos resultados excelentes en comparación con el precio pagado por gigabyte.

Además hemos recorrido la primera capa del datalake, la capa “orquestadora”, que organiza la creación, eliminación listado y acceso a las bases de datos de la segunda capa

Datos Macroeconomicos

Para empezar con una serie de datos temporal sencilla, vamos a empezar con datos macroeconómicos. La primera decisión relevante es elegir el proveedor que queremos utilizar. Para datos macroeconómicos haciendo una búsqueda en internet, vemos:

Existe una alta variedad de fuentes de información para obtener datos macroeconómicos. Y el objetivo no es gestionar datos, por gestionar. El objetivo es gestionar dato útiles de calidad. Por consecuencia, volcarse todos los datasets a la base de datos, no tiene sentido a nivel computacional.

Vamos a descargar un calendario de expectativas macroeconómicas, como una primera prueba de concepto,

Plan de Ataque

Nuestro objetivo será recopilar un calendario macroeconómico que incluya información relevante sobre expectativas y datos, dado que con esa información, posteriormente se podría analizar como influye el factor sobre una cesta de activos, o incluso contra el conjunto de mercados, estimar el punto de acción máximo del evento …

Al necesitar información convencional, se podría empezar a trabajar sobre los datasets originales y montar un sistema complejo que ejecute de forma simple una tarea delicada. Pero también existen otras alternativas, en este caso hablo de eodhd, un proveedor de datos europeo, que agrupa y procesa esta información.

En resumen, llamaremos al proveedor de datos, solicitaremos la información requerida, la procesaremos para adaptarla a nuestros intereses, generaremos su metadata, y la volcaremos en el datalake.

Al venir de un proveedor de pago, no es necesario procesar la data, dado que ellos lo hacen por nosotros, y nos devuelven el trabajo listo para su uso.

Prueba de concepto

El esquema aproximado que buscamos en esta prueba de concepto corresponderia a algo como

flowchart TD A[Dalake Ingest RUN] -->|OK?| B(RUN ROUTINE) B --> C{GET DATA} C -->|One| D[adapt] D -->|Two| E[generate meta] E -->|Three| F[push to datalake] F -->|SYMBOL++|C

Nuesta estructura principal, sera un loop, que recorra todos los tickers que reconoce que debe procesar, procesarlos, y dumpearlos al datalake. Algo realmente simple

Para ello utilizaremos

import pandas as pd
import numpy as np 
import requests
from arcticdb import Arctic
from alive_progress import alive_bar

Además de las librerías habituales, utilizaremos alive_progress para poder calcular el proceso y animarlo de una forma sencilla.

Primero programaremos la gestión de la capa 1, y luego recorreremos la gestión de la capa 2, que básicamente consiste en recorrer una lista de tickers, obtenida del proveedor de datos, e ir procesándolos dentro del pipeline y dumpearlos.

Codigo

La primera parte, vamos a programar los metodos de gestion de la capa 1, intentando encapsular lo maximo posible, dado que son procesos que se utilizan de forma recurrente dentro de la gestion de datos con arcticdb

Gestion de la Capa 1 del Datalake

Mediante esta parte, gestionaremos el acceso a la capa 1 del datalake, gestionando las bases de datos de forma automatizada y accediendo a cada una de ellas de forma oportuna.

Esta función lo que hace es tomar desde la dataz (que es una lista) los nombres de las databases que se necesitan para el proceso de ingestión de datos y las crea si no están creadas. Esta función es muy ampliable, pudiendo realizar una gestión completa desde el inicio del algoritmo, pero dado que estamos programando una prueba de concepto funcional, la dejamos lo mas mínima posible.

def prepare_datalinks(dataz):
    print('[PREPARE DATALINKS] OK')
    for d in dataz:
        if d not in ac.list_libraries():
            ac.create_library(d)
            print('[DATABASE]:',d.upper(),'NOT FOUND, CREATING')
        else:
            print('[DATABASE]:',d.upper(),'IS IN THE DATALAKE')
    print('[QA] - DATALINKS READY)

Obtener paises en el Datalake

Una forma de obtener todos los paises que forman parte del dataset, es haciendo una peticion, y creando una lista con los valores unicos que el campo pais. No es la mas eficiente pero es funcional y entendible.

def get_countries():
	url = f'https://eodhd.com/api/economic-events?&from=1900-01-01&to=2024-01-01&limit=1000&comparison=qoq&api_token=6326eedae7f1e4.58558306'
    response = requests.get(url)
    if response.status_code == 200:
    	data = response.json()
        return pd.DataFrame.from_dict(data)['country'].unique().tolist()

Gestion de Candidatos

def construct_candidates(update):
    calendar = ac.get_library(dataz[0])
    countries = get_countries()
    if update == False:
        in_datalake = calendar.list_symbols()
        out = [ticker for ticker in countries if ticker not in in_datalake]
        print('[QA PREADOR INGESTING RE STARTED] - COUNTRIES TOTAL', len(countries),'COUNTRIES IN DATALAKE',len(in_datalake),'TO PUSH :',len(out))
        return out
    else:
        print('[QA PREDATOR] - UPDATING ALL DATALAKE')
        return countries

Gestion de la Capa 2 del Datalake

Una vez que los procesos ejecutados sobre la capa 1 del datalake y tenemos el entorno listo, gestionaremos la capa dos de la siguente forma:

  • Requerir la informacion al data broker
  • Si existe lo procesamos y lo pusheamos, ademas le añadimos metadata
  • Si no existe, lo descartamos y seguimos (quiza algun log estaria bien)
with alive_bar(len(countries),title='COUNTRIES',bar='halloween',force_tty=True)as bar:      
    for t in countries:
        data = pd.DataFrame(obtener_datos_simbolo(t,lista=False)).astype(str).set_index('date').sort_index()
        meta = {'inicio':data.index[0],'fin':data.index[-1],'indicadores':len(data['type'].unique()),'last_update':pd.Timestamp.now()}
        calendar.write(t,data,metadata=meta)
        i = i +1
        datalake.loc[t] = meta
        bar()
    metadata.write('CALENDAR',datalake)

Gestión de las Tareas Post-Exec

Una vez estan todas las tareas de la segunda capa finalizadas, ejecutamos tareas de post-ejecución, como sería el pusheo de los metadatos, el listado de símbolos o cualquier otra tarea de mantenimiento necesaria.

Únicamente vamos a listar los símbolos, pues por el diseño de arcticdb, cada vez que hay modificaciones en los símbolos, debemos hacer procesos de listado. Esto empieza aportar algo de lag a la carga en el momento que se rebasan los 500 símbolos. Por consecuencia, los listamos al final de la carga, como una forma de pre procesar los datos antes que sean requeridos en un futuro para su uso.

calendar.list_symbols()

Resultados

Una vez ejecutada la tarea, podemos evaluar el resultado. Nuestro objetivo es acceder a la base de datos que acabamos de crear y ingerir información, y consultar la metadata generada. ¡Verificar de forma aleatoria 5 valores, y listo! Hemos creado la primera base de datos dentro del datalake, con los calendarios macroeconómicos disponibles en la API de eodhd.