mongo 聚合查询性能优化

1. 减少不必要的$lookup$unwind操作

  • docscount两个子管道中都有类似的$lookup$unwind操作,这些操作可能会导致重复计算。你可以在$facet之前,尽量合并或者避免不必要的$lookup$unwind

2. 调整$skip$limit位置

  • 目前,你在$skip$limit之前进行了一些操作,如$lookup。可以尝试在分页($skip$limit)前尽量减少数据量,例如将部分匹配或过滤逻辑提前到$match

3. 提前进行$match过滤

  • 将条件尽量提前放到$match,这样可以减少参与后续操作的数据量。例如,docs子管道中的最后一个$match可能可以提前到管道的开头。

4. 优化索引

  • 确保在$lookup涉及的localFieldforeignField字段上有合适的索引。如果这些字段没有索引,$lookup的性能可能会大大降低。

5. 合并$project步骤

  • 尽量合并多个$project步骤,避免在管道中频繁进行字段的包含和排除操作。

6. 预计算/缓存count

  • 如果可能,考虑将count部分分离出来,作为独立的查询来运行,并缓存结果,避免每次都执行同样的统计操作。

7. 减少文档大小

  • 通过$project排除不必要的字段,这是一个很好的做法。不过可以再检查一下是否还有其他不必要的字段可以排除

还有一些更加精细化的优化手段:

  1. 区分简单match 和复杂match, 尽可能提前match
/**
 * 拆分查询条件,分为简单查询和复杂查询
 * @param query
 * @returns {{complexMatch: {}, simpleMatch: {}}}
 */
function splitAggregateQuery(query) {
  const simpleMatch = {};
  const complexMatch = {};

  for (const key in query) {
    if (query.hasOwnProperty(key)) {
      if (key.includes('.')) {
        complexMatch[key] = query[key];
      } else {
        simpleMatch[key] = query[key];
      }
    }
  }

  return { simpleMatch, complexMatch };
}

2. 对文档总数和文档查询,使用两个AggregateQuery,因为查询 total , 很多时候是不需要做lookup 等操作的。

const {simpleMatch, complexMatch}=splitAggregateQuery(query);
    const hasComplexMatch=!_.isEmpty(complexMatch)
    const pipeline = [
      {
        $match: {
            createdAt: { $gte: dayjs().subtract(30, 'days').toDate() },
            ...simpleMatch,
        },
      },
      {
        $lookup: { localField: 'category', from: 'categories', as: 'category', foreignField: '_id' },
      },
      {
        $unwind: {
          path: '$category',
        },
      },
      {
        $lookup: { localField: 'user', from: 'users', as: 'user', foreignField: '_id' },
      },
      {
        $project: {
          'post.tokenized_content': 0,  // 如果不需要_id字段,也可以将其排除
        }
      },
      {
        $unwind: {
          path: '$user',
        },
      },
      {
        $match: complexMatch,
      },
    ];
    // 性能优化
    let countPipeline = pipeline
    if(!hasComplexMatch){
        pipeline.splice(1,0,"__PREPAGINATE__")
        countPipeline= pipeline.slice(0,1)
    }

    const countAggregate = this.doc.aggregate(countPipeline);
    return this.doc.aggregatePaginate(pipeline, {
        ...option,
      countQuery:countAggregate
    },async (err, result) => {
      if(err) console.log(err)
      result.list=await this.packList(result.list)
      return result;
    });

3. 更精细化的,可以匹配出每次个阶段后,可以立即执行的match 操作,把match 进行分片插入。

4. 实在不行的情况下,可以通过现在查询文档的数量进行加速,例如只查最近三月数据,很多时候也不一定需要全部数据。

5. 最后就是,后续在数据表设计、以及存储的时候,对于经常需要连表查询的,可以考虑冗余存储或内嵌存储。

Leave a Comment

邮箱地址不会被公开。 必填项已用*标注