기본 콘텐츠로 건너뛰기

주소로 고객 검색 서비스 구축하기(feat. Elastic Search v 8.6.2, MacOS) - 3탄

주소로 고객 검색 서비스 구축하기(feat. Elastic Search,  MacOS) - 3탄

안녕하세요. 클스 입니다.


* 목표

- Elastic Search & Kibana 설치 (1탄) - 보기

- Elastic Search Client for Python 설치 및 프로그램 (2탄) - 보기

- 주소 데이터 검색 구조 설계 및 bulk 생성/업로드 (3탄)  - 현재글

- 주소 데이터 검색 구조 설계 및 bulk 생성/업로드 (3.5탄)  - Polars vs Pandas

- 대량으로 검색 하기 (4탄)

- 데이터 백업 및 복구 <유지관리> (5탄)

지난 1탄에서는 설치하고 구동하고 화면 접속을 확인 했습니다. 이번 2탄에서는 데이터를 설계하고

넣는 방법을 알아보았습니다. 3탄 실제 주소데이터를 설계하고 JSON을 생성해서 BULK로 업로드 하는 것을

해보도록 하겠습니다. 

* 데이터 현황은 아래와 같습니다. 대전/계룡 지역의 인허가 데이터 중, 도시가스를 사용하는 고객은 노란색
  영업입니다. 파란색 영역은 미 공급 지역으로 실제 영업을 해야하는 식당이 되겠습니다.

   영업팀은 신규영업을 통해 확대를 해야하는데, 공급하지 않는 곳을 알아야 합니다. 이때 미공급이 된다고해도
   경제성을 분석해서 가능한 식당인지 분석을 하고 영업을 시작합니다.

   우리팀에서 가장많이 요청받는 업무는 이 미공급 식당을 추출해 달라는 요청입니다. 

   문제점은 1탄에서 언급한 것 처럼, 표준화가 어렵고 표준화 되지도 않았다는 것입니다. 

    1) 로컬데이터 주소와 회사 고객 주소가 표준화 되어 있지 않아서 RDB로 full text search, like 검색등으로
        조회하기 어렵다.
    2) 대량으로 조회하기 어렵다.

    3) 동일 주소에 식당이 개업/폐업등을 이력으로 관리하기 어렵다.

   이 문제를 해결하기위해 elastic search를 검토하였고, 일부 테스트는 성공적이었습니다.

* 주소 데이터 설계

   먼저 주소를 활용할 목적에 따라 설계가 되는 것이 중요합니다. 업무에서 가장 많이 필요로 하는 것이
   회사가 대전/계룡 지역에 도시가스를 공급하다보니, 식당에 도시가스 공급 여부를 일자별로 알아야 합니다.

   전략은 지역데이터<local_restaurant>와 회사 데이터로 2개의 index<biz_restaurant>를 생성합니다.

   그래서 상황에 따라 조회 방향을 다르게 가져가서 일치하는 부분을 추출하거나 그 외 부분을 추출할 수 
   있도록 합니다. SQL Join 개념과 동일합니다. 아래 벤다이어그램은 개념적인 표현입니다.
   실제는 로컬데이터 100% 서브셋이 업무데이터입니다. 그러나 데이터 업데이트 주기에 따라서 100%가
   아닐 수 있어 아래와 같이 표현했습니다.

   


* 로컬데이터 설계

      index : local_restaurant
      doc : 관리번호, 주소, 영업시작일, 폐업일, 상태, 업종, 업태

      로컬데이터 주소는 지번(도로명)주소 + 상세주소(건물명, 상호명, 동, 호)가 포함되어 있습니다. 

* 업무데이터 설계

      index : biz_restaurant
      doc : 고객번호, 주소, 영업시작일, 폐업일, 상태, 업종, 업태, 행안부 건물 관리번호

      업무데이터 주소는 지번(도로명)주소, 건물명, 상호명, 동, 호 컬럼이 분리되어 있습니다.
      그래서 elastic search를 위해서는 해당 주소를 합쳐야 합니다.

* elastic search 에는 업무데이터와 로컬데이터를 전부 올리고, 변경시는 index를 업데이트 한다.

* RDBMS의 테이블 설계
   여기서는 기준을 로컬데이터로 잡았습니다.
     
   1) 로컬데이터를 가져온다. 변경 내역만 추출한다. elastic search에 업데이트 한다.
   2) 업무용 주소로 elastic search에서 조회한다. score가 높은 것 1개를 가져온다.
   3) 위의 매핑 테이블에  데이터를 추가한다. 

    최초 매핑 과정은 필요합니다.

