qfdk's Blog

黑科技自留地...

Apache Spark 使用 Pipeline 和 LDA 模型

简介

离开之前的公司快4个月了,现在整理一下以前干的东西.算是扔出一点儿干货吧 :) 之前的博客发表过一篇了,这里是技术博客,技术博客!

之前没有专门研究过大数据和机器学习,但是也做了不少东西都是现学现卖,成果还是可以的. 语言是自己认为比较反人类的语言 Scala.

工作环境:

  • Scala IED
  • Scala 2.10.6
  • Apache Spark 1.6.1
  • Apache Zepplin

这些版本要对应起来,要不然吃不了兜着走。这里集群的管理工具是ambari, 这个工具可以让你轻松的管理集群.

2016-09-20-19-18-43.jpg

这里我们用的是 Spark on Yarn 模式,其中进行提交任务又有两种模式,这里简单的说一句,这两种模式分别为:

  • yarn-cluster : driver运行在container之中,所有的日志本地看不到,只能看到running的信息。

  • yarn-client : drivier运行在client中,可以看到所有的日志,测试比较方便.

机器学习分为两个比较大的步骤: 收集预处理数据;进行学习

收集预处理数据

这里当然选择到处都看得到的数据了,比如法国政府的 一些数据,https://www.data.gouv.fr/fr/datasets/boamp/
这里好多数据,这里挑选他们的招标信息。这里选择2016年的,进行处理和学习。文件是tgz格式的,利用命令解压,最后得到一堆xml文件,
对于这些文件可以利用 https://github.com/databricks/spark-xml 进行处理,处理方式很简单。

命令方式引用外部库

$SPARK_HOME/bin/spark-shell --packages com.databricks:spark-xml_2.10:0.4.0

简单的例子

# 下载xml文件
$ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
    .format("com.databricks.spark.xml")
    .option("rowTag", "book")
    .load("books.xml")

val selectedData = df.select("author", "_id")

这里了选择相应的表情进行选择,这里我选择“OBJET_COMPLET”和"LIBELLE",当然路径可以是hdfs的路径,最后你会得到一个DataFrame的类型.

2016-09-20-19-35-12.jpg

DataFrame 简单理解就是一个数据表,这个数据表中包含着各种数据,错别字等等都会在这里找到,还有null的情况,这些都是要处理掉然后进行学习,

学习的目的是找到对应的TOPIC也就是主题。就直接上代码吧,建立一个scala的类。

class LDAml() {

