Flink实现PageRank算法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: PageRank估计是很多面试场合上镜率比较高的吧,面试Spark的时候会被问到,最近flink热,估计也会被问到吧,浪尖就在这里帮大家解决这个疑难杂症。算法常见的原题是:pagerank的算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。将每个页面的排序值初始化为1.0。在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。将每个页面的排序值设为0.15 +

PageRank估计是很多面试场合上镜率比较高的吧,面试Spark的时候会被问到,最近flink热,估计也会被问到吧,浪尖就在这里帮大家解决这个疑难杂症。

算法常见的原题是:

pagerank的算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。

将每个页面的排序值初始化为1.0。
在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。
最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。
下面废话少说直接上案例吧。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

import static org.apache.flink.api.java.aggregation.Aggregations.SUM;

/**
算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;
另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算:
将每个页面的排序值初始化为1.0。
在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。
最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个游戏拍卖平台的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。
*/
@SuppressWarnings("serial")
public class PageRank {

private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;

// *
// PROGRAM
// *

public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);

final int numPages = params.getInt("numPages", PageRankData.getNumberOfPages());
final int maxIterations = params.getInt("iterations", 10);

// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// make the parameters available to the web ui
env.getConfig().setGlobalJobParameters(params);

// get input data
DataSet<Long> pagesInput = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env, params);

// 初始化rank <pageID,rank>
DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
    map(new RankAssigner((1.0d / numPages)));

// 获取<pageID,NeighborsLinkList>
DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
    linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
    // join pages with outgoing edges and distribute rank
    .join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
    // collect and sum ranks
    .groupBy(0).aggregate(SUM, 1)
    // apply dampening factor
    .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
    newRanks,
    newRanks.join(iteration).where(0).equalTo(0)
    // termination condition
    .filter(new EpsilonFilter())); //自定义了一个终止条件

// emit result
if (params.has("output")) {
  finalPageRanks.writeAsCsv(params.get("output"), "\n", " ");
  // execute program
  env.execute("Basic Page Rank Example");
} else {
  System.out.println("Printing result to stdout. Use --output to specify output path.");
  finalPageRanks.print();
}

}

// *
// USER FUNCTIONS
// *

/**

  • A map function that assigns an initial rank to all pages.

*/
public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {

Tuple2<Long, Double> outPageWithRank;

public RankAssigner(double rank) {
  this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank);
}

@Override
public Tuple2<Long, Double> map(Long page) {
  outPageWithRank.f0 = page;
  return outPageWithRank;
}

}

/**

  • A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
  • originate. Run as a pre-processing step.

*/
@ForwardedFields("0")
public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {

private final ArrayList<Long> neighbors = new ArrayList<Long>();

@Override
public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
  neighbors.clear();
  Long id = 0L;

  for (Tuple2<Long, Long> n : values) {
    id = n.f0;
    neighbors.add(n.f1);
  }
  out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
}

}

/**

  • Join function that distributes a fraction of a vertex's rank to all neighbors.

*/
public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {

@Override
public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
  Long[] neighbors = value.f1.f1;
  double rank = value.f0.f1;
  double rankToDistribute = rank / ((double) neighbors.length);

  for (Long neighbor: neighbors) {
    out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute));
  }
}

}

/**

  • The function that applies the page rank dampening formula.

*/
@ForwardedFields("0")
public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

private final double dampening;
private final double randomJump;

public Dampener(double dampening, double numVertices) {
  this.dampening = dampening;
  this.randomJump = (1 - dampening) / numVertices;
}

@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
  value.f1 = (value.f1 * dampening) + randomJump;
  return value;
}

}

/**

  • Filter that filters vertices where the rank difference is below a threshold.

*/
public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
  return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}

}

// *
// UTIL METHODS
// *