* 벌크 JSON 데이터 생성


python 프로그램은 아래와 같은 인자로 할 것입니다.
$ elasticsearch_addr_bulk.py
-f <file path>
-t <file type : json, csv, excel> -i <index_name> -o <출력 json 파일, 입력안할 경우 index_name.json>


이런 형식이 되어야 하는데, 만들기 어렵다. elasticsearch python lib에서는 쉽게 만드는 것을
제공한다.

{ "index" : {"_id":3} } { "biz_category":"일반음식점", "biz_category_id":"07_24_04_P", "biz_category_areacd":3640000, "mgmt_no":"3640000-101-2022-00101","allow_date":"2022-06-07", "biz_status":"영업\/정상", "biz_status_detail":"영업","biz_addr":"대전광역시 동구 용운동 295-10 ", "biz_addr_doro":"대전광역시 동구 용운로 159-1, 1층 (용운동)", "biz_name":"라홍방마라탕 대전용운점","biz_domain":"중국식"} { "index" : {"_id":3} } { "biz_category":"일반음식점", "biz_category_id":"07_24_04_P", "biz_category_areacd":3640000, "mgmt_no":"3640000-101-2022-00101","allow_date":"2022-06-17", "biz_status":"영업\/정상", "biz_status_detail":"영업","biz_addr":"대전광역시 동구 용운동 295-10 ", "biz_addr_doro":"대전광역시 동구 용운로 159-1, 1층 (용운동)", "biz_name":"라홍방마라탕 대전용운점","biz_domain":"중국식"}



Bulk는 클라이언트 API에서 두 가지 형태로 제공 된다. index나 get 같이 직접 API를 호출 방식과 helpers를 이용한 방식이다. 동일하게 bulk의 기능을 충실히 수행하지만, 한 가지 큰 차이가 있다. 바로 세그먼트화. helpers.bulk는 원하는 수만큼 나누어 여러 번에 걸쳐 전송을 도와준다. 서버나 네트워크의 상황에 따라 안정적인 전송 보장하기 위한 훌륭한 도구로 사용된다.
https://towardsdatascience.com/how-to-index-elasticsearch-documents-with-the-bulk-api-in-python-b5bb01ed3824


* 우선 가장 기본적인 es client로 bulk 업로드 하기 : 데이터 수는 약 25,504개 임

from elasticsearch import Elasticsearch
from elasticsearch import helpers

import pandas as pd
import json
from tabulate import tabulate
from logging import root
import os, sys
import argparse
import configparser
import shutil
from pathlib import Path
from glob import glob
from tabulate import tabulate
from logging import root
import os, sys
import argparse
import configparser
import shutil
from pathlib import Path
from glob import glob
from random import getrandbits

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

_source_file = "./지역데이터_대전계룡_식당_SIMPLE.xlsx";
_target_index = "local_restaurants"
_sheet_name = "대전"

df_columns = ["biz_category", "biz_category_id", "biz_category_areacd", "mgmt_no", "allow_date", "biz_status",
"biz_status_detail", "biz_addr", "biz_addr_doro", "biz_name", "biz_domain"]

df_addr = pd.read_excel(_source_file, sheet_name = _sheet_name, header=0,
usecols=["개방서비스명", "개방서비스아이디", "개방자치단체코드", "관리번호", "인허가일자", "영업상태명",
"상세영업상태명", "소재지전체주소", "도로명전체주소", "사업장명", "위생업태명"], engine="openpyxl");

df_addr.columns = df_columns
# print(tabulate(df_addr.head(5), headers="keys", tablefmt="psql"))

datas = df_addr.to_dict(orient = "records")
# json_data = df_addr.to_json(force_ascii=False, orient = "records", indent=2)

def bulk_upload_use_es_client():
actions = []
index_name = _target_index
id = 0
for data in datas:
id += 1
action = {"index": {"_id": data["mgmt_no"]}}
# print(action)

doc = data
# print(doc)

actions.append(action)
actions.append(doc)

# print(actions)
es_client.bulk(index=index_name, operations=actions)

bulk_upload_use_es_client()

# 테스트 해보기

def search_doro_addr(esindex:str, keyword:str):
doc = {"query":{"match":{'biz_addr_doro':keyword}}}
res = es.search(index=esindex, body=doc, size=10)

df = json_normalize(res['hits']['hits'])
print(tabulate(df, headers='keys', tablefmt='psql'))

def search_jibun_addr(esindex:str, keyword:str):
doc = {"query":{"match":{'biz_addr':keyword}}}
res = es.search(index=esindex, body=doc, size=10)

