原创 结果采集器——locust源码分析

2020/02/17 源码分析

统计对象

从Runner通信机制那篇文章中我们知道有一个非常重要的消息类型——stats,这个是slave给master发送的消息,默认每3秒钟上报一次。stats消息的结构如下所示:

{
    "stats": [],
    "stats_total": {},
    "errors": {},
    "user_count": 10
}

实际上,slave也是持有着类似如上json格式的三个对象:

  • RequestStats
  • StatsEntry
  • StatsError

其中每一个locust进程会维护一个全局RequestStats单例global_stats,这个实例包含一个StatsEntry实例total(对应json的stats_total),以及两个字典entries(对应json的stats)和errors(对应json的errors),其中entries字典key为(name,method),对应值为一个StatsEntry实例,另一个errors字典的key为(name,method,error),对应值为一个StatsError实例。可见,global_stats包含了单个slave汇总的信息,以及各个请求url或name的统计信息。在分布式的场景下,每个slave都会维护一个global_stats, 在一个上报周期达到后将信息发送到master,发送完成后就会重置所有数据,进入下一个周期的统计。

上图中红色的字段是slave真正上报给master的数据。

统计过程

那么slave是如何统计消息,又究竟需要上报什么内容给master?master又是如何汇总的呢?下面我们来看看整个统计过程:

在每一次请求后,无论是成功还是失败,都会触发对应的request_success或者request_failure事件,stats.py文件中的数据统计模块订阅了对应的事件,会调用global_stats对数据进行统计。

在slave的一个上报周期达到后,触发on_report_to_master事件,此时global_stats就会依次调用以下方法对数据进行序列化:

  • serialize_stats
  • total.get_stripped_report
  • serialize_errors

其实也就是对上面提到的total和两个字典中的内容进行序列化,其实就是转为json字符串。

def on_report_to_master(client_id, data):
    data["stats"] = global_stats.serialize_stats()
    data["stats_total"] = global_stats.total.get_stripped_report()
    data["errors"] =  global_stats.serialize_errors()
    global_stats.errors = {}

下图是断点过程看到的stats消息内容(在msgpack序列化之前):

每秒请求数和响应时间及其对应请求个数

StatsEntry有两个比较重要的对象,分别是num_reqs_per_sec和response_times,它们都是字典类型,其中num_reqs_per_sec的key是秒时间戳,显示当前秒完成了多少个请求,统计的时间是完成请求的时刻,比如如果一个请求从第0秒开始,第3秒完成,那么这个请求统计在第3秒的时间戳上,这个对象可以很方便的计算出rps。response_times的key是响应时间,单位是豪秒,为了防止key过多,做了取整,比如147 取 150, 3432 取 3400 和 58760 取 59000,这个是为了方便获得类似90%请求的完成时间(小于等于该时间),99%请求的完成时间,下面具体的源码:

    def _log_time_of_request(self, current_time):
        t = int(current_time)
        self.num_reqs_per_sec[t] = self.num_reqs_per_sec.setdefault(t, 0) + 1
        self.last_request_timestamp = current_time

    def _log_response_time(self, response_time):
        if response_time is None:
            self.num_none_requests += 1
            return

        self.total_response_time += response_time

        if self.min_response_time is None:
            self.min_response_time = response_time

        self.min_response_time = min(self.min_response_time, response_time)
        self.max_response_time = max(self.max_response_time, response_time)

        # to avoid to much data that has to be transfered to the master node when
        # running in distributed mode, we save the response time rounded in a dict
        # so that 147 becomes 150, 3432 becomes 3400 and 58760 becomes 59000
        if response_time < 100:
            rounded_response_time = response_time
        elif response_time < 1000:
            rounded_response_time = int(round(response_time, -1))
        elif response_time < 10000:
            rounded_response_time = int(round(response_time, -2))
        else:
            rounded_response_time = int(round(response_time, -3))

        # increase request count for the rounded key in response time dict
        self.response_times.setdefault(rounded_response_time, 0)
        self.response_times[rounded_response_time] += 1

master汇总信息

slave的每一个stats消息到达master后,都会触发master的slave_report事件,master也拥有自己的global_stats,因此只需要将对应的信息进行累加(可以理解是所有slave对应内容的汇总)。具体在StatsEntry的extend方法:

def extend(self, other):
        """
        Extend the data from the current StatsEntry with the stats from another
        StatsEntry instance. 
        """
        if self.last_request_timestamp is not None and other.last_request_timestamp is not None:
            self.last_request_timestamp = max(self.last_request_timestamp, other.last_request_timestamp)
        elif other.last_request_timestamp is not None:
            self.last_request_timestamp = other.last_request_timestamp
        self.start_time = min(self.start_time, other.start_time)

        self.num_requests = self.num_requests + other.num_requests
        self.num_none_requests = self.num_none_requests + other.num_none_requests
        self.num_failures = self.num_failures + other.num_failures
        self.total_response_time = self.total_response_time + other.total_response_time
        self.max_response_time = max(self.max_response_time, other.max_response_time)
        if self.min_response_time is not None and other.min_response_time is not None:
            self.min_response_time = min(self.min_response_time, other.min_response_time)
        elif other.min_response_time is not None:
            # this means self.min_response_time is None, so we can safely replace it
            self.min_response_time = other.min_response_time
        self.total_content_length = self.total_content_length + other.total_content_length

        for key in other.response_times:
            self.response_times[key] = self.response_times.get(key, 0) + other.response_times[key]
        for key in other.num_reqs_per_sec:
            self.num_reqs_per_sec[key] = self.num_reqs_per_sec.get(key, 0) + other.num_reqs_per_sec[key]
        for key in other.num_fail_per_sec:
            self.num_fail_per_sec[key] = self.num_fail_per_sec.get(key, 0) + other.num_fail_per_sec[key]
    

核心指标

Locust核心的指标其实就4个:

  • 并发数
  • RPS
  • 响应时间
  • 异常率

我们还是回到文章开头的那个json:

{
    "stats": [],
    "stats_total": {},
    "errors": {},
    "user_count": 10
}

结合上方Locust的压测过程截图,我们可以看到,各个接口的指标其实就是stats对象里的各个字段,而最下方汇总的Aggregated这一行则对应stats_total的各个字段,尽管这个json只是slave单个stats消息的内容,却也是最终要显示的内容,只是master对各个消息做了汇总而已。汇总的方式也相当简单,请见上方的StatsEntry的extend方法。

因为master和web模块是一起部署的,因此web可以直接使用master的global_stats对象并展示其内容,可以做到动态显示。

Search

    Table of Contents