微服务架构设计与实践:用Spring Cloud实现抖音的推荐系统

简介: 本文基于Spring Cloud实现了一个简化的抖音推荐系统,涵盖用户行为管理、视频资源管理、个性化推荐和实时数据处理四大核心功能。通过Eureka进行服务注册与发现,使用Feign实现服务间调用,并借助Redis缓存用户画像,Kafka传递用户行为数据。文章详细介绍了项目搭建、服务创建及配置过程,包括用户服务、视频服务、推荐服务和数据处理服务的开发步骤。最后,通过业务测试验证了系统的功能,并引入Resilience4j实现服务降级,确保系统在部分服务故障时仍能正常运行。此示例旨在帮助读者理解微服务架构的设计思路与实践方法。

一、引子

抖音的推荐系统是其成功的关键之一,而背后是一套复杂的微服务架构支撑着高并发和庞大的用户数据处理。每当用户刷到新的视频时,背后都有一个复杂的推荐算法在实时运行。而在这样的场景下,构建一个高效、可扩展的微服务架构是至关重要的。本文将通过 Spring Cloud 构建一个简化版的抖音推荐系统,探讨微服务架构的设计与实践。

二、业务梳理

在正式的开发前,我们需要先对这个简化版的推荐系统所需要的功能做下梳理:

用户行为数据

推荐系统的核心在于个性化推荐,而个性化推荐的前提是对用户行为的全面了解。用户的每一次操作(如观看点赞转发评论等)都会影响推荐结果。因此,系统需要具备以下功能:

  • 记录用户行为数据:记录用户在平台上与视频的交互行为(比如用户观看了哪些视频、点赞了哪些视频等)。
  • 管理用户画像:基于用户的历史行为,生成用户的兴趣画像,用于推荐计算。

视频资源管理

抖音作为一个短视频平台,需要管理大量的视频资源。每个视频都有不同的标签(如类型话题风格等),这些标签是推荐算法的重要依据。因此,系统需要:

  • 存储视频基本信息:包括视频的ID、标题、标签、上传时间等。
  • 提供视频分类:根据视频的标签信息,将视频分类以便后续推荐。

个性化推荐

推荐系统的核心功能就是根据用户的兴趣和视频内容的标签,生成个性化推荐列表。为了实现这一功能,系统需要:

  • 获取用户画像和视频标签:结合用户的兴趣画像与视频的标签,匹配用户可能感兴趣的视频。
  • 生成推荐列表:根据算法计算,生成个性化推荐的视频列表并返回给用户。

用户行为数据的实时处理

用户在平台上的行为是实时发生的,因此推荐系统需要能够实时处理这些行为数据,并根据最新的行为更新用户画像。为此,系统需要:

  • 实时处理用户行为:当用户进行某个操作时(如点赞或观看某个视频),系统能够实时接收这些事件,并更新用户画像。
  • 异步处理:为了不影响用户的使用体验,行为数据的处理应尽量异步化,通过消息队列等手段解耦实时数据处理与推荐服务。

通过上述的业务需求梳理,我们最终可以总结出一个简化版的推荐系统需要具备的核心功能:

  • 用户行为管理:记录用户的观看、点赞等行为。
  • 视频资源管理:存储视频的基本信息和标签。
  • 个性化推荐:结合用户画像和视频标签,生成推荐列表。
  • 实时数据处理:处理用户行为数据,实时更新用户画像。

三、架构设计

完成了需求梳理后,我们总结了四大核心功能,进而可以抽象出四个服务来分别完成上述功能,因此,我们可以简单地绘制下业务架构图,如下:
01.png

这里我做了一些简化:比如省略了API网关,客户端直接请求到推荐服务上获取响应。因为我们决定使用SpringCloud来搭建这个项目,所以我们的服务都注册到Eureka上,服务之间的调用则采用Feign实现。另外,分别使用RedisKafka来缓存用户画像和传递用户行为数据。

四、具体实现

