今天遇到一个关于金融背景的时间序列数据分析计算的需求,其中的分析处理方案非常具有代表性,所以分享给大家。
数据大概长这个样子,其中Unnamed: 0这一列代表时间
import numpy as np
import pandas as pd
data = pd.read_excel("a1.xlsx")
data.head(3)
| Unnamed: 0 | 000016.SH | CBA02501.CS | 000832.CSI |
|---|
| 0 | 2016-01-04 | 2270.4609 | 171.8353 | 312.9428 |
|---|
| 1 | 2016-01-05 | 2288.1127 | 171.5916 | 309.5110 |
|---|
| 2 | 2016-01-06 | 2317.6465 | 171.6981 | 310.2719 |
|---|
客户需要按日、周、月、季度计算各列数据百分比变化率的协方差以及期望值。
例如:按季度计算数据百分比变化率=(本季度最后一天-上一个季度的最后一天)/上一个季度的最后一天。
核心问题就在于客户给的数据的时间列并不是完整的,需要先判断出哪一天是(周、月、季度)的最后一天,并且取出对应的数据。
接下来就分享我的解决方案。
一:通过Unnamed: 0这一列计算出某一日期的年份以及当年是第几个季度、第几个月、第几个周。
data["year"] = data['Unnamed: 0'].apply(lambda x: x.year)
data['month'] = data['Unnamed: 0'].apply(lambda x: x.month)
data['week'] = data['Unnamed: 0'].apply(lambda x: x.week)
data['quarter'] = data['Unnamed: 0'].apply(lambda x: x.quarter)
二:通过上面计算得到的数据,按year、month分组聚合,得到按每年每月分组聚合的数据。同理得到每年每季度、每年每周的数据。
month_grouped = data['Unnamed: 0'].groupby([data['year'], data['month']])
week_grouped = data['Unnamed: 0'].groupby([data['year'], data['week']])
quarter_grouped = data['Unnamed: 0'].groupby([data['year'], data['quarter']])
三:通过上面的分组聚合数据,用max()函数取出最后一天数据
month_last_day = pd.to_datetime(month_grouped.max().to_list())
week_last_day = pd.to_datetime(week_grouped.max().to_list())
quarter_last_day = pd.to_datetime(quarter_grouped.max().to_list())
把时间列设为index
data.index = pd.to_datetime(data["Unnamed: 0"])
data.head(3)
| Unnamed: 0 | 000016.SH | CBA02501.CS | 000832.CSI | year | month | week | quarter |
|---|
| Unnamed: 0 |
|
|
|
|
|
|
|
|
|---|
| 2016-01-04 | 2016-01-04 | 2270.4609 | 171.8353 | 312.9428 | 2016 | 1 | 1 | 1 |
|---|
| 2016-01-05 | 2016-01-05 | 2288.1127 | 171.5916 | 309.5110 | 2016 | 1 | 1 | 1 |
|---|
| 2016-01-06 | 2016-01-06 | 2317.6465 | 171.6981 | 310.2719 | 2016 | 1 | 1 | 1 |
|---|
删掉因计算中间数据而增加的列
data.drop(['year', 'month', "quarter", "week", "Unnamed: 0"], axis=1, inplace=True)
四:用计算得到的每季的最后一天数据为索引,生成一个全0的series
quarter_last_day_series = pd.Series(
[0 for _ in range(len(quarter_last_day))],
index=quarter_last_day
)
这个series的作用就是跟data进行内连接,从而达到了取出相应日期数据的目的。
selected_by_quarter = pd.concat(
[quarter_last_day_series, data],
axis=1, join='inner'
).drop(0, axis=1)
selected_by_quarter.head(3)
| 000016.SH | CBA02501.CS | 000832.CSI |
|---|
| 2016-03-31 | 2156.5410 | 172.8271 | 295.1833 |
|---|
| 2016-06-30 | 2122.6330 | 174.1138 | 282.8972 |
|---|
| 2016-09-30 | 2177.3524 | 176.9656 | 294.4042 |
|---|
同理就可以取出其它日期的数据
计算平均值
np.array(
selected_by_quarter.mean()
).reshape(3, -1)
array([[2805.834556],
[ 190.929684],
[ 328.764932]])
五:用pct_change()计算出百分比变化率
selected_by_quarter_pct = selected_by_quarter.pct_change().dropna()
selected_by_quarter_pct.head(3)
| 000016.SH | CBA02501.CS | 000832.CSI |
|---|
| 2016-06-30 | -0.015723 | 0.007445 | -0.041622 |
|---|
| 2016-09-30 | 0.025779 | 0.016379 | 0.040676 |
|---|
| 2016-12-30 | 0.050312 | -0.018757 | -0.037569 |
|---|
selected_by_quarter_pct.cov()
|
000016.SH | CBA02501.CS | 000832.CSI |
|---|
| 000016.SH | 0.006911 | -0.000556 | 0.001930 |
| CBA02501.CS | -0.000556 | 0.000156 | 0.000132 |
| 000832.CSI | 0.001930 | 0.000132 | 0.002710 |
用cov()计算协方差,并且用.values 转化为numpy的array格式
selected_by_quarter_pct.cov().values
array([[ 0.00691068, -0.00055623, 0.00193013],
[-0.00055623, 0.00015612, 0.00013195],
[ 0.00193013, 0.00013195, 0.00271026]])
selected_by_day_pct = data.pct_change().dropna()
selected_by_day_pct.cov().values
array([[ 1.45767216e-04, -1.84259700e-06, 5.16153366e-05],
[-1.84259700e-06, 9.07220432e-07, -1.19196633e-07],
[ 5.16153366e-05, -1.19196633e-07, 4.17346061e-05]])
六:用面向对象的方式重构代码
了解了全流程之后就用类封装数据和计算逻辑
import numpy as np
import pandas as pd
class RiskParity:
def __init__(self):
self.data = pd.read_excel("a1.xlsx")
self.data["year"] = self.data['Unnamed: 0'].apply(lambda x: x.year)
self.data['month'] = self.data['Unnamed: 0'].apply(lambda x: x.month)
self.data['week'] = self.data['Unnamed: 0'].apply(lambda x: x.week)
self.data['quarter'] = self.data['Unnamed: 0'].apply(lambda x: x.quarter)
self.data.index = pd.to_datetime(self.data["Unnamed: 0"])
def __grouped(self, freq):
return self.data['Unnamed: 0'].groupby([self.data['year'], self.data[f"{freq}"]])
def __last_day(self, freq):
return pd.to_datetime(self.__grouped(freq).max().to_list())
def __calculate(self, columns, freq):
# 按给定频率计算协方差和期望
if freq == "day":
selected_data_pct = self.data[columns].pct_change().dropna()
cov = selected_data_pct.cov().values
mean = np.array(selected_data_pct.mean()).reshape(len(columns), -1)
else:
last_day_series = pd.Series([0 for _ in range(len(self.__last_day(freq)))], index=self.__last_day(freq))
selected_data = pd.concat([last_day_series, self.data[columns]], axis=1, join='inner').drop(0, axis=1)
selected_data_pct = selected_data.pct_change().dropna()
cov = selected_data_pct.cov().values
mean = np.array(selected_data.mean()).reshape(len(columns), -1)
return cov, mean
def cov_and_mean
(self, columns: list, freq="day") -> np.array:
"""
columns : 你所需要计算显示的列名,例如 ["000016.SH", "CBA02501.CS", "000832.CSI"]
freq : 你所需要计算的频率,按天: day 按月: month 按周: week 按季: quarter
"""
freq_excepted = ("day", "month", "week", "quarter")
if freq not in freq_excepted:
print("请输入正确的freq")
return None
else:
return self.__calculate(columns, freq)
if __name__ == "__main__":
rp = RiskParity()
v, r = rp.cov_and_mean(["000016.SH", "CBA02501.CS", "000832.CSI"], freq="quarter")
print(v)
print(r)
七:总结
这个需求的难点主要有两个方面,其一是如何从不完整的日期中挑选出每周、每月、每季度的最后一天;其二则是在给定时间序列的情况下,在原数据中取出相应日期的数据。
第一个问题的解决思路是通过(年、月)、(年、季度)这种双重的分组聚合,将每月、每季度的数据进行分割,之后取出每个分组的最后一条数据。
第二个问题的解决思路是通过先生成以给定时间序列为index的任意一个series,然后用这个series进行内连接,以此就起到了筛选数据的效果
最后一点就是用面向对象的方式重构代码时,要明确需要为客户暴露哪些方法,哪些数据,并且选取相同逻辑的部分进行方法封装。