MongoDB/성능

[Python]에서 MongoDB 집계 파이프 사용법 정리(MongoDB aggregation)

인텔로퍼 2025. 3. 13. 15:02
반응형

[Python]에서 MongoDB 집계 파이프 사용법 정리

MongoDB 집계 파이프라인 개요

MongoDB의 집계 파이프라인(Aggregation Pipeline) 은 데이터를 단계별로 처리하여 필터링, 그룹화, 정렬 및 변환하는 강력한 분석 기능을 제공합니다. SQL의 GROUP BY와 유사하지만, 더 복잡하고 유연한 데이터 조작이 가능합니다.

 

이 문서에서는 "products" 데이터베이스의 "laptops" 컬렉션을 사용합니다. 이 컬렉션에는 200개의 노트북 데이터가 저장되어 있으며, 문서에는 brand(브랜드), in_stock(재고 상태), 그리고 attributes(속성 배열) 등의 정보가 포함되어 있습니다. 속성 배열에는 "8GB", "16GB"와 같은 메모리 크기 정보가 문자열 형태로 저장될 수 있습니다.

 

반응형

🛠 환경 설정

Python에서 MongoDB를 사용하려면 PyMongo 라이브러리가 필요합니다. 로컬 MongoDB 서버에 연결하는 기본 코드 예제는 다음과 같습니다:

from pymongo import MongoClient

client = MongoClient('localhost', 27017)
db = client['products']
collection = db['laptops']

💡 Tip:
데이터를 로드하는 방법으로는 JSON 파일을 임포트하는 것 외에도, Docker를 사용한 로컬 MongoDB 실행 또는 MongoDB Atlas 무료 티어 활용이 가능합니다.

 


📊 [$group] 기본 집계: 문서 개수 세기

컬렉션의 총 문서 개수를 구하는 간단한 예제입니다.
MongoDB 쉘에서는 countDocuments()를 사용할 수 있지만, 집계 파이프라인을 통해서도 동일한 결과를 얻을 수 있습니다:

pipeline = [
    {'$group': {'_id': None, 'total': {'$count': {}}}}
]

result = list(collection.aggregate(pipeline))
print("Total number of laptops:", result[0]['total'])

🔹 결과 예시:

{"_id": null, "total": 200}

🔍 $group 단계에서 _id: None 으로 설정하면 모든 문서를 하나의 그룹으로 묶고, $count 연산자로 문서 개수를 계산합니다.


🏷 [$count] 브랜드별 노트북 개수 구하기

각 브랜드별 노트북 수를 계산하려면 $group 단계를 사용하여 브랜드(brand) 필드를 기준으로 그룹화합니다:

pipeline = [
    {'$group': {'_id': '$brand', 'count': {'$count': {}}}}
]

result = list(collection.aggregate(pipeline))
print("Laptop counts by brand:", result)

🔹 결과 예시:

[
    {"_id": "HP", "count": 50},
    {"_id": "Dell", "count": 40},
    {"_id": "Lenovo", "count": 30}
]

🏷 설명:

  • "$brand": 문서에서 브랜드 필드 값을 가져옵니다.
  • $count: 각 그룹에 속한 문서 개수를 계산합니다.

🔽 정렬: $sort 단계

브랜드별 노트북 개수를 내림차순 정렬(많은 브랜드 순) 하려면 $sort 단계를 추가하면 됩니다:

pipeline = [
    {'$group': {'_id': '$brand', 'count': {'$count': {}}}},
    {'$sort': {'count': -1}}  # count 값을 기준으로 내림차순 정렬
]

result = list(collection.aggregate(pipeline))
print("Sorted laptop counts by brand:", result)

🔹 결과 예시:

[
    {"_id": "HP", "count": 50},
    {"_id": "Dell", "count": 40},
    {"_id": "Lenovo", "count": 30}
]

📌 설명:

  • $sort: {'count': -1} → count 값을 기준으로 내림차순 정렬
  • $sort: {'count': 1} → count 값을 기준으로 오름차순 정렬

 


 

🔎 필터링: $match 단계

재고가 있는 노트북(in_stock: true)만 필터링하려면 $match 단계를 사용합니다.

pipeline = [
    {'$match': {'in_stock': True}},  # 재고가 있는 문서만 필터링
    {'$group': {'_id': '$brand', 'count': {'$count': {}}}},
    {'$sort': {'count': -1}}  # 브랜드별 노트북 개수를 내림차순 정렬
]

result = list(collection.aggregate(pipeline))
print("In-stock laptop counts by brand:", result)

🟢 $match 단계 설명

  • $match는 특정 조건을 만족하는 문서만 다음 단계로 전달하는 역할을 합니다.
  • $match 단계를 여러 번 사용할 수도 있으며, 예를 들어 재고가 10개 이상인 브랜드만 출력하려면 추가 필터링을 적용할 수 있습니다.

🔄 배열 처리: $unwind 단계

어떤 문서의 속성 배열(attributes) 이 다음과 같이 저장되어 있다고 가정합니다.

{"_id": 1, "brand": "HP", "attributes": ["8GB", "16GB"]}

이제 $unwind를 사용하여 각 배열 요소를 개별 문서로 변환해 보겠습니다.

pipeline = [
    {'$unwind': '$attributes'}
]

result = list(collection.aggregate(pipeline))
print("Unwound attributes:", result[:5])  # 처음 5개만 출력