通过需求梳理、技术选型、架构设计,我们已经完成了项目开发前的准备工作,现在就可以正式进行开发了。首先需要我们根据业务架构图完成项目的基础搭建。

项目搭建

使用IDEA通过maven-archetype-quickstart进行快速创建,如下:
02.png
接下来通过同样的方式分别创建四大业务模块和Eureka服务模块,项目结构如下:
03.png
此时,在父模块的pom.xml文件中就可以看到子模块都已经被管理起来,我们再引入SpringBoot和
SpringCloud的依赖(Ps:版本可根据喜好自行选择,我这里演示也无所谓了,就不用SpringBoot3了),如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.itasass</groupId>
    <artifactId>recommendation-system</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>pom</packaging>

    <modules>
        <module>eureka-server</module>
        <module>user-service</module>
        <module>video-service</module>
        <module>recommendation-service</module>
        <module>data-processing-service</module>
    </modules>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <spring.boot.version>2.6.4</spring.boot.version>
        <spring.cloud.version>2021.0.1</spring.cloud.version>
    </properties>

    <dependencyManagement>

        <dependencies>

            <!-- Spring Boot Dependencies-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring.boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <!-- Spring Cloud Dependencies-->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring.cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

        </dependencies>

    </dependencyManagement>

</project>

此时,我们就完成了项目的基础搭建,接下来开始编写各个服务的代码。

创建 Eureka 服务

各个服务都有自己的依赖需要导入,Eureka服务所需依赖如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>eureka-server</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

    </dependencies>

</project>

导入相关依赖后,可以创建 eureka-server 模块中的主类 EurekaServerApplication,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication
{
   
    public static void main( String[] args ) {
   
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

然后我们需要对eureka服务进行配置,在 src/main/resources 目录下创建 application.yml如下:

server:
  port: 8761 # 服务运行的端口

eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
  server:
    enable-self-preservation: false

完成了上述配置后,我们需要启动Eureka来看看当前的这个服务是否可以正常运行,也很简单,启动 EurekaServerApplication,访问 http://localhost:8761,此时可以看到 Eureka Server 的控制台界面,如下:
04.png

创建用户服务

依然是先导入依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>user-service</artifactId>
    <packaging>jar</packaging>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

同样地需要配置application.yml,四大业务模块都需要将自己注册到eureka中,如下:

server:
  port: 8081  # 服务运行的端口


spring:
  application:
    name: user-service  # 用户服务的名称
  kafka:
    bootstrap-servers: localhost:9092  # Kafka 服务器地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

同理,完善用户服务的主类,这里主要是模拟用户看过的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;

@SpringBootApplication
@EnableEurekaClient
@RestController
public class UserServiceApplication {

    public static void main( String[] args ) {
        SpringApplication.run(UserServiceApplication.class, args);
    }

    @GetMapping("/users/{userId}/history")
    public List<String> getUserHistory(@PathVariable String userId) {
        // 模拟用户的历史行为数据,这里返回一些示例视频ID
        return Arrays.asList("1", "3", "5","7");
    }
}

这里说明下,我们本次的实例都采用模拟数据的思路,主要是让大家了解下设计思路,所以就不建立相关的库表了。启动用户服务后就可以在eureka控制台看到user服务了,如下:
05.png
另外,通过工具(如 Apifox)来访问用户服务的 REST API,如下:
06.png
完成了基础的配置,但别忘了在我们的架构设计中用户服务还有一项重任-记录用户行为数据。因此,我们需要在用户服务里编写Kafka的生产者服务,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaProducerService {
   

  private static final String TOPIC = "user-behavior-topic";  // Kafka 主题名称

  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;

  /**
   * 发送用户行为到 Kafka
   *
   * @param userId       用户ID
   * @param videoId      视频ID
   * @param videoTag     视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  public void sendUserBehavior(String userId, String videoId, String videoTag, int isInterested) {
   
    // 构建消息
    String message = String.format("User:%s watched Video:%s [Tag:%s] with interest:%d", userId, videoId, videoTag, isInterested);
    kafkaTemplate.send(TOPIC, message);
  }

}

同时,我们编写一个记录用户观看视频的行为的接口来模拟用户刷视频的场景,代码如下:

import com.recommendation.serive.KafkaProducerService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
   

  @Autowired
  private KafkaProducerService kafkaProducerService;

  /**
   * 记录用户观看视频的行为,并发送到 Kafka
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频标签
   * @param isInterested 是否感兴趣 (0: 不感兴趣,1: 感兴趣)
   */
  @PostMapping("/users/{userId}/watch/{videoId}/{videoTag}/{isInterested}")
  public String watchVideo(@PathVariable String userId,
                           @PathVariable String videoId,
                           @PathVariable String videoTag,
                           @PathVariable int isInterested) {
   
    // 调用 Kafka 生产者服务将行为数据(包括视频标签)发送到 Kafka
    kafkaProducerService.sendUserBehavior(userId, videoId, videoTag, isInterested);

    return String.format("User %s watched video %s with tag %s and interest %d", userId, videoId, videoTag, isInterested);
  }
}

通过Apifox模拟数据进行调用,查看API的响应结果和Kafka来观察消息是否发送成功,如下:
07.png
08.png
从结果来看,我们的消息发送成功。

创建视频服务

同样地,依然是先完善依赖,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>video-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

接着配置application.yml文件,如下:

server:
  port: 8082  # 服务运行的端口

spring:
  application:
    name: video-service  # 视频服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

接着又是主类,视频服务通过模拟提供了所有的视频,如下:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Arrays;
import java.util.List;


@SpringBootApplication
@EnableEurekaClient
@RestController
public class VideoServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(VideoServiceApplication.class, args);
    }

    @GetMapping("/videos")
    public List<Video> getAllVideos() {
   
        // 模拟视频数据,这里返回一些示例视频
        return Arrays.asList(
                new Video("1", "娱乐"),
                new Video("2", "娱乐"),
                new Video("3", "科技"),
                new Video("4", "美食"),
                new Video("5", "科技"),
                new Video("6", "美食"),
                new Video("7", "旅游"),
                new Video("8", "科技")
        );
    }

    static class Video {
   
        private String id;
        private String tag;

        public Video(String id, String tag) {
   
            this.id = id;
            this.tag = tag;
        }

        public String getId() {
   
            return id;
        }

        public String getTag() {
   
            return tag;
        }
    }
}