private static DataSet getPagesDataSet(ExecutionEnvironment env, ParameterTool params) {

if (params.has("pages")) {
  return env.readCsvFile(params.get("pages"))
    .fieldDelimiter(" ")
    .lineDelimiter("\n")
    .types(Long.class)
    .map(new MapFunction<Tuple1<Long>, Long>() {
      @Override
      public Long map(Tuple1<Long> v) {
        return v.f0;
      }
    });
} else {
  System.out.println("Executing PageRank example with default pages data set.");
  System.out.println("Use --pages to specify file input.");
  return PageRankData.getDefaultPagesDataSet(env);
}

}

private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env, ParameterTool params) {

if (params.has("links")) {
  return env.readCsvFile(params.get("links"))
    .fieldDelimiter(" ")
    .lineDelimiter("\n")
    .types(Long.class, Long.class);
} else {
  System.out.println("Executing PageRank example with default links data set.");
  System.out.println("Use --links to specify file input.");
  return PageRankData.getDefaultEdgeDataSet(env);
}

}
}

public class PageRankData {

public static final Object[][] EDGES = {

{1L, 2L},
{1L, 15L},
{2L, 3L},
{2L, 4L},
{2L, 5L},
{2L, 6L},
{2L, 7L},
{3L, 13L},
{4L, 2L},
{5L, 11L},
{5L, 12L},
{6L, 1L},
{6L, 7L},
{6L, 8L},
{7L, 1L},
{7L, 8L},
{8L, 1L},
{8L, 9L},
{8L, 10L},
{9L, 14L},
{9L, 1L},
{10L, 1L},
{10L, 13L},
{11L, 12L},
{11L, 1L},
{12L, 1L},
{13L, 14L},
{14L, 12L},
{15L, 1L},

};

private static int numPages = 15;

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
for (Object[] e : EDGES) {
  edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
}
return env.fromCollection(edges);

}

public static DataSet getDefaultPagesDataSet(ExecutionEnvironment env) {

return env.generateSequence(1, 15);

}

public static int getNumberOfPages() {

return numPages;

}

}
是不是很简单,大家可以自己搞个案例测试一下哦~

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
3月前
|
自然语言处理 算法 数据挖掘
【数据挖掘】十大算法之PageRank连接分析算法
文章介绍了PageRank算法的基本概念和数学模型,包括如何通过一阶马尔科夫链定义随机游走模型以及如何计算网页的重要性评分,并提供了PageRank迭代算法的具体步骤。
67 0
|
5月前
|
数据采集 自然语言处理 搜索推荐
心得经验总结:浅析PageRank算法
心得经验总结:浅析PageRank算法
46 0
|
6月前
|
算法 关系型数据库 MySQL
实时计算 Flink版产品使用合集之哪个版本可以做增量快照算法
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
6月前
|
机器学习/深度学习 数据采集 自然语言处理
经典机器学习算法——Pagerank算法(二)
PageRank 算法由 Google 创始人 Larry Page 在斯坦福读大学时提出,又称 PR——佩奇排名。主要针对网页进行排名,计算网站的重要性,优化搜索引擎的搜索结果。PR 值是表示其重要性的因子
|
6月前
|
机器学习/深度学习 数据采集 算法
经典机器学习算法——Pagerank算法(一)
PageRank 算法由 Google 创始人 Larry Page 在斯坦福读大学时提出,又称 PR——佩奇排名。主要针对网页进行排名,计算网站的重要性,优化搜索引擎的搜索结果。PR 值是表示其重要性的因子
经典机器学习算法——Pagerank算法(一)
|
6月前
|
存储 SQL 算法
flink cdc 算法问题之low hign点位有重叠如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
6月前
|
机器学习/深度学习 算法 搜索推荐
Flink中的流式机器学习是什么?请解释其作用和常用算法。
Flink中的流式机器学习是什么?请解释其作用和常用算法。
143 0
|
6月前
|
算法 搜索推荐 Java
图计算中的PageRank算法是什么?请解释其作用和计算原理。
图计算中的PageRank算法是什么?请解释其作用和计算原理。
78 0
|
消息中间件 存储 算法
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。

热门文章

最新文章

下一篇
无影云桌面