🔹 $unwind 적용 후 결과:

{"_id": 1, "brand": "HP", "attributes": "8GB"}
{"_id": 1, "brand": "HP", "attributes": "16GB"}

🟢 $unwind 단계 설명

  • 배열을 개별 요소로 분해하여 하나의 문서를 여러 개의 문서로 확장합니다.
  • 배열을 처리해야 하는 경우 $unwind가 필수적이며, 이후의 분석이 훨씬 쉬워집니다.

🏗 문서 재구성: $project 단계

이제 attributes 필드에 저장된 메모리 크기("8GB", "16GB" 등) 에서 "GB"를 제거하고, 이를 정수형 데이터로 변환해 보겠습니다.

pipeline = [
    {'$unwind': '$attributes'},
    {'$project': {
        'brand': 1,
        'memory': {
            '$let': {
                'vars': {
                    'mem': { '$trim': { 'input': '$attributes', 'chars': 'GB' } },
                },
                'in': { '$toInt': '$$mem' }
            }
        }
    }}
]

result = list(collection.aggregate(pipeline))
print("Projected memory values:", result[:5])  # 처음 5개 출력

🟢 $project 단계 설명

  • $trim: "GB" 문자열을 제거하여 숫자만 남김.
  • $toInt: 문자열을 정수형으로 변환.
  • $let: 변수(mem)를 사용하여 변환 과정을 단계적으로 수행.

💡 이제 메모리를 숫자로 변환했기 때문에, 크기 비교 및 정렬이 정확하게 가능합니다!
("8GB" > "16GB" 같은 오류 방지)


🔝 브랜드별 최대 메모리 찾기

이제 앞서 배운 $unwind, $project, $group, $sort 단계를 모두 결합하여,
브랜드별 최대 메모리 크기를 찾아보겠습니다.

pipeline = [
    {'$unwind': '$attributes'},
    {'$project': {
        'brand': 1,
        'memory': {
            '$let': {
                'vars': {
                    'mem': { '$trim': { 'input': '$attributes', 'chars': 'GB' } },
                },
                'in': { '$toInt': '$$mem' }
            }
        }
    }},
    {'$group': {
        '_id': '$brand',
        'max_memory': { '$max': '$memory' }
    }},
    {'$sort': {'max_memory': -1}}  # 내림차순 정렬
]

result = list(collection.aggregate(pipeline))
print("Maximum memory by brand:", result)

🔹 출력 예시:

[
    {"_id": "HP", "max_memory": 32},
    {"_id": "Lenovo", "max_memory": 32},
    {"_id": "Dell", "max_memory": 16}
]

🟢 각 단계 설명

1️⃣ $unwind: 배열을 개별 요소로 변환
2️⃣ $project: "GB"를 제거하고 정수형 데이터로 변환
3️⃣ $group: 브랜드별 최대 메모리 값 계산
4️⃣ $sort: 내림차순 정렬


✅ 마무리

이제까지 배운 MongoDB 집계 파이프라인의 주요 기능을 정리해 봅시다.

단계 설명 예시 사용법

$group 문서를 그룹화하고 집계 수행 브랜드별 노트북 개수 계산
$sort 특정 필드를 기준으로 정렬 카운트 내림차순 정렬
$match 특정 조건을 만족하는 문서만 필터링 재고가 있는 노트북만 필터링
$unwind 배열을 개별 요소로 변환 속성 배열 분해
$project 문서를 재구성하고 필드를 변환 메모리 크기를 정수형으로 변환

MongoDB 집계 파이프라인을 활용하면 데이터를 더욱 효율적으로 처리하고, 복잡한 분석도 쉽게 수행할 수 있습니다.
이제 직접 실습해 보며 자신만의 데이터 분석 파이프라인을 만들어 보세요! 🚀

 

 

함께보면 좋은글

MongoDB 성능 개선을 위한 점검 (DB 프로파일링 / db.setProfilingLevel)

 

MongoDB 성능 개선을 위한 점검 (DB 프로파일링 / db.setProfilingLevel)

⚠️ 성능에 영향을 주는 쿼리를 확인하기 위한 용도일뿐, 이 자체로 성능을 개선하는 것과는 무관합니다.(오히려 프로파일링 활성화 모드일때 성능을 더 잡아먹음. 프로파일링 완료 후 반드시

intelloper.tistory.com

MongoDB 성능 개선! 빠른 속도 튜닝!을 위한 DB 인덱싱 적용 db.collection.createIndex

 

MongoDB 성능 개선! 빠른 속도 튜닝!을 위한 DB 인덱싱 적용 db.collection.createIndex

인덱싱은 데이터베이스 성능 최적화의 핵심이며, 제대로 사용하면 쿼리 속도를 크게 향상시킬 수 있습니다. MongoDB 인덱싱이란?인덱싱은 데이터베이스 테이블의 검색 속도를 향상시키기 위해

intelloper.tistory.com

Python 성능 향상: 3배 더 빠른 스크립트로 바꾸는 방법

 

Python 성능 향상: 3배 더 빠른 스크립트로 바꾸는 방법

Python 코드가 느려서 답답한 경험, 누구나 한 번쯤 있죠? 제가 사용한 주요 Python 성능 최적화 방법들을 공유하려고 합니다. 이 방법들을 통해 여러분도 Python 스크립트를 빠르게 만들 수 있을 것

intelloper.tistory.com

 

반응형