和用户服务一样,启动后查看eureka控制台和测试API调用,如下:
09.png
10.png

创建推荐服务

依然是先进行依赖和配置文件的编写,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>recommendation-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- Spring Boot Web Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <!-- Feign Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
        </dependency>

        <!-- Spring Boot Redis Starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson Databind (用于 Redis 的序列化/反序列化) -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

    </dependencies>

</project>
server:
  port: 8083  # 服务运行的端口

spring:
  application:
    name: recommendation-service  # 推荐服务的名称

eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/  # Eureka Server 的地址
    fetch-registry: true  # 从 Eureka 拉取服务注册表
    register-with-eureka: true  # 将自己注册到 Eureka

由于推荐服务 需要调用 用户服务视频服务 来获取用户的历史行为和视频列表。我们可以使用 Feign 客户端来简化服务之间的调用。因此,我们需要建立一个client目录来存放客户端接口,如下:
11.png

两个客户端接口都是模拟获取数据,如下:

import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;

import java.util.List;

@FeignClient(name = "user-service")
public interface UserServiceClient {
   

  @GetMapping("/users/{userId}/history")
  List<String> getUserHistory(@PathVariable("userId") String userId);
}
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;

import java.util.List;

@FeignClient(name = "video-service")  // 指定服务名称为 video-service
public interface VideoServiceClient {

  @GetMapping("/videos")
  List<Video> getAllVideos();

  class Video {
    private String id;
    private String tag;

    public String getId() {
      return id;
    }

    public void setId(String id) {
      this.id = id;
    }

