[推荐系统] 特征工程篇

Feature Engineering by Spark

Posted by Penistrong on March 7, 2021

推荐系统

特征工程

  • 特征工程就是要从用户$U$、物品$I$、场景$C$中提取出原始特征,并转换为推荐模型可以使用的输入向量$Vector$。
  • 那么如何分析要处理的特征?
    • 广义上讲,所有的特征都可以分为两大类:
      1. 类别、ID型特征: 无法用数字表征的信息可以看做是类别型特征,例如用户性别、电影分类标签、时间等
      2. 数值型特征: 可以用数字直接表示的特征,例如用户的年龄、点击量等

相关依赖及格式

数据集$movies.csv$格式

\[\begin{array}{c|c|c} \hline \text{movieId} & \text{title} & \text{genres} \\ \hline 1 & \text{Toy Story(1995)} & Adventure|Animation|Children|Comedy|Fantasy \\ 2 & \text{Heat (1995)} & Action|Crime|Thriller \\ 3 & \text{Casino (1995)} & Crime|Drama \\ \cdots & \cdots &\cdots \\ \hline \end{array}\]

数据集$ratings.csv$格式

\[\begin{array}{|c|c|c|c|} \hline \text{userId} & \text{movieId} & \text{rating} & \text{timestamp} \\ \hline 1 & 2 & 3.5 & 1112486027 \\ \hline 1 & 29 & 3.5 & 1112484676 \\ \hline 2 & 3 & 4.0 & 974820889 \\ \hline \end{array}\]

数据集$links.csv$格式

\[\begin{array}{c|c|c} \hline \text{movieId} & \text{imdbId} & \text{tmdbId} \\ \hline 1 & 114709 & 862 \\ 2 & 113497 & 8844 \\ 3 & 113228 & 15602 \\ \cdots & \cdots &\cdots \\ \hline \end{array}\]

数据集$tags.csv$格式

\[\begin{array}{c|c|c|c} \hline \text{userId} & \text{movieId} & \text{tag} & \text{timestamp} \\ \hline 18 & 4141 & \text{Mark Waters} & 1240597180 \\ 65 & 27803 & \text{Oscar (Best Foreign Language Film)} & 1305008715 \\ 121 & 778 & \text{Nudity (Full Frontal - Notable)} & 1300852839 \\ \cdots & \cdots &\cdots & \cdots \\ \hline \end{array}\]

使用到的pyspark的模块

from pyspark import SparkConf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, QuantileDiscretizer, MinMaxScaler, StringIndexerModel
from pyspark.ml.linalg import VectorUDT, Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

处理类别型特征

