lda 不能预测新进文档吗 spark lda 迭代

1433人阅读
程序技术(28)
本文主要对使用Spark MLlib LDA进行主题预测时遇到的工程问题做一总结,列出其中的一些小坑,或可供读者借鉴。关于LDA模型训练可以参考:
开发环境:spark-1.5.2,hadoop-2.6.0,spark-1.5.2要求jdk7+。语料有大概70万篇博客,十亿+词汇量,词典大概有五万左右的词。
利用spark mllib LDA进行主题预测需要训练好的LDAModel以及词典,注意词典需要是训练LDAModel时所对应的词典,索引与词需要一一对应。使用时只需LocalLDAModel.load(sc, trained_model)即可。
注意,此处是将SparkContext嵌入一个独立的java程序中使用(on windows),而不是直接spark-submit。
// 加载模型
System.setProperty("hadoop.home.dir", hadoop_home_dir)
val conf = new SparkConf().
setAppName("Spark LDA Model").
setMaster(spark_master).
setJars(Array("target/xxx.jar")).
set("spark.driver.maxResultSize", "8g").
set("spark.executor.memory", "16g")
sc = new SparkContext(conf)
ldaModel = LocalLDAModel.load(sc, trained_model)
vocabArray = sc.textFile(lda_vocabulary).collect()
vocabArrayMap = vocabArray.zipWithIndex.toMap
// 预测新文档
def getKeyWords(docInWords: java.util.List[String],maxWordsTopic:Integer):JMap[String, Double] = {
val results
= new util.HashMap[String, Double]()
// 将输入文档转化为词频索引向量
val wc = new mutable.HashMap[Int, Int]()
docInWords.foreach { term =&
if (vocabArrayMap.contains(term)) {
val termIndex = vocabArrayMap(term)
wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
// 与词典长度相同
val indices = wc.keys.toArray.sorted
val values = indices.map(i =& wc(i).toDouble)
val docVector = Vectors.sparse(vocabArrayMap.size, indices, values)
// 转换为RDD
val list = new ArrayBuffer[Vector]()
list += docVector
val documents = sc.parallelize(list).zipWithIndex.map(e =& (e._2, e._1))
// 预测文档在topic上的分布
val topicDists = ldaModel.topicDistributions(documents).collect
// 叠乘累加,得到文档到词的权重分布
topicDists.foreach{ case (t,vec) =& {
val wordArray = ldaModel.topics.multiply(vec).toArray
// 降序排列
val wordRdd = wordArray.zipWithIndex.sortBy(-_._1).take(maxWordsTopic)
// 从词典索引到实际词映射
val topWords = wordRdd.map { case (weight, index) =& (vocabArray(index.toInt), weight) }
topWords.foreach(e =& results.put(e._1, e._2))
System.setProperty(“hadoop.home.dir”, “c:\hadoop_home”)
下载winutils.exe,放于”c:\hadoop_home”。否则会有下列错误:“ERROR Shell: Failed to locate the winutils binary in the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.”
-XX:-UseSplitVerifier -Xmx3g -XX:MaxPermSize=128m
(intellj idea)设置”run-edit configuration-vm options”,对于jdk7可能需要”-XX:-UseSplitVerifier”。”-XX:MaxPermSize=128m”是为了预编译时设置固定不回收的内存,如果过小则有内存溢出错误,笔者误以为是spark内存不够,浪费了不少时间修改spark.executor.memory,然并卵。
–spark.driver.maxResultSize
根据模型大小灵活设置,小则内存溢出。
–spark.executor.memory
根据模型大小灵活设置,小则内存溢出。
setJars(Array(“target/xxx.jar”))
尽管是java独立程序,但是spark部分需远程执行,因此必须需要相关依赖文件,需要指定java程序jar包路径。否则”org.apache.spark.SparkException: Job aborted due to stage failure: Task from application”。注意在ide中调试执行的时候setJars(SparkContext.jarOfClass(this.getClass).toList)是不能解决此问题的。有时还可能需要fat jar,将所有依赖打进一个包,可以用maven-shade-plugin。此处问题具体见参考。
–预测结果
预测新文档主题分布:ldaModel.topicDistributions(newDocument)
新文档主题分布与各主题在词上分布叠乘相加:
ldaModel.topics.multiply(newDocument)
最后得到新文档在词典上的权重分布,排序输出即可找到新文档的top 关键词
推测新文档的时候,spark还需要一定时间计算,耗时可能达到10秒,感觉在线上实用还有距离,当然也有可能是训练模型问题。
不知道我的理解对不对,LDA最开始用来对大量文本聚类,现在也有用来主题词抽取。聚类实际上是一个粗粒度的分类,而关键词抽取则是个性化(具体到一篇文章),所以效果稍差,应该是有很多理解和实践不到位的地方,所以还要进一步研究,也欢迎大家给些建议。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:46606次
排名:千里之外
原创:27篇
转载:17篇
评论:20条
(1)(2)(2)(1)(1)(2)(1)(14)(2)(2)(5)(1)(2)(3)(2)(2)(1)(1)4195人阅读
主题模型(8)
spark(1)
关于LDA的理论部分,参考其他博客(链接待定),本文主要记录spark中LDA的实现。
spark1.4版本的LDA比较简单,下面主要是以翻译官网为主。
LDA是一个主题模型,它能够推理出一个文本文档集合的主题。LDA可以认为是一个聚类算法,原因如下:
主题对应聚类中心,文档对应数据集中的样本(数据行)
主题和文档都在一个特征空间中,其特征向量是词频向量
跟使用传统的距离来评估聚类不一样的是,LDA使用评估方式是一个函数,该函数基于文档如何生成的统计模型。
LDA以词频向量表示的文档集合作为输入,输出结果提供:
Topics:推断出的主题,每个主题是单词上的概率分布。
Topic distributions for documents:对训练集中的每个文档,LDA给一个在主题上的概率分布。
K:主题数量(或者说聚簇中心数量)
maxIterations:EM算法的最大迭代次数
docConcentration:文档在主题上分布的先验参数。当前必须大于1,值越大,推断出的分布越平滑。默认为-1,自动设置。
topicConcentration:主题在单词上的先验分布参数。当前必须大于1,值越大,推断出的分布越平滑。默认为-1,自动设置。
checkpointInterval:检查点间隔。maxIterations很大的时候,检查点可以帮助减少shuffle文件大小并且可以帮助故障恢复。
通过设置setOptimizer 函数,spark提供不同的推断算法。
EMLDAOptimizer 通过在likelihood函数上计算最大期望EM,提供较全面的结果。例如1.4版本想得到训练文档的主题分布,就只能使用这种算法。
OnlineLDAOptimizer 通过在小批量数据上迭代采样实现online变分推断,对于内存较友好。
注意:在1.4.*版本的Mllib中,LDA还不支持新文档的预测,另外也没有python的API。1.5版本提供了预测新版本的功能(效果还不确定)。
下面讨论1.5版本中的两种算法。
Expectation Maximization
EM算法主要在EMLDAOptimizer和DistributedLDAModel类中实现。
需要提供给算法的参数有:
docConcentration: 只支持对称先验,K维向量的值都相同,必须&1.0。向量-1表示默认,k维向量值为(50/k)+1。
topicConcentration: 只支持对称先验,值必须&1.0。向量-1表示默认。
maxIterations:最大迭代次数。
EMLDAOptimizer生成的结果为DistributedLDAModel类, 后者不仅存储了推断的主题,还有整个训练集的主题分布。DistributedLDAModel提供:
topTopicsPerDocument: 训练集中每篇文档权重最高的主题。
topDocumentsPerTopic: 每个主题中权重最高的文档以及对应权重。
logPrior: 根据模型分布计算的log probability。
logLikelihood: 根据训练集的模型分布计算的log likelihood。
topicDistributions: 训练集中每篇文档的主题分布,相当于theta。
topicsMatrix: 主题-词分布,相当于phi。
Online Variational Bayes
OnlineVB算法主要在OnlineLDAOptimizer和LocalLDAModel类中实现。
需要提供给算法的参数有:
docConcentration: 可以通过传递一个k维等价于Dirichlet参数的向量作为非对称先验。值应该&=0。向量-1表示默认,k维向量值取(1.0/k)。
topicConcentration: 只支持对称先验。值必须&=0。-1表示默认,取值为(1.0/k)。
maxIterations: 提交的最小批次的最大数量。
此外,OnlineLDAOptimizer 还接受下面的参数:
miniBatchFraction: 每次迭代时采样的部分语料,取值范围(0,1]。
optimizeDocConcentration: 默认false,是否在训练时优化docConcentration。
tau0 and kappa: 用于学习率的衰减,公式 (τ0+iter)的-κ次方,iter为当前迭代次数。tau0 默认1024,kappa默认0.51,取值范围(0.5, 1]。(我对onlineVB不太了解,参数的意义还需要再看论文。)
OnlineLDAOptimizer生成的结果为LocalLDAModel类, 只存储了推断的主题信息,提供的输出结果:
logLikelihood(documents): 模型评价指标,越大越好。
logPerplexity(documents): 模型评价指标,越小越好。
此外,在查看了1.5的API文档发现,LocalLDAModel不仅提供了上面两个结果,还有一个更重要的信息:
topicDistributions(documents: RDD[(Long, Vector)]):这也是1.5版本用来预测新文档的方法。DistributedLDAModel类中提供的该方法只存储了训练文档的主题分布,而LocalLDAModel的该方法需要传递以词频向量表示的文档,也可以是新文档,进而实现了预测。
1.5版本的API可以查看
下面的示例中,我们加载了用词向量表示的文本文件,然后用LDA训练三个主题。然后打印主题,用主题在词上的概率分布表示主题。
import org.apache.spark.mllib.clustering.LDA
import org.apache.spark.mllib.linalg.Vectors
// 输入的文件每行用词频向量表示一篇文档
val data = sc.textFile("data/mllib/sample_lda_data.txt")
val parsedData = data.map(s =& Vectors.dense(s.trim.split(' ').map(_.toDouble)))
val corpus = parsedData.zipWithIndex.map(_.swap).cache()
val ldaModel = new LDA().setK(3).run(corpus)
// 打印主题
println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):")
val topics = ldaModel.topicsMatrix
for (topic &- Range(0, 3)) {
print("Topic " + topic + ":")
for (word &- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic))
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:361185次
积分:3832
积分:3832
排名:第6765名
原创:110篇
转载:13篇
评论:106条
(1)(5)(1)(2)(4)(1)(6)(2)(8)(2)(9)(1)(1)(4)(4)(5)(5)(8)(45)(10)当前位置: &
6,039 次阅读 -
主题模型可以从一系列文章中自动推测讨论的主题。这些主题可以被用作总结和整理文章,也可以在机器学习流程的后期阶段用于特征化和降维。
在Spark 1.3中,MLlib现在支持最成功的主题模型之一,隐含狄利克雷分布(LDA)。LDA也是基于GraphX上构建的第一个MLlib算法。在这篇博文中,我们概述LDA和及其用例,并且解释GraphX是实现它最自然的方式。
抽象地说,主题模型旨在一系列文章中找到一种结构。学习到这种“结构”之后,一个主题模型能回答以下这样的问题:X文章讨论的是什么?X文章和Y文章有多相似?如果我对Z文章感兴趣,我应该先读哪些文章?
主题模型是一个比较广的领域。Spark 1.3加入了隐含狄利克雷分布(LDA),差不多是现今最成功的主题模型。最初被开发用于文本分析和群体遗传学,LDA之后被不断拓展,应用到从时间序列分析到图片分析等问题。首先,我们从文本分析的角度描述LDA。
什么是主题?主题不是LDA的输入,所以LDA必须要从纯文本中推断主题。LDA将主题定义为词的分布。例如,当我们在一个20个新闻组的文章数据集上运行MLlib的LDA,开始的几个主题是:
看下三个主题中的高权重词语,我们可以很快了解每个主题在说什么:运动,空间探索和电脑。LDA的成功很大程度上源自它产生可解释主题的能力。
除了推断出这些主题,LDA还可以推断每篇文章在主题上的分布。例如,X文章大概有60%在讨论“空间探索”,30%关于“电脑”,10%关于其他主题。
这些主题分布可以有多种用途:
聚类: 主题是聚类中心,文章和多个类簇(主题)关联。聚类对整理和总结文章集合很有帮助。
参看Blei教授和Lafferty教授对于Science杂志的文章生成的。点击一个主题,看到该主题下一系列文章。
特征生成:LDA可以生成特征供其他机器学习算法使用。如前所述,LDA为每一篇文章推断一个主题分布;K个主题即是K个数值特征。这些特征可以被用在像逻辑回归或者决策树这样的算法中用于预测任务。
降维:每篇文章在主题上的分布提供了一个文章的简洁总结。在这个降维了的特征空间中进行文章比较,比在原始的词汇的特征空间中更有意义。
在MLlib中使用LDA
我们给出一个使用LDA的小例子。我们在这儿描述这个过程,实际的代码在这个上。本例首先读取并预处理文章。预处理最重要的部分是选择词典。在本例中,我们将文本拆成词,之后去除(a)非字母词 (b)4个字符一下的短词 (c)最常见的20个词(停用词)。一般来说,在你自己的数据集上调整这个预处理步骤很重要。
我们运行LDA,使用10个主题和10轮迭代。根据你的数据集选择主题的数量很重要。其他参数设成默认,我们在Spark文档的Markdown文件(spark/docs/*.md)上训练LDA。
我们得到10个主题。下面是5个人工挑选出来的主题,每个主题配以最重要的5个词语。请注意每个主题有多么清晰地对应到Spark的一个组件!(打引号的主题标题是为了更清晰手动加的)
在Spark 1.3中LDA有Scala和Java的API。Python的API很快会加入。
实现:GraphX
有许多算法可以训练一个LDA模型。我们选择EM算法,因为它简单并且快速收敛。因为用EM训练LDA有一个潜在的图结构,在GraphX之上构建LDA是一个很自然的选择。
LDA主要有两类数据:词和文档。我们把这些数据存成一个偶图(如下所示),左边是词节点,右边是文档节点。每个词节点存储一些权重值,表示这个词语和哪个主题相关;类似的,每篇文章节点存储当前文章讨论主题的估计。
每当一个词出现在一篇文章中,图中就有一个边连接对应的词节点和文章节点。例如,在上图中,文章1包含词语“hockey” 和“system”
这些边也展示了这个算法的流通性。每轮迭代中,每个节点通过收集邻居数据来更新主题权重数据。下图中,文章2通过从连接的词节点收集数据来更新它的主题估计。
GraphX因此是LDA自然的选择。随着MLlib的成长,我们期望未来可以有更多图结构的学习算法!
LDA的并行化并不直观,已经有许多研究论文提出不同的策略来实现。关键问题是所有的方法都需要很大量的通讯。这在上图中很明显:词和文档需要在每轮迭代中用新数据更新相邻节点,而相邻节点太多了。
我们选择了EM算法的部分原因就是它通过很少轮的迭代就能收敛。更少的迭代,更少的通讯。
在Spark中加入LDA之前,我们在一个很大的Wikipedia数据集上做了测试。下面是一些数字:
训练集规模:460万文档
词典规模:110万词汇
训练集规模:11亿词(大约239词/文章)
16个 worker节点的EC2集群
计时结果:10轮迭代中平均176秒/迭代
接下来是?
Spark的贡献者正在开发更多LDA算法:在线变分贝叶斯(一个快速近似算法)和吉布斯采样(一个更慢但是有时更准确的算法)。我们也在增加帮助模块,例如用于自动数据准备的Tokenizers和更多预测方法。
想开始用LDA,今天下载
查看例子,了解API的细节,查看
LDA的开发是许多Spark贡献者的合作的结果:
通过这些综述学习更多关于主题模型和LDA的内容:
主题模型综述:
, 包含数学细节。
从这些研究论文中获得深入了解:
原始LDA论文
应用:文本分析
应用:群体遗传学分析
一篇清楚解释包含EM在内多个算法的文章: .
英文出处:
文章出处:http://www.multiprocess.net/8.html
注:转载文章均来自于公开网络,仅供学习使用,不会用于任何商业用途,如果侵犯到原作者的权益,请您与我们联系删除或者授权事宜,联系邮箱:contact@dataunion.org。转载数盟网站文章请注明原文章作者,否则产生的任何版权纠纷与数盟无关。
相关文章!
不用想啦,马上 发表自已的想法.
做最棒的数据科学社区
扫描二维码,加微信公众号
联系我们:2986人阅读
程序技术(28)
本文主要对使用Spark MLlib LDA进行主题抽取时遇到的工程问题做一总结,列出其中的一些小坑,或可供读者借鉴。关于LDA的具体理论等可以自行google。主题预测请参考:
开发环境:spark-1.5.2,hadoop-2.6.0,spark-1.5.2要求jdk7+。语料有大概70万篇博客,十亿+词汇量,词典大概有五万左右的词。
训练语料代码
package org.apache.spark.examples.mllib
import java.text.BreakIterator
import scala.collection.mutable
import scopt.OptionParser
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
* An example Latent Dirichlet Allocation (LDA) app. Run with
* ./bin/run-example mllib.LDAExample [options] &input&
* If you use it as a template to create your own app, please use `spark-submit` to submit your app.
object LDAExample {
private case class Params(
input: Seq[String] = Seq.empty,
k: Int = 20,
maxIterations: Int = 10,
docConcentration: Double = -1,
topicConcentration: Double = -1,
vocabSize: Int = 10000,
stopwordFile: String = "",
algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10) extends AbstractParams[Params]
def main(args: Array[String]) {
val defaultParams = Params()
val parser = new OptionParser[Params]("LDAExample") {
head("LDAExample: an example LDA app for plain text data.")
opt[Int]("k")
.text(s"number of topics. default: ${defaultParams.k}")
.action((x, c) =& c.copy(k = x))
opt[Int]("maxIterations")
.text(s"number of iterations of learning. default: ${defaultParams.maxIterations}")
.action((x, c) =& c.copy(maxIterations = x))
opt[Double]("docConcentration")
.text(s"amount of topic smoothing to use (& 1.0) (-1=auto)." +
default: ${defaultParams.docConcentration}")
.action((x, c) =& c.copy(docConcentration = x))
opt[Double]("topicConcentration")
.text(s"amount of term (word) smoothing to use (& 1.0) (-1=auto)." +
default: ${defaultParams.topicConcentration}")
.action((x, c) =& c.copy(topicConcentration = x))
opt[Int]("vocabSize")
.text(s"number of distinct word types to use, chosen by frequency. (-1=all)" +
default: ${defaultParams.vocabSize}")
.action((x, c) =& c.copy(vocabSize = x))
opt[String]("stopwordFile")
.text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
default: ${defaultParams.stopwordFile}")
.action((x, c) =& c.copy(stopwordFile = x))
opt[String]("algorithm")
.text(s"inference algorithm to use. em and online are supported." +
s" default: ${defaultParams.algorithm}")
.action((x, c) =& c.copy(algorithm = x))
opt[String]("checkpointDir")
.text(s"Directory for checkpointing intermediate results." +
Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
default: ${defaultParams.checkpointDir}")
.action((x, c) =& c.copy(checkpointDir = Some(x)))
opt[Int]("checkpointInterval")
.text(s"Iterations between each checkpoint.
Only used if checkpointDir is set." +
s" default: ${defaultParams.checkpointInterval}")
.action((x, c) =& c.copy(checkpointInterval = x))
arg[String]("&input&...")
.text("input paths (directories) to plain text corpora." +
Each text file line should hold 1 document.")
.unbounded()
.required()
.action((x, c) =& c.copy(input = c.input :+ x))
parser.parse(args, defaultParams).map { params =&
run(params)
}.getOrElse {
parser.showUsageAsError
sys.exit(1)
private def run(params: Params) {
val conf = new SparkConf().setAppName(s"LDAExample with $params")
val sc = new SparkContext(conf)
Logger.getRootLogger.setLevel(Level.WARN)
val preprocessStart = System.nanoTime()
val (corpus, vocabArray, actualNumTokens) =
preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
corpus.cache()
val actualCorpusSize = corpus.count()
val actualVocabSize = vocabArray.size
val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
println(s"Corpus summary:")
println(s"\t Training set size: $actualCorpusSize documents")
println(s"\t Vocabulary size: $actualVocabSize terms")
println(s"\t Training set size: $actualNumTokens tokens")
println(s"\t Preprocessing time: $preprocessElapsed sec")
val lda = new LDA()
val optimizer = params.algorithm.toLowerCase match {
case "em" =& new EMLDAOptimizer
case "online" =& new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
case _ =& throw new IllegalArgumentException(
s"Only em, online are supported but got ${params.algorithm}.")
lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
.setCheckpointInterval(params.checkpointInterval)
if (params.checkpointDir.nonEmpty) {
sc.setCheckpointDir(params.checkpointDir.get)
val startTime = System.nanoTime()
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9
println(s"Finished training LDA model.
Summary:")
println(s"\t Training time: $elapsed sec")
if (ldaModel.isInstanceOf[DistributedLDAModel]) {
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
println(s"\t Training data average log likelihood: $avgLogLikelihood")
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)
val topics = topicIndices.map { case (terms, termWeights) =&
terms.zip(termWeights).map { case (term, weight) =& (vocabArray(term.toInt), weight) }
println(s"${params.k} topics:")
topics.zipWithIndex.foreach { case (topic, i) =&
println(s"TOPIC $i")
topic.foreach { case (term, weight) =&
println(s"$term\t$weight")
* Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
(corpus, vocabulary as array, total token count in corpus)
private def preprocess(
sc: SparkContext,
paths: Seq[String],
vocabSize: Int,
stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {
val textRDD: RDD[String] = sc.textFile(paths.mkString(","))
val tokenizer = new SimpleTokenizer(sc, stopwordFile)
val tokenized: RDD[(Long, IndexedSeq[String])] = textRDD.zipWithIndex().map { case (text, id) =&
id -& tokenizer.getWords(text)
tokenized.cache()
val wordCounts: RDD[(String, Long)] = tokenized
.flatMap { case (_, tokens) =& tokens.map(_ -& 1L) }
.reduceByKey(_ + _)
wordCounts.cache()
val fullVocabSize = wordCounts.count()
val (vocab: Map[String, Int], selectedTokenCount: Long) = {
val tmpSortedWC: Array[(String, Long)] = if (vocabSize == -1 || fullVocabSize &= vocabSize) {
wordCounts.collect().sortBy(-_._2)
wordCounts.sortBy(_._2, ascending = false).take(vocabSize)
(tmpSortedWC.map(_._1).zipWithIndex.toMap, tmpSortedWC.map(_._2).sum)
val documents = tokenized.map { case (id, tokens) =&
val wc = new mutable.HashMap[Int, Int]()
tokens.foreach { term =&
if (vocab.contains(term)) {
val termIndex = vocab(term)
wc(termIndex) = wc.getOrElse(termIndex, 0) + 1
val indices = wc.keys.toArray.sorted
val values = indices.map(i =& wc(i).toDouble)
val sb = Vectors.sparse(vocab.size, indices, values)
val vocabArray = new Array[String](vocab.size)
vocab.foreach { case (term, i) =& vocabArray(i) = term }
(documents, vocabArray, selectedTokenCount)
* Simple Tokenizer.
* TODO: Formalize the interface, and make this a public class in mllib.feature
private class SimpleTokenizer(sc: SparkContext, stopwordFile: String) extends Serializable {
private val stopwords: Set[String] = if (stopwordFile.isEmpty) {
Set.empty[String]
val stopwordText = sc.textFile(stopwordFile).collect()
stopwordText.flatMap(_.stripMargin.split("\\s+")).toSet
private val allWordRegex = "^(\\p{L}*)$".r
private val minWordLength = 3
def getWords(text: String): IndexedSeq[String] = {
val words = new mutable.ArrayBuffer[String]()
val wb = BreakIterator.getWordInstance
wb.setText(text)
var current = wb.first()
var end = wb.next()
while (end != BreakIterator.DONE) {
val word: String = text.substring(current, end).toLowerCase
word match {
case allWordRegex(w) if w.length &= minWordLength && !stopwords.contains(w) =&
words += w
current = end
end = wb.next()
case e: Exception =&
end = BreakIterator.DONE
执行命令:
spark-submit
–class “LDAExample”
–master local[*]
–driver-memory 32g
target/pack/lib/project.jar
“file:/tmp/documents”
–stopwordFile “file:/tmp/stopwords”
–algorithm online
–maxIterations 50
–vocabSize 50000
代码使用sbt 编译,然后提交到spark执行,所以需要打包程序所有依赖
–driver-memory
由于在master处指定了local[*] ,所以此处需要根据训练样本大小设置该参数,否则会内存溢出,如果是yarn或者mesos,则改为设置executor-memory。
–stopwordFile
可以先训练出词典,然后剔除其中不要的词,放入stopwordFile即可,词典对于最终的topic影响很大,所以尽量剔除干扰词。
topic数量,越大则对内存要求越大,执行时长也相应增大
–algorithm
当前支持em和online两种,前者训练出来的是DistributedLDAModel,包含丰富的样本信息,但目前不能直接预测新文档(可以调用toLocal转换为LocalLDAModel)。后者是LocalLDAModel,可以用来预测新文档。online是后来加入的算法,性能更好。gibbs sampling 可能后续推出
–maxIterations
越大则内存和时长越大
–vocabSize
词典最大包含词数
maxResultSize
在程序中设定,存储处理结果,样本数量比较大的话,默认内存是不够的。
SparkConf().set(“spark.driver.maxResultSize”, “5g”)
–docConcentration and topicConcentration
前者为文档对主题的先验概率,后者为主体对词的先验概率,默认为-1,则系统自动赋值。见参考4
docConcentration赋值
* Optimizer-specific parameter settings:
- Value should be & 1.0
- default = (50 / k) + 1, where 50/k is common in LDA libraries and +1 follows
Asuncion et al. (2009), who recommend a +1 adjustment for EM.
- Value should be &= 0
- default = (1.0 / k), following the implementation from
topicConcentration赋值
* Optimizer-specific parameter settings:
- Value should be & 1.0
- default = 0.1 + 1, where 0.1 gives a small amount of smoothing and +1 follows
Asuncion et al. (2009), who recommend a +1 adjustment for EM.
- Value should be &= 0
- default = (1.0 / k), following the implementation from
文档预处理
注意训练集每行是一个源文档。SimpleTokenizer 将每行切分为词组,在此处可以通过stopwordFile来过滤词组。在训练集预处理函数preprocess中,wordCounts包含训练集中所有的词及其词频,可理解为map,并且被倒序排序,然后取vocabSize个词作为词典。将词典输出,高频词在前,可以将其中的干扰词或者不重要的词放入stopwordFile,这样反复训练几次,词典的质量就会比较高。参考1和2中训练了维基百科中500万篇文档,最后取词也就一万左右,词典质量越高,topic质量也就越高。
训练结束,可以在模型上调用save方法保存模型,已备后续使用.
通过训练模型,可以查看不同topic在词典上的分布,以及训练样本的主题分布.
LocalLDAModel包含了topicsMatrix, 是一个vocabSize x k 矩阵.实际上给出了k个主题在词典上的分布.此处矩阵只存储了单词的索引,所以后续使用的话,需要自己保存词典,并且确保索引与该矩阵一致.在预处理训练样本的时候,每篇文档都被处理成”词索引&-&词频”向量.
describeTopics(maxTermsPerTopic: Int)可以指定每个topic返回的词数量(已经按照权重降序排列),返回所有主题.
具体如何使用,用户可以参考spark 中LocalLDAModel和DistributedLDAModel的api文档。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:46606次
排名:千里之外
原创:27篇
转载:17篇
评论:20条
(1)(2)(2)(1)(1)(2)(1)(14)(2)(2)(5)(1)(2)(3)(2)(2)(1)(1)}

我要回帖

更多关于 spark mllib lda 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信