社区所有版块导航
Python
python开源   Django   Python   DjangoApp   pycharm  
DATA
docker   Elasticsearch  
aigc
aigc   chatgpt  
WEB开发
linux   MongoDB   Redis   DATABASE   NGINX   其他Web框架   web工具   zookeeper   tornado   NoSql   Bootstrap   js   peewee   Git   bottle   IE   MQ   Jquery  
机器学习
机器学习算法  
Python88.com
反馈   公告   社区推广  
产品
短视频  
印度
印度  
Py学习  »  Python

案例,python金融时间序列数据分析

蚂蚁学Python • 4 年前 • 498 次点击  
今天遇到一个关于金融背景的时间序列数据分析计算的需求,其中的分析处理方案非常具有代表性,所以分享给大家。
数据大概长这个样子,其中Unnamed: 0这一列代表时间
import numpy as np
import pandas as pd
data = pd.read_excel("a1.xlsx")
data.head(3)

Unnamed: 0000016.SHCBA02501.CS000832.CSI
02016-01-042270.4609171.8353312.9428
12016-01-052288.1127171.5916309.5110
22016-01-062317.6465171.6981310.2719
客户需要按日、周、月、季度计算各列数据百分比变化率的协方差以及期望值。
例如:按季度计算数据百分比变化率=(本季度最后一天-上一个季度的最后一天)/上一个季度的最后一天。
核心问题就在于客户给的数据的时间列并不是完整的,需要先判断出哪一天是(周、月、季度)的最后一天,并且取出对应的数据。
接下来就分享我的解决方案。
一:通过Unnamed: 0这一列计算出某一日期的年份以及当年是第几个季度、第几个月、第几个周。
data["year"] = data['Unnamed: 0'].apply(lambda x: x.year)
data['month'] = data['Unnamed: 0'].apply(lambda x: x.month)
data['week'] = data['Unnamed: 0'].apply(lambda x: x.week)
data['quarter'] = data['Unnamed: 0'].apply(lambda x: x.quarter)
二:通过上面计算得到的数据,按year、month分组聚合,得到按每年每月分组聚合的数据。同理得到每年每季度、每年每周的数据。
month_grouped = data['Unnamed: 0'].groupby([data['year'], data['month']])
week_grouped = data['Unnamed: 0'].groupby([data['year'], data['week']])
quarter_grouped = data['Unnamed: 0'].groupby([data['year'], data['quarter']])
三:通过上面的分组聚合数据,用max()函数取出最后一天数据
month_last_day = pd.to_datetime(month_grouped.max().to_list())
week_last_day = pd.to_datetime(week_grouped.max().to_list())
quarter_last_day = pd.to_datetime(quarter_grouped.max().to_list())
把时间列设为index
data.index = pd.to_datetime(data["Unnamed: 0"])
data.head(3)

Unnamed: 0000016.SHCBA02501.CS000832.CSIyearmonthweekquarter
Unnamed: 0







2016-01-042016-01-042270.4609171.8353312.94282016111
2016-01-052016-01-052288.1127171.5916309.51102016111
2016-01-062016-01-062317.6465171.6981310.27192016111
删掉因计算中间数据而增加的列
data.drop(['year''month'"quarter""week""Unnamed: 0"], axis=1, inplace=True)
四:用计算得到的每季的最后一天数据为索引,生成一个全0的series
quarter_last_day_series = pd.Series(
 [0 for _ in range(len(quarter_last_day))], 
 index=quarter_last_day
)
这个series的作用就是跟data进行内连接,从而达到了取出相应日期数据的目的。



    
selected_by_quarter = pd.concat(
 [quarter_last_day_series, data], 
 axis=1, join='inner'
).drop(0, axis=1)
selected_by_quarter.head(3)

000016.SHCBA02501.CS000832.CSI
2016-03-312156.5410172.8271295.1833
2016-06-302122.6330174.1138282.8972
2016-09-302177.3524176.9656294.4042
同理就可以取出其它日期的数据
计算平均值
np.array(
 selected_by_quarter.mean()
).reshape(3-1)
array([[2805.834556],
       [ 190.929684],
       [ 328.764932]])
五:用pct_change()计算出百分比变化率
selected_by_quarter_pct = selected_by_quarter.pct_change().dropna()
selected_by_quarter_pct.head(3)