使用One-hot编码

  • One-hot编码又称独热编码,是将类别、ID型特征转换为数值向量的一种最典型的编码方式。

  • 对输入的所有类别信息进行汇总,每个类别一个维度,将其他维度置0而单独将当前类别或ID对应的维度置1,生成特征向量。
    • 一周内的星期三,其特征向量

      \[\begin{bmatrix} 0 & 0 & 1 & 0 & 0 & 0 & 0 \end{bmatrix} \\ Weekday=Wednesday\]
    • 男性(不考虑LGBT的情况下)的特征向量

      \[\begin{bmatrix} 0 & 1 \end{bmatrix} \\ Gender=Male\]
    • 中国某一城市比如九江的特征向量

      \[\begin{bmatrix} 0 & \cdots & 0 & 1 & 0 & \cdots & 0 \end{bmatrix} \\ City=JiuJiang\]
  • 除此之外,ID型特征也经常用于One-hot编码。假设数据中共9000部电影,每部电影的ID为主码,从0开始编号到9000。如果用户U观看过电影M,这一用户行为对于电影推荐系统来说极为重要,使用One-hot编码将电影M的ID(假设是第360部),那么这个行为可以用一个9000维的向量表示,让第360维的元素为1,其余皆为0即可。
    • 用户U在推荐系统的电影集中只看过编号ID为360的电影M

      \[\begin{bmatrix} 0 & \cdots & 0 & 1 & 0 & \cdots & 0 & \cdots & 0 \end{bmatrix} \\ Behaviour=User \ watched \ Movie \ M(ID=360 \ of \ 9000)\]
  • 使用pyspark将电影数据集中的ID特征转化为One-hot Vector

      # 使用One-hot编码将类别、ID型特征转换为特征向量
      # 独热编码,指某个特征是排他的
      def oneHotEncoderExample(movieSamples):
          samplesWithIdNumber = movieSamples.withColumn("movieIdNumber", F.col("movieId").cast(IntegerType()))
          encoder = OneHotEncoder(inputCols=["movieIdNumber"], outputCols=['movieIdVector'], dropLast=False)
          #先fit进行预处理,尔后使用transform将原始特征转换为One-hot特征
          oneHotEncoderSamples = encoder.fit(samplesWithIdNumber).transform(samplesWithIdNumber)
          oneHotEncoderSamples.printSchema()
          oneHotEncoderSamples.show(10)
    
      # One-hot处理结果
      +-------+--------------------+--------------------+-------------+-----------------+
      |movieId|               title|              genres|movieIdNumber|    movieIdVector|
      +-------+--------------------+--------------------+-------------+-----------------+
      |      1|    Toy Story (1995)|Adventure|Animati...|            1| (1001,[1],[1.0])|
      |      2|      Jumanji (1995)|Adventure|Childre...|            2| (1001,[2],[1.0])|
      |      3|Grumpier Old Men ...|      Comedy|Romance|            3| (1001,[3],[1.0])|
      |      4|Waiting to Exhale...|Comedy|Drama|Romance|            4| (1001,[4],[1.0])|
      |      5|Father of the Bri...|              Comedy|            5| (1001,[5],[1.0])|
      |      6|         Heat (1995)|Action|Crime|Thri...|            6| (1001,[6],[1.0])|
      |      7|      Sabrina (1995)|      Comedy|Romance|            7| (1001,[7],[1.0])|
      |      8| Tom and Huck (1995)|  Adventure|Children|            8| (1001,[8],[1.0])|
      |      9| Sudden Death (1995)|              Action|            9| (1001,[9],[1.0])|
      |     10|    GoldenEye (1995)|Action|Adventure|...|           10|(1001,[10],[1.0])|
      +-------+--------------------+--------------------+-------------+-----------------+
    