    public String getTag() {
      return tag;
    }

    public void setTag(String tag) {
      this.tag = tag;
    }
  }
}

当然,别忘了推荐服务是根据用户画像来推荐视频,所以这里我们还需要从redis中获取用户画像,那么我们声明一个redis的工具类来获取用户画像,如下:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Repository;

import java.util.Map;


@Repository
public class RedisUserProfileRepository {
   

  @Autowired
  private StringRedisTemplate redisTemplate;  // 用于操作 Redis

  @Autowired
  private ObjectMapper objectMapper;  // 用于 JSON 序列化/反序列化

  private static final String USER_PROFILE_KEY_PREFIX = "user_profile:";

  /**
   * 从 Redis 中获取用户画像
   * @param userId 用户 ID
   * @return 用户画像(Map结构)
   */
  public Map<String, Object> getUserProfile(String userId) throws JsonProcessingException {
   
    String key = USER_PROFILE_KEY_PREFIX + userId;
    String jsonProfile = redisTemplate.opsForValue().get(key);  // 从 Redis 获取用户画像(JSON 字符串)
    if (jsonProfile != null) {
   
      return objectMapper.readValue(jsonProfile, Map.class);  // 将 JSON 转为 Map
    }
    return null;  // 如果 Redis 中没有用户画像,返回 null
  }
}

接着我们在主类里写推荐服务的主要逻辑,大家可以直接看代码的注释,根据用户画像进行推荐,没有则全部返回,如下:

import com.recommendation.client.VideoServiceClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

@SpringBootApplication
@EnableEurekaClient
@EnableFeignClients
@RestController
public class RecommendationServiceApplication {
   
    public static void main( String[] args ) {
   
        SpringApplication.run(RecommendationServiceApplication.class, args);
    }

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    @GetMapping("/recommendations/{userId}")
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(video.getId()))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(video.getTag()))
                .collect(Collectors.toList());
    }

}

创建数据处理服务

数据处理服务是整个系统运转的关键,一方面它需要接收用户行为数据,另一方面还需要对用户行为数据进行处理去更新用户画像,所以这个服务的主要逻辑也是围绕它们展开。老规矩,依然是先补充依赖和配置文件,如下:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>com.itasass</groupId>
        <artifactId>recommendation-system</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>

    <artifactId>data-processing-service</artifactId>
    <packaging>jar</packaging>


    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>

        <!-- Spring Kafka -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <!-- Redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- Jackson -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <!-- Spring Boot Starter Web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Eureka Client -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

    </dependencies>
</project>
server:
  port: 8084  # 服务运行的端口

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker 地址
    consumer:
      group-id: data-processing-group   # Kafka 消费者组 ID
      auto-offset-reset: earliest       # 消费者从最早的消息开始读取
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  redis:
    host: 127.0.0.1
    port: 6379

接下来,通过一个kafka的监听接口来实现上述功能,我这里演示就随意了,大家用MQ传递消息还是用JSON格式,否则你解析很容易出问题,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

@Service
public class KafkaConsumerService {
   

  @Autowired
  private DataProcessingService dataProcessingService;

  // 监听 Kafka 主题,消费用户行为数据
  @KafkaListener(topics = "user-behavior-topic", groupId = "data-processing-group")
  public void consumeUserBehavior(String message) {
   
    try {
   
      System.out.println("Received message: " + message);

      // 使用正则表达式提取信息
      String userId = extractValue(message, "User:(\d+)");
      String videoId = extractValue(message, "Video:(\d+)");
      String videoTag = extractValue(message, "Tag:([^\]]+)");

      if (userId != null && videoId != null && videoTag != null) {
   
        dataProcessingService.processUserBehavior(userId, videoId, videoTag);
      } else {
   
        System.err.println("Failed to parse message: " + message);
      }
    } catch (Exception e) {
   
      System.err.println("Error processing message: " + message);
      e.printStackTrace();
    }
  }

