주소로 고객 검색 서비스 구축하기(feat. Elastic Search, MacOS) - 4탄
안녕하세요. 클스 입니다.
* 목표
- Elastic Search & Kibana 설치 (1탄) - 보기
- Elastic Search Client for Python 설치 및 프로그램 (2탄) - 보기
- 주소 데이터 검색 구조 설계 및 bulk 생성/업로드 (3탄) - 보기
- 주소 데이터 검색 구조 설계 및 bulk 생성/업로드 (3.5탄) - Polars vs Pandas
- 대량으로 검색 하기 (4탄) - 현재글
- 데이터 백업 및 복구 <유지관리> (5탄)
지난 3탄에서는 데이터 구조를 설계하고 벌크 업로드를 하였습니다. 이번에는 RDBM에 매핑테이블에
데이터를 넣는 것을 해보도록 하겠습니다.
* RDB 테이블
로컬테이터는 RDB에 테이블은 생성해 업로드 하였고, 이후는 업데이트와 추가된 내용 적용되도록 만들었습니다. RBD와 elastic search에 둘다 반영되도록 하였습니다.
* 대량으로 주소를 검색하여 로컬데이터의 관리번호만 가져오고자 합니다.
1) 업무용 주소 테이블에서 주소를 가져온다.
2) loop를 돌면서 로컬데이터 es에 검색을 한다. 제일 점수가 높은것을 가져온다.
3) 매핑테이블에 데이터를 추가한다.
1) 업무용 주소 테이블에서 주소를 가져온다.
2) loop를 돌면서 로컬데이터 es에 검색을 한다. 제일 점수가 높은것을 가져온다.
3) 매핑테이블에 데이터를 추가한다.
* elasticsearch_addr_search_bulk.py
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pandas import json_normalize
from tabulate import tabulate
# inore ssl warning
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
tabulate.WIDE_CHARS_MODE = True
es_result_size = 1es = Elasticsearch('https://localhost:9200',
basic_auth=('elastic', 'your passwd'),
ca_certs=False,
verify_certs=False)
# 신버전에서는 body tag를 사용하지말고, 각각 사용해라고 나옵니다.
# 그래서 아래와 같이 query, sort를 각각 만들어서 파라메터로 넘겨주었습니다.
# json_normalize 도 pandas.io.json이 아니라 pandas에 바로 있습니다.
# [삭제] from pandas.io.json import json_normalize
# [추가] from pandas import json_normalize
def search_doro_addr(esindex:str, keyword:str):
query = {"match": {'biz_addr_doro':keyword}}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug : print(tabulate(df, headers='keys', tablefmt='psql'))
return df
def search_jibun_addr(esindex:str, keyword:str):
query = {"match": {'biz_addr':keyword}}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug : print(tabulate(df, headers='keys', tablefmt='psql'))
return df
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-multi-match-query.html
# 도로명 주소 + 상세주소와 지번주소 + 상세주소를 같이 검색하는 함수
def search_addr(esindex:str, addr_jibun:str, addr_doro:str):
query = {"dis_max": {
"queries": [
{ "match" : {'biz_addr':addr_jibun}},
{ "match" : {'biz_addr_doro':addr_doro}}
],
"tie_breaker": 0.3
}
}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug : print(tabulate(df, headers='keys', tablefmt='psql'))
return df
# search_doro_addr('local_restaurants', '대전광역시 중구 당디로 61')
# search_jibun_addr('local_restaurants', '대전광역시 동구 판암동 839 ')
# search_jibun_addr('local_restaurants', '충청남도 계룡시 엄사면 엄사리 167-4')
# search_jibun_addr('local_restaurants', '대전광역시 유성구 관평동 1359 한신에스메카')
# search_jibun_addr('local_restaurants', '대전 시 서1구 둔1산동 9108 샘1머리아파트2단지')
search_addr('local_restaurants', '대전시 서구 둔산동 9108 샘머리아파트2단지',
'대전광역시 서구 청사로 281, 주상가동 3층 307호')
다양하게 테스트를 해볼 수 있습니다. 이것을 loop를 돌려서 관리번호만 추출해서 list 형태로 만든다음
RDB 테이블에 upsert를 하면 됩니다.
'''
1. DB에서 업무데이터를 가져온다. 고객번호, 지번주소, 동, 호, 건물명, 상호명
2. loop를 돌면서 elastic search에 검색하고 스코어가 10점 이상이면 매핑을 만든다.
3. db에 insert 한다.
'''
# 임시로 업무용 데이터를 만들었음
bizaddr_data_file = './biz_addr_data.csv'
#건물번호,세대번호,설치번호,계량기번호,세대유형코드,세대유형명,egis사용시설명,번지주소,도로명주소,호수
column_names = ['bld_no', 'hs_no', 'ins_no', 'meter_no', 'hs_type_cd', 'hs_type_nm', 'egis_nm', 'addr_jibun', 'addr_doro', 'ho']
df_biz_addr = pd.read_csv(bizaddr_data_file, skiprows=1, names=column_names, dtype="string");
print(tabulate(df_biz_addr, headers='keys', tablefmt='psql'))
mapping_table_columns=['score', 'biz_code', 'local_code', 'biz_addr_jibun', 'local_addr_jibun', 'biz_addr_doro', 'local_addr_doro']
mapping_table_dtypes=['str', 'str', 'str', 'str', 'str']
mapping_table = pd.DataFrame(columns=mapping_table_columns, dtype='string')
print(tabulate(mapping_table, headers='keys', tablefmt='psql'))
for index, row in df_biz_addr.iterrows():
addr_doro = f'{row["addr_doro"]} {row["ho"]}'
addr_jibun = f'{row["addr_jibun"]} {row["ho"]}'
df_es = search_addr('local_restaurants', addr_jibun, addr_doro)
if( int(df_es['_score']) > 15):
print(row['hs_no'], row['addr_doro'])
print(tabulate(df_es, headers='keys', tablefmt='psql'))
tmp = pd.DataFrame(columns=mapping_table_columns)
tmp['score'] = df_es['_score']
tmp['biz_code'] = row['hs_no'] # 이걸 올리면 NaN 이 나오는 이유는 ?
tmp['local_code'] = df_es['_source.mgmt_no']
tmp['biz_addr_jibun'] = addr_jibun
tmp['local_addr_jibun'] = df_es['_source.biz_addr']
tmp['biz_addr_doro'] = row['addr_doro']
tmp['local_addr_doro'] = df_es['_source.biz_addr_doro']
print(tabulate(tmp, headers='keys', tablefmt='psql'))
mapping_table = mapping_table.append(tmp)
mapping_table.reset_index(drop=True, inplace=True)
print(tabulate(mapping_table, headers='keys', tablefmt='psql'))
부분적으로 데이터는 보안상 숨김 처리 합니다.
* CURL로 테스트 해보기
curl -k -u elastic:pwd -XGET "https://localhost:9200/_all/_search?q="검색어" \
-H "kbn-xsrf: reporting" | jq --color-output
elasticsearch는 기본적으로 https를 사용해야 한다. 그러나 로컬서버의 경우 SSL 인증서를 설치하면
좋겠지만 localhost는 어렵다 이대 -k 옵션을 주면된다.
iterm2에서 json을 아름답게 출력하려면 | jq --color-output 붙혀주면 된다.
{
"took": 220,
"timed_out": false,
"_shards": {
"total": 10,
"successful": 10,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 10000,
"relation": "gte"
},
"max_score": 19.51369,
"hits": [
{
"_index": "대전광역시_일반음식점",
"_id": "36700003670000-101-2017-00260",
"_score": 19.51369,
"_source": {
"번호": 17379,
"개방서비스명": "일반음식점",
"개방서비스아이디": "07_24_04_P",
"개방자치단체코드": 3670000,
"관리번호": "3670000-101-2017-00260",
"인허가일자": "2017-06-15",
"인허가취소일자": "",
"영업상태구분코드": 1,
"영업상태명": "영업/정상",
"상세영업상태코드": 1,
"상세영업상태명": "영업",
"폐업일자": "",
"휴업시작일자": "",
"휴업종료일자": "",
"재개업일자": "",
"소재지전화": "042 934 0420",
"소재지면적": "114.84",
"소재지우편번호": "305-509",
"소재지전체주소": "대전광역시 유성구 관평동 1153 지상1층",
"도로명전체주소": "대전광역시 유성구 관들4길 30, 지상1층 (관평동)",
"도로명우편번호": "34018",
"사업장명": "생각나서",
"최종수정시점": "2021-12-03 09:42:08",
"데이터갱신구분": "U",
* iTerm2 결과 캡쳐
앞에서 localdata를 변경없이 elastic search에 넣어서 컬럼명이 한글입니다. 그래서 수정없이 검색하도록
인덱스와 컬럼명을 수정한 소스 입니다.
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from pandas import json_normalize
from tabulate import tabulate
import pandas as pd
from openpyxl import load_workbook
from openpyxl.styles import Alignment
from openpyxl.utils.dataframe import dataframe_to_rows
from openpyxl.styles import NamedStyle
from tqdm import tqdm
from collections import OrderedDict
import certifi
import urllib3
# inore ssl warning
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
tabulate.WIDE_CHARS_MODE = True
debug = True
es_result_size = 1
es_index_name = '대전광역시_일반음식점'
es = Elasticsearch('https://localhost:9200', basic_auth=('elastic', 'your passwd'), verify_certs=False)
def search_doro_addr(esindex:str, addr_doro:str):
query = {"match": {'도로명전체주소':addr_doro}}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug :
print(f'{addr_doro}')
print(f'---------------------------------------------------')
print(res)
print(tabulate(df, headers='keys', tablefmt='psql'))
return df
def search_jibun_addr(esindex:str, addr_jibun:str):
query = {"match": {'소재지전체주소':addr_jibun}}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug :
print(f'{addr_jibun}')
print(f'---------------------------------------------------')
print(res)
print(tabulate(df, headers='keys', tablefmt='psql'))
return df
# https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-multi-match-query.html
# https://kazaana2009.tistory.com/6
# 도로명 주소 + 상세주소와 지번주소 + 상세주소를 같이 검색하는 함수
def search_addr(esindex:str, addr_jibun:str, addr_doro:str):
query = {"dis_max": {
"queries": [
{ "match" : {'소재지전체주소':addr_jibun}},
{ "match" : {'도로명전체주소':addr_doro}}
],
"tie_breaker": 0.3
},
}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug :
print(f'{addr_jibun} - {addr_doro}')
print(f'---------------------------------------------------')
print(res)
print(tabulate(df, headers='keys', tablefmt='psql'))
return df
# function scroe를 이용
def search_addr_fc(esindex:str, addr_jibun:str, addr_doro:str):
query = {"function_score": {
"query": { "match" : {'소재지전체주소':addr_jibun} },
# "tie_breaker": 0.3
"boost": "5",
"random_score": {},
"boost_mode": "multiply"
},
}
sort = {"_score": {'order':"desc"}}
res = es.search(index=esindex, query=query, sort=sort, size=es_result_size)
df = json_normalize(res['hits']['hits'])
if debug :
print(f'{addr_jibun} - {addr_doro}')
print(f'---------------------------------------------------')
print(res)
print(tabulate(df, headers='keys', tablefmt='psql'))
return df
search_doro_addr(es_index_name, '대전광역시 중구 당디로 61')
search_jibun_addr(es_index_name, '대전광역시 동구 판암동 839 ')
search_jibun_addr(es_index_name, '충청남도 계룡시 엄사면 엄사리 167-4')
search_jibun_addr(es_index_name, '대전광역시 유성구 관평동 1359 한신에스메카')
search_jibun_addr(es_index_name, '대전 시 서1구 둔1산동 9108 샘1머리아파트2단지')
search_addr(es_index_name, '대전시 서구 둔산동 9108 샘머리아파트2단지', '대전광역시 서구 청사로 281, 주상가동 3층 307호')
# 업무용 데이터 살펴보기
'''
1. DB에서 업무데이터를 가져온다. 고객번호, 지번주소, 동, 호, 건물명, 상호명
2. loop를 돌면서 elastic search에 검색하고 스코어가 10점 이상이면 매핑을 만든다.
3. db에 insert 한다.
'''
# 임시로 업무용 데이터를 만들었음
bizaddr_data_file = '~/svn/cncity_svn_new/cncity_ai/utility/python/geo/biz_addr_data.csv'
#건물번호,세대번호,설치번호,계량기번호,세대유형코드,세대유형명,egis사용시설명,번지주소,도로명주소,호수
column_names = ['bld_no', 'hs_no', 'ins_no', 'meter_no', 'hs_type_cd', 'hs_type_nm', 'egis_nm', 'addr_jibun', 'addr_doro', 'ho']
df_biz_addr = pd.read_csv(bizaddr_data_file, skiprows=1, names=column_names, dtype="string");
if debug: print(tabulate(df_biz_addr, headers='keys', tablefmt='psql'))
# exit(0)
mapping_table_columns=['score', 'biz_code', 'local_code', 'biz_addr_jibun', 'local_addr_jibun', 'biz_addr_doro', 'local_addr_doro']
mapping_table_dtypes=['str', 'str', 'str', 'str', 'str']
mapping_table = pd.DataFrame(columns=mapping_table_columns, dtype='string')
if debug: print(tabulate(mapping_table, headers='keys', tablefmt='psql'))
debug = False
print('Mapping local data key and biz data key---------------------------------')
for index, row in df_biz_addr.iterrows():
addr_doro = f'{row["addr_doro"]} {row["ho"]}'
addr_jibun = f'{row["addr_jibun"]} {row["ho"]}'
df_es = search_addr(es_index_name, addr_jibun, addr_doro)
# df_es = search_addr_fc(es_index_name, addr_jibun, addr_doro)
if( int(df_es['_score']) > 0):
if debug: print(row['hs_no'], row['addr_doro'])
if debug: print(tabulate(df_es, headers='keys', tablefmt='psql'))
tmp = pd.DataFrame(columns=mapping_table_columns)
tmp['score'] = df_es['_score']
tmp['biz_code'] = row['hs_no'] # 이걸 올리면 NaN 이 나오는 이유는 ?
tmp['local_code'] = df_es['_source.관리번호']
tmp['biz_addr_jibun'] = addr_jibun
tmp['local_addr_jibun'] = df_es['_source.소재지전체주소']
tmp['biz_addr_doro'] = row['addr_doro']
tmp['local_addr_doro'] = df_es['_source.도로명전체주소']
if debug: print(tabulate(tmp, headers='keys', tablefmt='psql'))
mapping_table = pd.concat([mapping_table, tmp], ignore_index = True)
# print(mapping_table)
mapping_table.reset_index(drop=True, inplace=True)
print(tabulate(mapping_table, headers='keys', tablefmt='psql'))
# 결과 보기
이 dataframe을 매핑 테이블에 Insert를 하여 관리하면 됩니다. 필요한 컬럼은 업데이트 일자, document id
등을 같이 저장하시면 활용성이 더 높아 보입니다.
이상입니다. 다음에는 elastic search의 용량 설정, 유지보수에 필요한 백업, 복구에 대해 알아 보겠습니다.
감사합니다.
댓글
댓글 쓰기