Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
578 changes: 331 additions & 247 deletions dtable_events/common_dataset/common_dataset_sync_utils.py

Large diffs are not rendered by default.

159 changes: 60 additions & 99 deletions dtable_events/common_dataset/common_dataset_syncer.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@

from dtable_events import init_db_session_class
from dtable_events.app.config import DTABLE_PRIVATE_KEY
from dtable_events.common_dataset.common_dataset_sync_utils import import_or_sync, set_common_dataset_invalid, set_common_dataset_sync_invalid
from dtable_events.common_dataset.common_dataset_sync_utils import import_sync_CDS, set_common_dataset_invalid, set_common_dataset_sync_invalid
from dtable_events.utils import get_opt_from_conf_or_env, parse_bool, uuid_str_to_36_chars, get_inner_dtable_server_url

logger = logging.getLogger(__name__)
from dtable_events.utils.dtable_server_api import DTableServerAPI

class CommonDatasetSyncer(object):

Expand Down Expand Up @@ -54,7 +53,7 @@ def get_dtable_server_header(dtable_uuid):
algorithm='HS256'
)
except Exception as e:
logger.error(e)
logging.error(e)
return
return {'Authorization': 'Token ' + access_token}

Expand All @@ -63,89 +62,62 @@ def gen_src_dst_assets(dst_dtable_uuid, src_dtable_uuid, src_table_id, src_view_
"""
return assets -> dict
"""
dst_headers = get_dtable_server_header(dst_dtable_uuid)
src_headers = get_dtable_server_header(src_dtable_uuid)

# request src_dtable
dtable_server_url = get_inner_dtable_server_url()
url = dtable_server_url.strip('/') + '/dtables/' + src_dtable_uuid + '?from=dtable_events'

src_dtable_server_api = DTableServerAPI('dtable-events', src_dtable_uuid, dtable_server_url)
dst_dtable_server_api = DTableServerAPI('dtable-events', dst_dtable_uuid, dtable_server_url)
try:
resp = requests.get(url, headers=src_headers)
src_dtable_json = resp.json()
src_dtable_metadata = src_dtable_server_api.get_metadata()
dst_dtable_metadata = dst_dtable_server_api.get_metadata()
except Exception as e:
logger.error('request src dtable: %s error: %s', src_dtable_uuid, e)
return
logging.error('request src dst dtable: %s, %s metadata error: %s', src_dtable_uuid, dst_dtable_uuid, e)
return None

# check src_table src_view
src_table = None
for table in src_dtable_json.get('tables', []):
if table.get('_id') == src_table_id:
src_table, src_view = None, None
for table in src_dtable_metadata.get('tables', []):
if table['_id'] == src_table_id:
src_table = table
break
if not src_table:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source table not found.')
return

src_view = None
if src_view_id:
for view in src_table.get('views', []):
if view.get('_id') == src_view_id:
src_view = view
break
if not src_view:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source view not found.')
return
else:
views = src_table.get('views', [])
if not views or not isinstance(views, list):
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('No views found.')
return
src_view = views[0]
return None
for view in src_table.get('views', []):
if view['_id'] == src_view_id:
src_view = view
break
if not src_view:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Source view not found.')
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

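设置为无效的,日志应该是 warning。 (English: since this branch marks the dataset as invalid, the log level should be `warning` rather than `error`.)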

return None

# get src columns
src_view_hidden_columns = src_view.get('hidden_columns', [])
if not src_view_hidden_columns:
src_columns = src_table.get('columns', [])
else:
src_columns = [col for col in src_table.get('columns', []) if col.get('key') not in src_view_hidden_columns]
src_columns = [col for col in src_table.get('columns', []) if col not in src_view.get('hidden_columns', [])]

# request dst_dtable
url = dtable_server_url.strip('/') + '/dtables/' + dst_dtable_uuid + '?from=dtable_events'
try:
resp = requests.get(url, headers=dst_headers)
dst_dtable_json = resp.json()
except Exception as e:
logging.error('request dst dtable: %s error: %s', dst_dtable_uuid, e)
return
src_enable_archive = (src_dtable_metadata.get('settings') or {}).get('enable_archive', False)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个目前需要判断是否是归档数据吗? (English: does this currently need to check whether the source data is archived?)

src_version = src_dtable_metadata.get('version')

# check dst_table
dst_table = None
for table in dst_dtable_json.get('tables', []):
if table.get('_id') == dst_table_id:
dst_table = table
break
if not dst_table:
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.warning('Destination table: %s not found.' % dst_table_id)
return
if dst_table_id:
for table in dst_dtable_metadata.get('tables', []):
if table['_id'] == dst_table_id:
dst_table = table
break
if not dst_table:
set_common_dataset_invalid(dataset_id, db_session)
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
logging.error('Destination table not found.')
return None

return {
'dst_headers': dst_headers,
'src_headers': src_headers,
'src_table': src_table,
'src_view': src_view,
'src_table_name': src_table['name'],
'src_view_name': src_view['name'],
'src_view_type': src_view.get('type', 'table'),
'src_columns': src_columns,
'dst_columns': dst_table.get('columns'),
'dst_rows': dst_table.get('rows'),
'dst_table_name': dst_table.get('name'),
'src_version': src_dtable_json.get('version')
'src_enable_archive': src_enable_archive,
'src_version': src_version,
'dst_table_name': dst_table['name'] if dst_table else None,
'dst_columns': dst_table['columns'] if dst_table else None
}


Expand Down Expand Up @@ -207,61 +179,50 @@ def check_common_dataset(db_session):
if not assets:
continue

dst_headers = assets.get('dst_headers')
src_table = assets.get('src_table')
src_view = assets.get('src_view')
src_columns = assets.get('src_columns')
src_headers = assets.get('src_headers')
dst_columns = assets.get('dst_columns')
dst_rows = assets.get('dst_rows')
dst_table_name = assets.get('dst_table_name')
dtable_src_version = assets.get('src_version')

if dtable_src_version == last_src_version:
if assets.get('src_version') == last_src_version:
continue

try:
result = import_or_sync({
'dst_dtable_uuid': dst_dtable_uuid,
result = import_sync_CDS({
'src_dtable_uuid': src_dtable_uuid,
'src_rows': src_table.get('rows', []),
'src_columns': src_columns,
'src_table_name': src_table.get('name'),
'src_view_name': src_view.get('name'),
'src_headers': src_headers,
'dst_dtable_uuid': dst_dtable_uuid,
'src_table_name': assets.get('src_table_name'),
'src_view_name': assets.get('src_view_name'),
'src_columns': assets.get('src_columns'),
'src_version': assets.get('src_version'),
'dst_table_id': dst_table_id,
'dst_table_name': dst_table_name,
'dst_headers': dst_headers,
'dst_columns': dst_columns,
'dst_rows': dst_rows,
'lang': 'en' # TODO: lang
'dst_table_name': assets.get('dst_table_name'),
'dst_columns': assets.get('dst_columns'),
'operator': 'dtable-events',
'lang': 'en', # TODO: lang
'dataset_id': dataset_id
})
except Exception as e:
logger.error('sync common dataset error: %s', e)
logging.error('sync common dataset error: %s', e)
continue
else:
if result.get('error_msg'):
logger.error(result['error_msg'])
logging.error(result['error_msg'])
if result.get('error_type') == 'generate_synced_columns_error':
set_common_dataset_sync_invalid(dataset_sync_id, db_session)
continue

dataset_update_map[dataset_sync_id] = dtable_src_version
dataset_update_map[dataset_sync_id] = assets.get('src_version')
sync_count += 1

if sync_count == 1000:
try:
update_sync_time_and_version(db_session, dataset_update_map)
except Exception as e:
logger.error(f'update sync time and src_version failed, error: {e}')
logging.error(f'update sync time and src_version failed, error: {e}')
dataset_update_map = {}
sync_count = 0

if dataset_update_map:
try:
update_sync_time_and_version(db_session, dataset_update_map)
except Exception as e:
logger.error(f'update sync time and src_version failed, error: {e}')
logging.error(f'update sync time and src_version failed, error: {e}')


class CommonDatasetSyncerTimer(Thread):
Expand All @@ -279,7 +240,7 @@ def timed_job():
try:
check_common_dataset(db_session)
except Exception as e:
logger.exception('check periodcal common dataset syncs error: %s', e)
logging.exception('check periodcal common dataset syncs error: %s', e)
finally:
db_session.close()

Expand Down
Loading