  private String extractValue(String message, String pattern) {
   
    Pattern r = Pattern.compile(pattern);
    Matcher m = r.matcher(message);
    return m.find() ? m.group(1) : null;
  }

}

在数据处理服务中,我们需要处理消息并更新用户画像,如下:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

@Service
public class DataProcessingService {

  @Autowired
  private StringRedisTemplate redisTemplate;

  private static final String USER_PROFILE_KEY_PREFIX = "user:";
  private static final String USER_HISTORY_KEY = "history";
  private static final String USER_INTERESTS_KEY = "interests";

  /**
   * 处理用户行为数据:更新 Redis 中的用户画像
   * @param userId 用户 ID
   * @param videoId 视频 ID
   * @param videoTag 视频的标签
   */
  public void processUserBehavior(String userId, String videoId, String videoTag) {
    //1.更新用户的观看历史记录
    updateUserHistory(userId, videoId);
    //2.更新用户的兴趣标签
    updateUserInterests(userId, videoTag);
  }

  /**
   * 更新用户的观看历史记录
   * @param userId 用户 ID
   * @param videoId 视频 ID
   */
  private void updateUserHistory(String userId, String videoId) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForList()
            .leftPush(userProfileKey + ":" + USER_HISTORY_KEY, videoId);
  }

  /**
   * 更新用户的兴趣标签
   * @param userId 用户 ID
   * @param tag 视频的标签
   */
  private void updateUserInterests(String userId, String tag) {
    String userProfileKey = USER_PROFILE_KEY_PREFIX + userId;
    redisTemplate.opsForSet()
            .add(userProfileKey + ":" + USER_INTERESTS_KEY, tag);
  }

}

至此,我们的这个简版推荐系统就搭建完成了,可以看到eureka的控制台上四大业务服务都已经注册到了,如下:
12.png

五、业务测试

完成了系统的开发后,我们现在开始测试,首先我们先访问推荐服务,按照我们的逻辑:由于是第一次访问还没有形成画像,此时应该返回所有视频,如下:
13.png
可以看到此时返回了我们模拟的所有8个视频,那么假定我这个用户对科技类视频感兴趣,假如我看了id为3的视频并点赞了,那么下次推荐服务应该只返回id=5和8的视频。我们先去调用一次记录用户观看视频的行为的接口,如下:
14.png
然后我们去看下reids里是否已经有用户画像了,如下:
15.png
16.png
那么这时候我们再去调用推荐服务,就会得到我们上面期望的结果了,如下:
17.png
我们的推荐系统通过多个服务的协作成功实现了预期功能,但是按照现在的实现,如果其中一个服务挂了,整个推荐系统就不可用了,显然这是我们不期望发生的,所以我们就需要考虑做服务降级,确保即使某个服务出问题,不影响整个系统的使用。

六、服务降级

我们在推荐服务的主类接口里有调用redis来获取用户画像,就以它为例,因为我们用的springcloud,就直接用配套的resilience4j来实现,依然是先引入依赖和补充配置,如下:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-circuitbreaker-resilience4j</artifactId>
</dependency>
resilience4j:
  circuitbreaker:
    instances:
      redisService:  # 熔断器名称,与 @CircuitBreaker 注解中的 name 属性对应
        slidingWindowSize: 10  # 熔断器的滑动窗口大小,用于计算失败率的调用数量
        failureRateThreshold: 50  # 失败率阈值,50% 代表当失败率达到 50% 时开启熔断
        waitDurationInOpenState: 10000  # 熔断器在打开状态下保持的时间(毫秒),即熔断器打开后等待 10 秒再尝试进入半开状态
        permittedNumberOfCallsInHalfOpenState: 3  # 半开状态下允许的最大调用次数,用于测试服务是否恢复
        minimumNumberOfCalls: 5  # 统计失败率所需的最小调用次数,至少进行 5 次调用后熔断器才会判断是否开启熔断
        automaticTransitionFromOpenToHalfOpenEnabled: true  # 允许自动从打开状态过渡到半开状态,尝试恢复服务
        slowCallDurationThreshold: 2000  # 慢调用的阈值,2 秒内完成的调用视为正常,超过 2 秒的调用视为慢调用
        slowCallRateThreshold: 50  # 慢调用比例阈值,50% 代表当慢调用比例达到 50% 时开启熔断

