Input and Data Sources

In Memory

If your data fits into process memory, use that: it's the fastest option.

import ultibi as ul
import polars as pl

# Read the CSV into memory, overriding dtypes where inference falls short
in_mem_frame = pl.read_csv(
    "./data/frtb/Delta.csv", dtypes={"SensitivitySpot": pl.Float64}
)
dsource = ul.DataSource.inmemory(in_mem_frame)
ds = ul.DataSet.from_source(dsource)
ds.prepare()  # .prepare() is currently only relevant to the FRTB DataSet

ds.ui()

The same can be achieved with the .from_frame() shortcut.
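
A minimal sketch of the shortcut, assuming it accepts the frame read above (the exact signature may differ):

ds = ul.DataSet.from_frame(in_mem_frame)
ds.ui()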

Your data must be a polars DataFrame. You can either construct it yourself (using any of the many supported IO operations, including from_pandas and from_arrow) or use a config.
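
For instance, a minimal sketch of going via pandas first (pandas is assumed to be installed; from_arrow works analogously):

import pandas as pd
import polars as pl
import ultibi as ul

# Read with pandas (or any other source pandas supports) ...
pandas_frame = pd.read_csv("./data/frtb/Delta.csv")
# ... then convert to a polars DataFrame
in_mem_frame = pl.from_pandas(pandas_frame)
ds = ul.DataSet.from_frame(in_mem_frame)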

Scan

If you can't hold all your data in process memory, you can sacrifice some performance and use a Scan.

import polars as pl
import ultibi as ul

# Note that the LazyFrame query must start with a scan_ call
# and must NOT have been collected
scan = pl.scan_csv("./data/frtb/Delta.csv", dtypes={"SensitivitySpot": pl.Float64})
dsource = ul.DataSource.scan(scan)
ds = ul.DataSet.from_source(dsource)

ds.ui()
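
Lazy transformations can presumably be chained onto the scan before handing it over, as long as the query still starts with a scan_ call and is never collected (see the notes below). A sketch, where the Desk filter value is purely hypothetical:

scan = (
    pl.scan_csv("./data/frtb/Delta.csv", dtypes={"SensitivitySpot": pl.Float64})
    .filter(pl.col("Desk") == "FXOptions")  # hypothetical filter; stays lazy
)
dsource = ul.DataSource.scan(scan)
ds = ul.DataSet.from_source(dsource)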

Note:

  • Naturally, this option will be slower: before computing your measures, we need to read the relevant bits of the data into process memory and, where relevant, call .prepare().
  • Scanning involves serialisation of the LazyFrame, so the Python version of your polars library must be aligned with what we expect. At the time of writing it has to be >=0.18.7 (a quick check is sketched below).
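
A quick way to verify the installed version, assuming a plain semantic version string:

import polars as pl

# Fail fast if the installed polars is older than the supported minimum
assert tuple(int(x) for x in pl.__version__.split(".")) >= (0, 18, 7), (
    f"polars {pl.__version__} is too old; >=0.18.7 is required"
)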

Database

Ultibi leverages connectorx. As such, all of its supported sources should eventually work (Postgres, MySQL, MariaDB (through the MySQL protocol), SQLite, Redshift (through the Postgres protocol), ClickHouse (through the MySQL protocol), SQL Server, Azure SQL Database (through the MSSQL protocol), Oracle, BigQuery).

Currently, MySQL has been tested to work; other databases will be supported in the near future.

import ultibi as ul
import polars as pl
from polars.type_aliases import PolarsDataType

# SQL is not very good at preserving and communicating the schema back,
# so it's best to provide the expected schema to the DataSet
schema: list[tuple[str, PolarsDataType]] = [
    ("COB", pl.Utf8),
    ("TradeId", pl.Utf8),
    ("SensitivitySpot", pl.Float64),
    ("Sensitivity_025Y", pl.Float64),
    ("EXOTIC_RRAO", pl.Boolean),
    ("OTHER_RRAO", pl.Boolean),
    ("Sensitivity_05Y", pl.Float64),
    ("Sensitivity_1Y", pl.Float64),
    ("Sensitivity_2Y", pl.Float64),
    ("Sensitivity_3Y", pl.Float64),
    ("Sensitivity_5Y", pl.Float64),
    ("Sensitivity_10Y", pl.Float64),
    ("Sensitivity_15Y", pl.Float64),
    ("Sensitivity_20Y", pl.Float64),
    ("Sensitivity_30Y", pl.Float64),
    ("SensitivityCcy", pl.Utf8),
    ("CoveredBondReducedWeight", pl.Utf8),
    ("Sector", pl.Utf8),
    ("FxCurvDivEligibility", pl.Boolean),
    ("BookId", pl.Utf8),
    ("Product", pl.Utf8),
    ("Notional", pl.Float64),
    ("Desk", pl.Utf8),
    ("Country", pl.Utf8),
    ("LegalEntity", pl.Utf8),
    ("Group", pl.Utf8),
    ("RiskCategory", pl.Utf8),
    ("RiskClass", pl.Utf8),
    ("RiskFactor", pl.Utf8),
    ("RiskFactorType", pl.Utf8),
    ("CreditQuality", pl.Utf8),
    ("MaturityDate", pl.Utf8),
    ("Tranche", pl.Utf8),
    ("CommodityLocation", pl.Utf8),
    ("GirrVegaUnderlyingMaturity", pl.Utf8),
    ("BucketBCBS", pl.Utf8),
    ("BucketCRR2", pl.Utf8),
    ("GrossJTD", pl.Float64),
    ("PnL_Up", pl.Float64),
    ("PnL_Down", pl.Float64),
]

conn_uri = "mysql://%s:%s@%s:%d/%s?cxprotocol=binary" % (
    "root",
    "mysql",
    "localhost",
    3306,
    "ultima",
)

# Describe the source: table, database flavour, connection URI and the expected schema
db = ul.DbInfo("frtb", "MySQL", conn_uri, schema)

source = ul.DataSource.db(db)

# Optional build parameters applied when the FRTB DataSet is prepared
build_params = dict(
    fx_sqrt2_div="true",
    girr_sqrt2_div="true",
    csrnonsec_covered_bond_15="true",
    DayCountConvention="2",
    DateFormat="DateFormat",
)

ds = ul.FRTBDataSet.from_source(source, build_params=build_params)

ds.ui()