df = json_normalize(res['hits']['hits'])
print(tabulate(df, headers='keys', tablefmt='psql'))


# search_doro_addr('local_restaurants', '대전광역시 중구 당디로 61')
# search_jibun_addr('local_restaurants', '대전광역시 동구 판암동 839 ')
search_jibun_addr('local_restaurants', '충청남도 계룡시 엄사면 엄사리 167-4')


* 어느 블로그에서 가져왔는데 출처가 기억이 안나요

Object 형식의 필드를 추천하는 상황과 추천하지 않는 상황
상황에 따라 object형식으로 데이터를 저장할 때가 좋은 경우가 있는 반면 그렇지 않은 경우도 있습니다.

추천하는 상황object의 필드 별로 데이터를 정렬하여 검색하는 경우
object의 필드 별로 데이터를 집계 및 통계를 내야 되는 경우

추천하지 않는 상황 그냥 데이터를 저장하고 읽기만 하는 경우
추천하지 않는 상황에서는 그냥 필드를 text로 하여 json데이터를 text로 저장하는 것을 추천합니다.

* local.go.kr 에서 제공하는 csv 파일은 euc-kr 이기 때문에 python에서 읽기가 어렵다.
  그래서 utf8로 변환해서 사용하면 좋습니다.

   $ iconv -f cp949 -t utf8 ~/Downloads/6300000_CSV/6300000_대전광역시_07_24_04_P_일반음식점.csv > 6300000_대전광역시_07_24_04_P_일반음식점_UTF8.csv

  디렉토리와 하위 디렉토리의 모든 파일을 UTF8로 일괄 변환하는 shell script을 짜서 활용하면 좋습니다.

인코딩 체크하는 shell script : $ sh localdata_check_encoding.sh
#!/bin/bash

SOURCE_DIR="/Downloads/6300000_CSV"

# Use the find command to locate all files in the source directory and subdirectories with the cp949 encoding
find "$SOURCE_DIR" -type f -name "*.csv" -exec sh -c '
echo $(file -b --mime-encoding "$1") "\t:" "$1"
' sh {} \;

* 디렉토리내 파일의 문자셋을 찾아서 전부 변환하는 shell script
#!/bin/bash

SOURCE_DIR="/Downloads/6300000_CSV"

find "$SOURCE_DIR" -type f -name "*.csv" -exec sh -c '
if [[ $(file -b --mime-encoding "$1") == "unknown-8bit" ]]; then
iconv -f cp949 -t utf8 "$1" > "$1".UTF8
fi' sh {} \;


* localdata에서 csv로 다운받으면 작업이 편리하기때문에 csv를 읽어서 elastic search로 업로드
하는 것을 코드를 만들어 사용하면 편리합니다. 다만, 업종별로 column이 공통 부분도 있고,
그렇지 않는 부분도 있습니다. 업종에 따라서 usecols를 정의해서 사용해야 합니다.

* 등록된 데이터 삭제하기
[인덱스 목록 보기]
$ curl -k -u elastic:your_pwd -XGET 'https://localhost:9200/_cat/indices?v&pretty'

health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open local_restaurants 31zoyfxlSJeSNJyS5JwfLg 1 1 25464 1 12mb 6mb green open rasturant mzi3xVV4TQOdrOaRHq5dFA 1 1 2 0 46.3kb 23.1kb green open customer XH_azxb5THqrJA-h5Z4j0Q 1 1 2 0 15.8kb 10.4kb green open restaurants wDs0ClPHRq-FBVnhJ-TWBA 1 1 4 1 76.7kb 38.3kb green open 숙박업 czMA32SbQvqI-T4ZIgZTZQ 1 1 56102 0 72.3mb 30.9mb green open metrics-endpoint.metadata_current_default 2TSwdhhkQVWF7_SOCNYohQ 1 1 0 0 450b 225b green open rasturants PgQSx-s9QPqAfVzGWqgvxA 1 1 7 3 102.9kb 51.4kb green open 대전광역시_일반음식점 OK7baPruRTCiQtuZrwJCbA 1 1 56102 0 63.2mb 31.5mb green open my_index WH9BmfssRRSwuflrSOXTFA 1 1 9 0 35.6kb 17.8kb green open fruits2 blID5A2iRPuEnrriuyV9nw 1 1 2 0 12kb 5kb


[인덱스 전체 삭제하기] - Console에서 삭제하면 제일 편하다. 전체 삭제 wildcard는 사용이 불가