然后我们改造原推荐接口,加上注解和降级方法,主要逻辑是当redis连接不上时,直接调视频服务的接口获取所有视频返回,代码如下:

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private VideoServiceClient videoServiceClient;

    private static final String REDIS_BACKEND = "redisService"; // 定义熔断器名称

    @GetMapping("/recommendations/{userId}")
    @CircuitBreaker(name = REDIS_BACKEND, fallbackMethod = "getAllVideosFallback") // 添加熔断器
    public List<VideoServiceClient.Video> getRecommendations(@PathVariable("userId") String userId) {
   
        //从Redis获取用户画像,包括兴趣标签和观看历史
        String userProfileKey = "user:" + userId;
        //获取用户的兴趣标签
        Set<String> userInterests = redisTemplate.opsForSet().members(userProfileKey + ":interests");
        //获取用户的观看历史
        List<String> userHistory = redisTemplate.opsForList().range(userProfileKey + ":history", 0, -1);
        //如果用户画像不存在或用户没有兴趣标签,返回默认推荐列表
        if (userInterests.isEmpty() || userHistory.isEmpty()) {
   
            return videoServiceClient.getAllVideos();
        }
        //获取所有可用的视频列表
        List<VideoServiceClient.Video> allVideos = videoServiceClient.getAllVideos();
        //根据用户画像中的兴趣标签,推荐符合兴趣且用户未看过的视频
        return allVideos.stream()
                //筛选出用户未看过的视频
                .filter(video -> !userHistory.contains(String.valueOf(video.getId())))
                //只推荐与用户画像中的兴趣标签匹配的视频
                .filter(video -> userInterests.contains(String.valueOf(video.getTag())))
                .collect(Collectors.toList());
    }

    // 降级方法,当 Redis 连接失败时调用此方法
    public List<VideoServiceClient.Video> getAllVideosFallback(Throwable ex) {
   
        // 打印日志,记录异常信息
        System.err.println("Redis is unavailable. Returning fallback recommendation. Exception: " + ex.getMessage());

        // 在降级方法中调用 Feign 客户端返回默认的视频列表
        return videoServiceClient.getAllVideos();
    }
}

然后我们把Redis停掉,如下:
18.png
这时候我们再调用推荐接口,分别观察控制台有输出报错信息提示我们redis连接失败以及接口返回了所有视频数据,如下:
19.png
20.png
我们这里模拟了redis挂掉的情况,但其实熔断降级机制只要是需要调用别的服务的地方,我们都可以做相关处理来提升可用性。

七、小结

本文基于Spring Cloud实现了一个简单的推荐系统,当然真实的推荐系统远比这复杂,涉及到很多算法,例如协同过滤、用户行为等等。这篇文章的重点是带大家建立微服务的设计思想:怎么围绕业务去做服务拆解,怎么通过Spring Cloud去构建微服务项目以及服务的熔断降级处理,并最终希望让大家在看完后能够有所收获!