000016.SHCBA02501.CS000832.CSI
2016-06-30-0.0157230.007445-0.041622
2016-09-300.0257790.0163790.040676
2016-12-300.050312-0.018757-0.037569
selected_by_quarter_pct.cov()

000016.SHCBA02501.CS000832.CSI
000016.SH0.006911-0.0005560.001930
CBA02501.CS-0.0005560.0001560.000132
000832.CSI0.0019300.0001320.002710
用cov()计算协方差,并且用.values 转化为numpy的array格式
selected_by_quarter_pct.cov().values
array([[ 0.00691068-0.00055623,  0.00193013],
       [-0.00055623,  0.00015612,  0.00013195],
       [ 0.00193013,  0.00013195,  0.00271026]])
selected_by_day_pct = data.pct_change().dropna()
selected_by_day_pct.cov().values
array([[ 1.45767216e-04-1.84259700e-06,  5.16153366e-05],
       [-1.84259700e-06,  9.07220432e-07-1.19196633e-07],
       [ 5.16153366e-05-1.19196633e-07,  4.17346061e-05]])
六:用面向对象的方式重构代码
了解了全流程之后就用类封装数据和计算逻辑
import numpy as np
import pandas as pd


class RiskParity:
 def __init__(self):
  self.data = pd.read_excel("a1.xlsx")
  self.data["year"] = self.data['Unnamed: 0'].apply(lambda x: x.year)
  self.data['month'] = self.data['Unnamed: 0'].apply(lambda x: x.month)
  self.data['week'] = self.data['Unnamed: 0'].apply(lambda x: x.week)
  self.data['quarter'] = self.data['Unnamed: 0'].apply(lambda x: x.quarter)
  self.data.index = pd.to_datetime(self.data["Unnamed: 0"])

 def __grouped(self, freq):
  return self.data['Unnamed: 0'].groupby([self.data['year'], self.data[f"{freq}"]])

 def __last_day(self, freq):
  return pd.to_datetime(self.__grouped(freq).max().to_list())

 def __calculate(self, columns, freq):
  # 按给定频率计算协方差和期望
  if freq == "day":
   selected_data_pct = self.data[columns].pct_change().dropna()
   cov = selected_data_pct.cov().values
   mean = np.array(selected_data_pct.mean()).reshape(len(columns), -1)
  else:
   last_day_series = pd.Series([0 for _ in range(len(self.__last_day(freq)))], index=self.__last_day(freq))
   selected_data = pd.concat([last_day_series, self.data[columns]], axis=1, join='inner').drop(0, axis=1)
   selected_data_pct = selected_data.pct_change().dropna()
   cov = selected_data_pct.cov().values
   mean = np.array(selected_data.mean()).reshape(len(columns), -1)
  return cov, mean

 def cov_and_mean (self, columns: list, freq="day") -> np.array:
  """
   columns : 你所需要计算显示的列名,例如 ["000016.SH", "CBA02501.CS", "000832.CSI"]
   freq : 你所需要计算的频率,按天: day   按月: month   按周: week   按季: quarter
  """

  freq_excepted = ("day""month""week""quarter")
  if freq not in freq_excepted:
   print("请输入正确的freq")
   return None
  else:
   return self.__calculate(columns, freq)


if __name__ == "__main__":
 rp = RiskParity()
 v, r = rp.cov_and_mean(["000016.SH""CBA02501.CS""000832.CSI"], freq="quarter")
 print(v)
 print(r)
七:总结
这个需求的难点主要有两个方面,其一是如何从不完整的日期中挑选出每周、每月、每季度的最后一天;其二则是在给定时间序列的情况下,在原数据中取出相应日期的数据。
第一个问题的解决思路是通过(年、月)、(年、季度)这种双重的分组聚合,将每月、每季度的数据进行分割,之后取出每个分组的最后一条数据。
第二个问题的解决思路是通过先生成以给定时间序列为index的任意一个series,然后用这个series进行内连接,以此就起到了筛选数据的效果
最后一点就是用面向对象的方式重构代码时,要明确需要为客户暴露哪些方法,哪些数据,并且选取相同逻辑的部分进行方法封装。

  最后,推荐蚂蚁老师的Pandas 编程课程,推荐:

  


图片扫码购买,购买后加蚂蚁老师微信ant_learn_python答疑

Python社区是高质量的Python/Django开发社区
本文地址:http://www.python88.com/topic/126997