csv 데이터중 nan 인 '' 로 처리해줘야 합니다. 안그러면 아래 오류를 만나게 됩니다.
raise BulkIndexError(f"{len(errors)} document(s) failed to index.", errors)


test_action = [{'_index': '대전광역시_일반음식점',
'_id': '3660000-3660000-101-2017-00098',
'_source': {'개방서비스명': '일반음식점', '개방서비스아이디': '07_24_04_P', '개방자치단체코드': 3660000,
'관리번호': '3660000-101-2017-00098', '인허가일자': '2017-03-21', '인허가취소일자': 'nan',
'영업상태구분코드': 1, '영업상태명': '영업/정상', '상세영업상태코드': 1, '상세영업상태명': '영업',
'폐업일자': 'nan', '휴업시작일자': 'nan', '휴업종료일자': 'nan', '재개업일자': 'nan',
'소재지전체주소': '대전광역시 서구 괴정동 423-6 나이스 팰리스',
'도로명전체주소': '대전광역시 서구 도솔로388번길 52, 1층 109호 (괴정동, 나이스 팰리스)',
'사업장명': '초밥어선생', '최종수정시점': '2021-05-07 10:14:05', '데이터갱신구분': 'U',
'데이터갱신일자': '2021-05-09 02:40:00', '위생업태명': '일식'}}]


소스 : elasticsearch_addr_csv_bulk.py (1000개씩 row를 읽어서 elastic search에 insert )
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from elasticsearch.helpers import bulk

import pandas as pd
import json
from tabulate import tabulate
from logging import root
import os, sys
import argparse
import configparser
import shutil
from pathlib import Path
from glob import glob
from tabulate import tabulate
from logging import root
import os, sys
import argparse
import configparser
import shutil
from pathlib import Path
from glob import glob
from random import getrandbits
import re
import numpy as np
import urllib3


"""
-f <file path>
-t <file type : json, csv, excel>
-i <_index_name>
-o <출력 json 파일, 입력안할 경우 _index_name.json>
"""
_parser = argparse.ArgumentParser()

_parser.add_argument("-f", "--file",
type=str,
help="원천 파일 경로",
required=False)

_parser.add_argument("-i", "--index",
type=str,
help="생성할 index name",
required=False)

_args = _parser.parse_args()

_source_file = _args.file
_target_index = _args.index

tabulate.WIDE_CHARS_MODE = True

_source_file = '~/Downloads/6300000_CSV/6300000_대전광역시_07_24_04_P_일반음식점_UTF8.csv';
_index_name = ""

if _target_index :
_index_name = _target_index
else:
filename = os.path.splitext(os.path.basename(_source_file))[0]
reg_pattern = re.compile(r'[ㄱ-ㅣ가-힣]+')
reg_results = re.findall(reg_pattern, _source_file)
_index_name = "_".join(reg_results) # index name은 파일명에서 가져온다.

source_usecols = [ "개방서비스명", "개방서비스아이디", "개방자치단체코드", "관리번호", "인허가일자", "인허가취소일자",
"영업상태구분코드", "영업상태명", "상세영업상태코드", "상세영업상태명", "폐업일자","휴업시작일자","휴업종료일자","재개업일자",
"소재지전체주소", "도로명전체주소", "사업장명", "최종수정시점","데이터갱신구분","데이터갱신일자", "좌표정보(x)","좌표정보(y)", "위생업태명"]


urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

es_client = Elasticsearch("https://localhost:9200", basic_auth=("elastic", "yourpwd"), ca_certs=False, verify_certs=False)

for df_addr_batch in pd.read_csv(_source_file, usecols=source_usecols, chunksize=1000, na_values=['nan']):
df_addr_batch = df_addr_batch.replace(np.nan, '')
documents = df_addr_batch.to_dict(orient = "records")
actions = [
{
"_index": _index_name,
"_id" : f'{document["개방자치단체코드"]}-{document["관리번호"]}',
"_source": document
}
for document in documents
]

bulk(es_client, actions)

print('Process done...')

csv 파일은 약 27MB 인데 elastic search 의 storage : 35 MB 를 차지하네요