使用Multi-hot编码

  • 由One-hot编码的思想自然可以衍生至Multi-hot编码(多热编码),比如在分类标签、历史行为序列等特征方面,用户会与多个物品产生交互行为。在电影数据集中,每个电影有多个Genre,可以使用Multi-hot完成每部电影的多分类标签到向量的转换

      # 将数组转换为向量
      def array2vec(genreIndexes, indexSize):
          genreIndexes.sort()
          fill_list = [1.0 for _ in range(len(genreIndexes))]
          return Vectors.sparse(indexSize, genreIndexes, fill_list)
    
      # 使用Multi-hot编码,对物品生成多个标签
      # 对movies.csv数据集中的电影分类,刚好使用多热编码,因为每个电影有多种分类
      def multiHotEncoderExample(movieSamples):
          samplesWithGenres = movieSamples.select("movieId", "title", explode(split(F.col("genres"), "\\|").cast(ArrayType(StringType()))).alias("genre"))
          genreIndexer = StringIndexer(inputCol="genre", outputCol="genreIndex")
          StringIndexerModel = genreIndexer.fit(samplesWithGenres)
          genreIndexSamples = StringIndexerModel.transform(samplesWithGenres).withColumn("genreIndexInt", F.col("genreIndex").cast(IntegerType()))
    
          indexSize = genreIndexSamples.agg(max(F.col("genreIndexInt"))).head()[0] + 1
          processedSamples = genreIndexSamples.groupBy("movieId").agg(F.collect_list("genreIndexInt").alias('genreIndexes')).withColumn("IndexSize", F.lit(indexSize))
          finalSample = processedSamples.withColumn("vector", udf(array2vec, VectorUDT())(F.col("genreIndexes"), F.col("indexSize")))
    
          finalSample.printSchema()
          finalSample.show(10)
    
      # Multi-hot处理结果
      +-------+------------+---------+--------------------+
      |movieId|genreIndexes|IndexSize|              vector|
      +-------+------------+---------+--------------------+
      |    296|[1, 5, 0, 3]|       19|(19,[0,1,3,5],[1....|
      |    467|         [1]|       19|      (19,[1],[1.0])|
      |    675|   [4, 0, 3]|       19|(19,[0,3,4],[1.0,...|
      |    691|      [1, 2]|       19|(19,[1,2],[1.0,1.0])|
      |    829| [1, 10, 14]|       19|(19,[1,10,14],[1....|
      |    125|         [1]|       19|      (19,[1],[1.0])|
      |    451|   [0, 8, 2]|       19|(19,[0,2,8],[1.0,...|
      |    800|  [0, 8, 16]|       19|(19,[0,8,16],[1.0...|
      |    853|         [0]|       19|      (19,[0],[1.0])|
      |    944|         [0]|       19|      (19,[0],[1.0])|
      +-------+------------+---------+--------------------+
    

处理数值型特征

数值型特征不能因为其本身是数字而将其直接放入特征向量。要从特征的两个方面讨论问题:

  • 特征的尺度。即特征的取值范围,比如电影的平均评分$f_s$在$[0,1000]$之间,而电影的评价次数$f_r$却是一种无上限的特征。
  • 特征的分布。上述提到的$f_r$与$f_s$两特征的尺度差距过大,如果直接将特征的原始数值直接输入推荐模型,就会导致两特征对于模型影响程度地显著区别。如果不做处理,$f_r$由于波动范围很大,可能会完全掩盖$f_s$的作用。因此我们希望将它们的尺度放在同一区域内,比如$[0, 1]$,此即归一化。

归一化(Normalization)

  • 归一化只能解决特征取值范围不统一的问题,可以理解为按比例缩小,但其结构没有改变,仍然无法改变特征分布。
  • 在一般的电影数据集中,人们的评分一般趋于中庸偏上,在3.5/5分聚集了大量评分。这对模型的学习而言也不是好的现象,因为特征的区分度很低。

分桶(Bucketing)

  • 使用分桶来解决特征值分布极不均匀的情况,将样本按某特征值自高到低排序,按照给定桶的数量找到分位数,将样本分拣到桶中,最后将桶ID作为特征值

使用Spark MLlib

  • 归一化使用MinMaxScaler,分桶使用QuantileDiscretizer。代码如下:

      def ratingFeatures(ratingSamples):
          ratingSamples.printSchema()
          ratingSamples.show()
          # calculate average movie rating score and rating count
          movieFeatures = ratingSamples.groupBy('movieId').agg(F.count(F.lit(1)).alias('ratingCount'),
                                                              F.avg("rating").alias("avgRating"),
                                                              F.variance('rating').alias('ratingVar')) \
              .withColumn('avgRatingVec', udf(lambda x: Vectors.dense(x), VectorUDT())('avgRating'))
          movieFeatures.show(10)
          # bucketing
          ratingCountDiscretizer = QuantileDiscretizer(numBuckets=100, inputCol="ratingCount", outputCol="ratingCountBucket")
          # Normalization
          ratingScaler = MinMaxScaler(inputCol="avgRatingVec", outputCol="scaleAvgRating")
          pipelineStage = [ratingCountDiscretizer, ratingScaler]
          featurePipeline = Pipeline(stages=pipelineStage)
          movieProcessedFeatures = featurePipeline.fit(movieFeatures).transform(movieFeatures)
          movieProcessedFeatures.show(10)
    
      # 电影特征初步处理提取。只显示前10行,下同。
      +-------+-----------+------------------+------------------+--------------------+
      |movieId|ratingCount|         avgRating|         ratingVar|        avgRatingVec|
      +-------+-----------+------------------+------------------+--------------------+
      |    296|      14616| 4.165606185002737|0.9615737413069363| [4.165606185002737]|
      |    467|        174|3.4367816091954024|1.5075410271742742|[3.4367816091954024]|
      |    829|        402|2.6243781094527363|1.4982072182727266|[2.6243781094527363]|
      |    691|        254|3.1161417322834644|1.0842838691606236|[3.1161417322834644]|
      |    675|          6|2.3333333333333335|0.6666666666666667|[2.3333333333333335]|
      |    125|        788| 3.713197969543147|0.8598255922703314| [3.713197969543147]|
      |    800|       1609|4.0447482908638905|0.8325734596130598|[4.0447482908638905]|
      |    944|        259|3.8262548262548264|0.8534165394630507|[3.8262548262548264]|
      |    853|         20|               3.5| 1.526315789473684|               [3.5]|
      |    451|        159|  3.00314465408805|0.7800533397022527|  [3.00314465408805]|
      +-------+-----------+------------------+------------------+--------------------+
    
      # 归一化与分桶的结果
      +-------+-----------+------------------+------------------+--------------------+-----------------+--------------------+
      |movieId|ratingCount|         avgRating|         ratingVar|        avgRatingVec|ratingCountBucket|      scaleAvgRating|
      +-------+-----------+------------------+------------------+--------------------+-----------------+--------------------+
      |    296|      14616| 4.165606185002737|0.9615737413069363| [4.165606185002737]|             99.0|[0.9170998054196596]|
      |    467|        174|3.4367816091954024|1.5075410271742742|[3.4367816091954024]|             38.0|[0.7059538707722662]|
      |    829|        402|2.6243781094527363|1.4982072182727266|[2.6243781094527363]|             54.0|[0.4705944962973248]|
      |    691|        254|3.1161417322834644|1.0842838691606236|[3.1161417322834644]|             45.0|[0.6130620985364005]|
      |    675|          6|2.3333333333333335|0.6666666666666667|[2.3333333333333335]|              4.0|[0.38627664627161...|
      |    125|        788| 3.713197969543147|0.8598255922703314| [3.713197969543147]|             67.0|[0.7860337592595664]|
      |    800|       1609|4.0447482908638905|0.8325734596130598|[4.0447482908638905]|             79.0|[0.8820863689021069]|
      |    944|        259|3.8262548262548264|0.8534165394630507|[3.8262548262548264]|             46.0|[0.8187871768460151]|
      |    853|         20|               3.5| 1.526315789473684|               [3.5]|             12.0|[0.7242687117592825]|
      |    451|        159|  3.00314465408805|0.7800533397022527|  [3.00314465408805]|             37.0|[0.5803259992335382]|
      +-------+-----------+------------------+------------------+--------------------+-----------------+--------------------+
    

More

  • 对于数值型特征的处理方法远不止这些。就电影推荐而言,在处理用户的观看时间间隔和视频曝光量这两种特征时,由于不便确定桶的数量,可以使用归一化将它们处理之后,使用特征值平方、开方生成新的特征。主要是为了改变特征的分布规律,尤其是当人工经验难以判断哪种处理方式更好时,可以将这些新特征都输入模型,交由模型选择。

小结

特征处理没有标准答案,诸多深度学习模型或多或少都在特征处理上提出新的概念和方法。比如阿里的DIN(深度兴趣网络[^1])中,他们认为在学习特征之间的非线性关系时如果是用压缩到固定长度的向量表示时会丢失用户的多样化兴趣。他们提出了local activation(局部激活)的概念,自适应地学习用户兴趣的表示,大大提高了模型的表达能力。 [^1]:Zhou, G., “Deep Interest Network for Click-Through Rate Prediction”, arXiv:1706.06978v2, 2017. https://arxiv.org/pdf/1706.06978v2.pdf

\[\textrm{实践是检验真理的唯一标准}\]