import numpy as np
import pandas as pd
import polars as pl
from alpha.dataset.utility import DataProxy
from .ts_function import ts_slope, ts_rsquare, ts_delay
def to_pd_series(feature: DataProxy) -> pd.Series:
    """Return the ``data`` column of ``feature`` as a pandas Series.

    The underlying frame (``feature.df``) is converted to pandas and
    indexed by ``("date", "symbol")`` before the ``data`` column is taken.
    """
    pandas_frame = feature.df.to_pandas()
    indexed = pandas_frame.set_index(["date", "symbol"])
    return indexed["data"]
def to_pl_dataframe(series: pd.Series) -> pl.DataFrame:
    """Turn a pandas Series into a polars DataFrame.

    The index levels of ``series`` become ordinary columns and the value
    column is labelled ``data`` when it would otherwise be labelled ``0``.
    """
    flat = series.reset_index()
    # An unnamed Series produces a value column labelled 0; relabel it "data".
    flat = flat.rename(columns={0: "data"})
    return pl.from_pandas(flat)
def trend_score(feature: DataProxy, window: int) -> DataProxy:
    """Return ``ts_slope(feature, window)`` multiplied by ``ts_rsquare(feature, window)``."""
    slope = ts_slope(feature, window)
    r_square = ts_rsquare(feature, window)
    return slope * r_square
def roc(feature: DataProxy, window: int) -> DataProxy:
    """Compute the rate of change (ROC) over the given window.

    For every symbol the value ``window`` rows earlier is taken as the
    base, and the result is ``(data - base) / base``. The returned
    DataProxy wraps a frame with ``date``, ``symbol`` and ``data`` columns.
    """
    # Value `window` rows back, shifted independently within each symbol.
    lagged = pl.col("data").shift(window).over("symbol")
    change = ((pl.col("data") - lagged) / lagged).alias("data")

    result: pl.DataFrame = feature.df.select(
        pl.col("date"),
        pl.col("symbol"),
        change,
    )
    return DataProxy(result)
import numpy as np
import polars as pl
def ATR(feature_high: DataProxy, feature_low: DataProxy, feature_close: DataProxy, window: int) -> DataProxy:
    """
    Calculate ATR (Average True Range) using Polars.

    The true range of a row is the largest of ``high - low``,
    ``|high - previous close|`` and ``|low - previous close|``, where the
    previous close is taken within the same symbol. ATR is the rolling mean
    of the true range over ``window`` rows, again per symbol.

    Parameters:
    feature_high (DataProxy): DataProxy containing 'date', 'symbol', and 'data' columns for high prices.
    feature_low (DataProxy): DataProxy containing 'date', 'symbol', and 'data' columns for low prices.
    feature_close (DataProxy): DataProxy containing 'date', 'symbol', and 'data' columns for close prices.
    window (int): The window size for calculating ATR.

    Returns:
    DataProxy: DataProxy containing 'date', 'symbol', and 'atr' columns.

    NOTE(review): the shift and rolling mean work on row order, so rows are
    assumed to be ordered by date within each symbol - confirm with callers.
    NOTE(review): other helpers in this file produce a 'data' column; the
    documented 'atr' column name is kept here - confirm DataProxy accepts it.
    """
    keys = ["date", "symbol"]

    def _price_column(feature: DataProxy, name: str) -> pl.DataFrame:
        # All three inputs call their value column "data", so each one is
        # renamed (and cast to Float64) before joining to avoid name clashes.
        return feature.df.select(
            pl.col("date"),
            pl.col("symbol"),
            pl.col("data").cast(pl.Float64).alias(name),
        )

    merged = (
        _price_column(feature_high, "high")
        .join(_price_column(feature_low, "low"), on=keys)
        .join(_price_column(feature_close, "close"), on=keys)
    )

    # Previous close must come from the same symbol, not the preceding row
    # of whatever symbol happens to be stored above.
    prev_close = pl.col("close").shift(1).over("symbol")

    # Row-wise maximum of the three candidates. On a symbol's first row the
    # previous close is null, so only high - low contributes there.
    merged = merged.with_columns(
        pl.max_horizontal(
            pl.col("high") - pl.col("low"),
            (pl.col("high") - prev_close).abs(),
            (pl.col("low") - prev_close).abs(),
        ).alias("tr")
    )

    # Native rolling mean instead of a Python lambda evaluated per window.
    atr = merged.with_columns(
        pl.col("tr").rolling_mean(window).over("symbol").alias("atr")
    )
    return DataProxy(atr.select(pl.col("date"), pl.col("symbol"), pl.col("atr")))
def RSRS(high: pl.DataFrame, low: pl.DataFrame, window: int) -> pl.DataFrame:
    """Calculate the RSRS indicator as a ratio of rolling slopes.

    ``high`` and ``low`` are frames with ``date``, ``symbol`` and ``data``
    columns. They are joined on ``(date, symbol)``; for each symbol a
    degree-1 ``np.polyfit`` against ``0..window-1`` is run over every
    rolling window of the high and of the low series, and the result is
    ``high_slope / low_slope``.

    Returns a frame with ``date``, ``symbol`` and ``rsrs`` columns.

    NOTE(review): this is a ratio of two time-trend slopes rather than a
    regression of high on low - confirm this is the intended definition.
    """
    keys = ["date", "symbol"]

    def _fit_slope(values):
        # Slope (first coefficient) of a straight-line fit over the last
        # `window` values; short windows yield 0.0.
        if len(values) < window:
            return 0.0
        return np.polyfit(np.arange(window), values[-window:], 1)[0]

    def _rolling_slope(column: str, alias: str) -> pl.Expr:
        # Same rolling fit for either price column, evaluated per symbol.
        return (
            pl.col(column)
            .rolling_map(_fit_slope, window, min_periods=window)
            .over("symbol")
            .alias(alias)
        )

    high_part = high.select(pl.col("date"), pl.col("symbol"), pl.col("data").alias("high"))
    low_part = low.select(pl.col("date"), pl.col("symbol"), pl.col("data").alias("low"))
    joined = high_part.join(low_part, on=keys)

    with_slopes = joined.with_columns(
        _rolling_slope("high", "high_slope"),
        _rolling_slope("low", "low_slope"),
    )
    ratio = (pl.col("high_slope") / pl.col("low_slope")).alias("rsrs")
    return with_slopes.select(pl.col("date"), pl.col("symbol"), ratio)