  def lda(dataset: DataFrame, sc: SparkContext, inputCol: String,
          numbTopic: Int, MaxIterations: Int,
          vocabSize: Int) = {

    val (documents, vocabArray, model) = preprocess(dataset, inputCol, sc, vocabSize)
    val corpus = documents.cache() // use cache
    val corpusSize = corpus.count()
    /**
     * Configure and run LDA
     */
    val mbf = {
      // add (1.0 / actualCorpusSize) to MiniBatchFraction be more robust on tiny datasets.
      2.0 / MaxIterations + 1.0 / corpusSize
    }
    // running lda
    val lda = new LDA()
      .setK(numbTopic)
      .setMaxIterations(MaxIterations)
      .setOptimizer(new OnlineLDAOptimizer().setMiniBatchFraction(math.min(1.0, mbf))) //add optimizer
      .setDocConcentration(-1) // use default symmetric document-topic prior
      .setTopicConcentration(-1) // use default symmetric topic-word prior

    /**
     * Print results.
     */
    val startTime = System.nanoTime()
    val ldaModel = lda.run(corpus)
    val elapsed = (System.nanoTime() - startTime) / 1e9

    ldaModel.save(sc, sc.getConf.get("spark.client.ldamodelPath"))

    /************************************************************************
     * Print results. for Zeppelin
     ************************************************************************/
    // Print training time
    println(s"Finished training LDA model.  Summary:")
    println(s"Training time (sec)\t$elapsed")
    println(s"==========")

    // Print the topics, showing the top-weighted terms for each topic.
    val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 5)
    val topics = topicIndices.map {
      case (terms, termWeights) =>
        terms.map(vocabArray(_)).zip(termWeights)
    }
    println(s"$numbTopic topics:")
    topics.zipWithIndex.foreach {
      case (topic, i) =>
        println(s"TOPIC $i")
        topic.foreach { case (term, weight) => println(s"$term\t$weight") }
        println(s"==========")
    }
  }

  def preprocess(dataset: DataFrame, inputCol: String, sc: SparkContext, vocabSize: Int): (RDD[(Long, Vector)], Array[String], PipelineModel) = {
    val stopWordText1 = sc.textFile(sc.getConf.get("spark.client.stopWordText")).collect().flatMap(_.stripMargin.split("\\s+"))
    val stopWordText2 = sc.textFile(sc.getConf.get("spark.client.stopWordText2")).collect().flatMap(_.stripMargin.split("\\s+"))
    val data = dataset.na.drop()
    // ----------------Pipeline stages---------------------------------------------
    // - tokenizer-->stopWordsRemover1-->stemmer-->stopWordRemover2-->AccentRemover
    // 简单分词->删除无用词汇->词根->删除无用词汇->删除重音符号
    // ----------------------------------------------------------------------------
    val tokenizer = new RegexTokenizer()
      .setInputCol(inputCol)
      .setPattern("[a-z0-9éèêâîûùäüïëô]+")
      .setGaps(false)
      .setOutputCol("rawTokens")

    val stopWordsRemover1 = new StopWordsRemover()
      .setInputCol("rawTokens")
      .setOutputCol("tokens")
    stopWordsRemover1.setStopWords(stopWordsRemover1.getStopWords ++ stopWordText1)

    val stemmer = new Stemmer()
      .setInputCol("tokens")
      .setOutputCol("stemmed")
      .setLanguage("French")

    val stopWordsRemover2 = new StopWordsRemover()
      .setInputCol("stemmed")
      .setOutputCol("tokens2")
    stopWordsRemover2.setStopWords(stopWordsRemover2.getStopWords ++ stopWordText2)

    val accentRemover = new AccentRemover()
      .setInputCol("tokens2")
      .setOutputCol("mot")

    val countVectorizer = new CountVectorizer()
      .setVocabSize(vocabSize)
      .setInputCol("mot")
      .setOutputCol("features")
    //------------------------------------------------------
    //stage 0,1,2,3,4,5
    val pipeline = new Pipeline().setStages(Array(
      tokenizer, //stage 0
      stopWordsRemover1, //1
      stemmer, //2
      stopWordsRemover2, //3
      accentRemover, //4
      countVectorizer //stage 5
      ))

    // creates the PipeLineModel to use for the dataset transformation
    val model = pipeline.fit(data)
    // countVectorizer stage ==> 5
    val vocabArray = model.stages(5).asInstanceOf[CountVectorizerModel].vocabulary
    sc.parallelize(vocabArray).saveAsTextFile(sc.getConf.get("spark.client.vocab"))
    val documents = model.transform(data)
      .select("features")
      .rdd
      .map { case Row(features: Vector) => features }
      .zipWithIndex()
      .map(_.swap)

    (documents, vocabArray, model)
}

}

最重要的步骤就是 tokenizer-->stopWordsRemover1-->stemmer-->stopWordRemover2-->AccentRemover

处理完成之后,需要transform成dataframe的类型。

这样就可以使用LDA里面的算法进行筛选TOPIC了,这里我选择100个TOPIC迭代100次,这很花时间的。

最后会打印出来。每个TOPIC都是不同的,LDAOptimizer 有两种算法,一种是em算法,一种是online算法,这里选择online算法,

因em算法的问题导致之前结果的TOPIC是重复的。

2016-09-20-19-45-43.jpg