目录
相关文章
|
7天前
|
供应链 监控 安全
对话|企业如何构建更完善的容器供应链安全防护体系
随着云计算和DevOps的兴起,容器技术和自动化在软件开发中扮演着愈发重要的角色,但也带来了新的安全挑战。阿里云针对这些挑战,组织了一场关于云上安全的深度访谈,邀请了内部专家穆寰、匡大虎和黄竹刚,深入探讨了容器安全与软件供应链安全的关系,分析了当前的安全隐患及应对策略,并介绍了阿里云提供的安全解决方案,包括容器镜像服务ACR、容器服务ACK、网格服务ASM等,旨在帮助企业构建涵盖整个软件开发生命周期的安全防护体系。通过加强基础设施安全性、技术创新以及倡导协同安全理念,阿里云致力于与客户共同建设更加安全可靠的软件供应链环境。
150227 10
|
4天前
|
供应链 监控 安全
|
15天前
|
弹性计算 人工智能 安全
对话 | ECS如何构筑企业上云的第一道安全防线
随着中小企业加速上云,数据泄露、网络攻击等安全威胁日益严重。阿里云推出深度访谈栏目,汇聚产品技术专家,探讨云上安全问题及应对策略。首期节目聚焦ECS安全性,提出三道防线:数据安全、网络安全和身份认证与权限管理,确保用户在云端的数据主权和业务稳定。此外,阿里云还推出了“ECS 99套餐”,以高性价比提供全面的安全保障,帮助中小企业安全上云。
201928 14
对话 | ECS如何构筑企业上云的第一道安全防线
|
6天前
|
SQL 安全 前端开发
预编译为什么能防止SQL注入?
SQL注入是Web应用中常见的安全威胁,攻击者通过构造恶意输入执行未授权的SQL命令。预编译语句(Prepared Statements)是一种有效防御手段,它将SQL代码与数据分离,确保用户输入不会被解释为SQL代码的一部分。本文详细介绍了SQL注入的危害、预编译语句的工作机制,并结合实际案例和多语言代码示例,展示了如何使用预编译语句防止SQL注入,强调了其在提升安全性和性能方面的重要性。
|
10天前
|
搜索推荐 物联网 PyTorch
Qwen2.5-7B-Instruct Lora 微调
本教程介绍如何基于Transformers和PEFT框架对Qwen2.5-7B-Instruct模型进行LoRA微调。
421 34
Qwen2.5-7B-Instruct Lora 微调
|
1月前
|
人工智能 自然语言处理 前端开发
从0开始打造一款APP:前端+搭建本机服务,定制暖冬卫衣先到先得
通义灵码携手科技博主@玺哥超carry 打造全网第一个完整的、面向普通人的自然语言编程教程。完全使用 AI,再配合简单易懂的方法,只要你会打字,就能真正做出一个完整的应用。
9952 29
|
3天前
|
人工智能 算法 搜索推荐
阿里云百炼xWaytoAGI共学课开课:手把手学AI,大咖带你从零搭建AI应用
阿里云百炼xWaytoAGI共学课开课啦。大咖带你从零搭建AI应用,玩转阿里云百炼大模型平台。3天课程,涵盖企业级文本知识库案例、多模态交互应用实操等,适合有开发经验的企业或独立开发者。直播时间:2025年1月7日-9日 20:00,地点:阿里云/WaytoAGI微信视频号。参与课程可赢取定制保温杯、雨伞及磁吸充电宝等奖品。欢迎加入钉钉共学群(群号:101765012406),与百万开发者共学、共享、共实践!
|
3天前
|
SQL 存储 Apache
基于 Flink 进行增量批计算的探索与实践
本文整理自阿里云高级技术专家、Apache Flink PMC朱翥老师在Flink Forward Asia 2024的分享,内容分为三部分:背景介绍、工作介绍和总结展望。首先介绍了增量计算的定义及其与批计算、流计算的区别,阐述了增量计算的优势及典型需求场景,并解释了为何选择Flink进行增量计算。其次,详细描述了当前的工作进展,包括增量计算流程、执行计划生成、控制消费数据量级及执行进度记录恢复等关键技术点。最后,展示了增量计算的简单示例、性能测评结果,并对未来工作进行了规划。
254 5
基于 Flink 进行增量批计算的探索与实践
|
3天前
|
人工智能 自然语言处理 API
阿里云百炼xWaytoAGI共学课DAY1 - 必须了解的企业级AI应用开发知识点
本课程旨在介绍阿里云百炼大模型平台的核心功能和应用场景,帮助开发者和技术小白快速上手,体验AI的强大能力,并探索企业级AI应用开发的可能性。

热门文章

最新文章