# Example: feature assembly — merge the raw numeric columns with the
# encoder's output column into a single 'features' vector.
from pyspark.ml.feature import VectorAssembler

featuresCreator = VectorAssembler(
    inputCols=[c[0] for c in labels[2:]] + [encoder.getOutputCol()],
    outputCol='features',
)

# Chain the stages: feature encoding -> feature assembly -> logistic regression.
pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

# Fit the model on a seeded 70/30 train/test split.
train, test = data.randomSplit([0.7, 0.3], seed=123)
model = pipeline.fit(train)
2.2 PySpark分布式机器学习原理
在分布式训练中,用于训练模型的工作负载会在多个微型处理器之间进行拆分和共享,这些处理器称为工作器节点;各工作器节点并行工作,从而加速模型训练。分布式训练可用于传统的 ML 模型,但更适用于计算和时间密集型任务,例如深度神经网络的训练。分布式训练有两种主要类型:数据并行和模型并行,主要代表有 Spark ML、Parameter Server 和 TensorFlow。
from pyspark.sql.functions import *

# --- Basic inspection of the DataFrame ---
df.dtypes                 # column names and data types
df.show()                 # display the content of df
df.head()                 # first n rows
df.first()                # first row
df.take(2)                # first n rows as a list
df.schema                 # schema of df
df.columns                # column names of df
df.count()                # number of rows in df
df.distinct().count()     # number of distinct rows in df
df.printSchema()          # print the schema of df
df.explain()              # print the logical and physical plans
df.describe().show()      # compute summary statistics

# Aggregation: average age and fare per survival group.
df.groupBy('Survived').agg(avg("Age"), avg("Fare")).show()
# Conditional projection.
df.select(df.Sex, df.Survived == 1).show()
# Sort by age, oldest first.
df.sort("Age", ascending=False).collect()

# --- Feature engineering ---
df = df.dropDuplicates()  # remove duplicate rows
df = df.na.fill(value=0)  # fill missing values with 0
df = df.na.drop()         # then drop any rows that still contain nulls
# New 0/1 column encoding sex.
df = df.withColumn('isMale', when(df['Sex'] == 'male', 1).otherwise(0))
# Drop the index, name and raw sex columns.
df = df.drop('_c0', 'Name', 'Sex')
# Set up the feature/label columns: every column except the label is
# packed into a single 'features' vector.
from pyspark.ml.feature import VectorAssembler

ignore = ['Survived']
vectorAssembler = VectorAssembler(
    inputCols=[c for c in df.columns if c not in ignore],
    outputCol='features',
)
new_df = vectorAssembler.transform(df)
new_df = new_df.select(['features', 'Survived'])

# Seeded 75/25 split into training and test sets.
train, test = new_df.randomSplit([0.75, 0.25], seed=12345)
# 模型训练 from pyspark.ml.classification import LogisticRegression
# Build the logistic-regression estimator and fit it on the TRAINING split.
# BUG FIX: the original called lr.fit(test) — training on the evaluation
# data (leakage) while the `train` split from the randomSplit above went
# unused, so the AUC reported later was measured on the data the model
# was fit on. Fit on `train` instead.
lr = LogisticRegression(featuresCol='features', labelCol='Survived')
lr_model = lr.fit(train)
# 模型评估 from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate on the held-out test set: report AUC and the learned weights.
auc = BinaryClassificationEvaluator().setLabelCol('Survived')
predictions = lr_model.transform(test)
score = auc.evaluate(predictions)
print('AUC of the model:' + str(score))
print('features weights', lr_model.coefficientMatrix)