Dev Console에서 GET _all/_search?q=3660000-101-2017-00098 로 검색해보면 잘 나옵니다.
Result : 200-OK 596 ms
{ "took": 558, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 1, "relation": "eq" }, "max_score": 0.72928625, "hits": [ { "_index": "대전광역시_일반음식점", "_id": "3660000-3660000-101-2017-00098", "_score": 0.72928625, "_source": { "개방서비스명": "일반음식점", "개방서비스아이디": "07_24_04_P", "개방자치단체코드": 3660000, "관리번호": "3660000-101-2017-00098", "인허가일자": "2017-03-21", "인허가취소일자": "", "영업상태구분코드": 1, "영업상태명": "영업/정상", "상세영업상태코드": 1, "상세영업상태명": "영업", "폐업일자": "", "휴업시작일자": "", "휴업종료일자": "", "재개업일자": "", "소재지전체주소": "대전광역시 서구 괴정동 423-6 나이스 팰리스", "도로명전체주소": "대전광역시 서구 도솔로388번길 52, 1층 109호 (괴정동, 나이스 팰리스)", "사업장명": "초밥어선생", "최종수정시점": "2021-05-07 10:14:05", "데이터갱신구분": "U", "데이터갱신일자": "2021-05-09 02:40:00", "위생업태명": "일식" } } ] } }

* 참고 자료

https://github.com/elastic/elasticsearch-py/tree/main/examples/bulk-ingest

* curl 로 업로드 해보기 <수정중>
$ curl -XPOST "http://localhost:9200/_bulk" -H 'Content-Type: application/json' \
--data-binary @bulk.json


* 벌크업데이트 할때 비동기식으로 하기위한 설정이 필요합니다.
https://danawalab.github.io/elastic/2022/01/12/Update-By-Query-copy.html

* 다중 검색을 위한 API
https://victorydntmd.tistory.com/316

* https://pydole.tistory.com/entry/Python-elasticsearch-bulk-insert-contain-id


----

* Trouble Shooting

1) InsecureRequestWarning: Unverified HTTPS request is being made to host 'localhost'. Adding certificate verification is strongly advised. 안뜨게 하기

import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)


다음편에서는 검색을 다뤄보겠습니다. 원하는 방향은 주소를 기반으로 우리의 고객/비고객을 구분하는 것입니다.

댓글

이 블로그의 인기 게시물

[quaser.dev][2014-09-14] 윈도우즈(10, 64bit)에 개발환경 설정하기

[quaser.dev][2014-09-14] 윈도우즈(10, 64bit)에 개발환경 설정하기

[2024-10-19] iPhone, iPad에서 ChatGPT로 PDF 생성시 한글 깨짐 해결 방법

iPhone, iPad에서 ChatGPT로 PDF 생성 시 한글 깨짐 해결 방법

[2025-04-16(수)] OpenAI gpt-4.1 시리즈 발표, Anthropic Claude에 대한 생각

OpenAI gpt-4.1 시리즈 발표, Anthropic Claude에 대한 생각 안녕하세요. 클스 입니다. 4/15일자로 openai가 gpt-4.1 시리즈를 발표 했습니다. 현재는 api로만 사용가능합니다. 점차 웹/앱 사용자에게 오픈 될거라 생각 됩니다. 비용상 문제로 4.1-mini, nano를 사용해서 chatbot을 만들어 보고 있습니다. 4o 시리즈 보다는 확실히 빠르고, 답변의 정확도는 올라간 것 같습니다. 앤트로픽 클로드와 비교를 많이 하는데, 업무 시스템 혹은 AI 솔루션을 개발하는 입장에서는 어떤 생태계를 제공하는가가 주요한 결정 입니다. AI관련 인력을 충분히 보유한 회사의 경우는 어떤걸 사용해도 좋을 결과를 가지겠지만 일반적인 챗봇 개발 절차를 보면 다음과 같이 볼 수 있습니다. 1. 문서를 준비한다. 대부분 pdf, text, markdown 2. 문서를 파싱해서 vectordb에 올린다.     - 별도 벡터디비 구성 필요. 어떤 db를 선택할지 고민 필요     - 어떤 Parser를 사용할지, 텍스트 오버래핑은 얼마가 적당한지 고민 필요        (회사의 문서가 워낙 많고, 다양하면 하나하나 테스트 해서 좋은걸 선택하는 것이 어렵다)     - 유사도 측정은 어떤 알고리즘을 써야할지 고민 필요     - llamaindex도 고민해야 함. 3. RAG flow를 만든다.     - langchain을 쓸지, 각 AI 벤더에서 제공하는 sdk를 쓸지 고민 필요       (대부분 락인이 되지 않으려면 langchain을 사용하면 좋지만, 벤더에 특화면 기능 적용이 늦음) 4. 챗봇 UI 앱을 만든다.     - 답변이 text 로 구성되다 보니. 그래프, 이미지등 복합적인 컨텐츠를 재배치 하여 표현하기 상당히 어렵네요. (이건 제가 실력이 모자라서 .. 패스) ...