diff --git a/src/WebSDL/settings/base.py b/src/WebSDL/settings/base.py index 3e02529e..21bae5b4 100644 --- a/src/WebSDL/settings/base.py +++ b/src/WebSDL/settings/base.py @@ -202,3 +202,5 @@ STATICFILES_STORAGE = 'django.contrib.staticfiles.storage.ManifestStaticFilesStorage' DEBUG = True if 'debug_mode' in data and data['debug_mode'] == "True" else False + +TIME_ZONE = "UTC" diff --git a/src/WebSDL/settings/settings.json b/src/WebSDL/settings/settings.json new file mode 100755 index 00000000..2f548239 --- /dev/null +++ b/src/WebSDL/settings/settings.json @@ -0,0 +1,40 @@ +{ + "secret_key": "{{django secret key}}", + "debug_mode": "{{True/False, False by default}}", + "static_root": "{{static files root directory}}", + "host": "{{server domain name}}", + "host_alt": ["{{any other server domain names}}", "{{...}}"], + "password_email_sender": "\"Password Recovery\" <{{password recovery email}}>", + "notify_email_sender": "\"Site Data Alert\" <{{data loss notification email}}>", + "email_host": "{{email server address}}", + "email_port": 1234, + "influx_query": "{{influx db server address}}/query?u={{influx db user}}&p={{influx db password}}&db=envirodiy&q=SELECT%20time,%20DataValue::field,%20UTCOffset::field%20FROM%20%22uuid_{result_uuid}%22%20WHERE%20%22time%22%20%3E=%20%27{last_measurement}%27-{days_of_data}d", + "influx_updater_query": {"url":"{{influx db server address}}/write?u={{influx db user}}&p={{influx db password}}&db=envirodiy&precision=s", "body": "uuid_{result_uuid} DataValue={data_value},UTCOffset={utc_offset}.0 {timestamp_s}"}, + "tsa_url": "{{time series analyst address}}", + "sensor_data_period": "{{days it takes for the data to be considered stale}}", + + "databases": [ + { + "name": "default", + "schema": "{{django default database name}}", + "engine": "django.db.backends.postgresql_psycopg2", + "user": "{{database user}}", + "password": "{{database password}}", + "host": "{{database server address}}", + "port": "5432" + } + ], + "hydroshare_oauth": { + "client_id": "{{hydroshare client id}}", + "client_secret": "{{hydroshare client secret}}", + "redirect_uri": "hydroshare/oauth/" + }, + "crontab_log_file": "{{log file for crontab jobs}}", + "crontab_user": "{{crontab user}}", + "google_api_conf": { + "api_key": "{{api key for leafpack taxon spreadsheet}} (AIzaSyD_uxlkaHEd9h6FK7ULSWxkLbJ4ovySfOI)", + "files": { + "taxon_spreadsheet": "{{leafpack taxon spreadsheet id}} (12CVnLB7XVkOFUf4sL3Nd_oawZCr4BhoxfjL2P8IpZww)" + } + } +} diff --git a/src/WebSDL/settings/settings_nix.py b/src/WebSDL/settings/settings_nix.py new file mode 100755 index 00000000..c736311a --- /dev/null +++ b/src/WebSDL/settings/settings_nix.py @@ -0,0 +1,45 @@ +from .linux_server import * +from os import environ + +DEBUG = True + +# We load the secret key from the environment so it does not end up in /nix/store. +SECRET_KEY=environ.get('SECRET_KEY') + +# The static root will be a path under /nix/store/ which we don't know yet. +STATIC_ROOT=environ.get('STATIC_ROOT') + +# Allowed hosts are provided via nix config +ALLOWED_HOSTS = list(environ.get('ALLOWED_HOSTS', default='').split(',')) + +### Postgres Database Connection +# We use a local (non TCP) DB connection by setting HOST to an empty string +# In this mode the user gets authenticated via the OS.
+# Only processes of a specific system user will be able to access the DB DATABASES = { 'default': { 'ENGINE': 'django.db.backends.postgresql', 'NAME': environ.get('DB_NAME'), 'HOST': '', 'PORT': 0, 'OPTIONS': { # ensure django can work with the ODM2 schema by adding that # to the schema search path 'options': '-c search_path=public,ODM2', } } } + +# We're using a python module to serve static files. Scared of it? +# Read here: http://whitenoise.evans.io/en/stable/index.html#infrequently-asked-questions +MIDDLEWARE += [ 'whitenoise.middleware.WhiteNoiseMiddleware' ] +STATICFILES_STORAGE = 'whitenoise.storage.CompressedStaticFilesStorage' + +# The default sqlalchemy cache dir is in the store, which can never be written to, +# so we put it in an instance-specific temporary directory. It is unclear how much +# the cache helps there, but this should be a relatively long-running process, +# so preparing the models once per instance is acceptable. +import tempfile + +_td = tempfile.TemporaryDirectory() +DATAMODELCACHE = _td.name + "/modelcache.pickle" diff --git a/src/dataloaderinterface/ajax.py b/src/dataloaderinterface/ajax.py index a5316342..24e33c2f 100644 --- a/src/dataloaderinterface/ajax.py +++ b/src/dataloaderinterface/ajax.py @@ -56,8 +56,8 @@ def get_sampling_feature_metadata(request_data:Dict[str,Any]) -> str: "LEFT JOIN odm2.units AS un ON un.unitsid = rs.unitsid " \ f"LEFT JOIN odm2.timeseriesresults AS tsr ON tsr.resultid = rs.resultid " \ f"LEFT JOIN odm2.units AS untrs ON untrs.unitsid = tsr.zlocationunitsid "\ - f"WHERE sf.samplingfeaturecode = '{sampling_feature_code}'; " - df = pd.read_sql(query, session.bind) + f"WHERE sf.samplingfeaturecode = %s;" + df = pd.read_sql(query, session.bind, params=[sampling_feature_code]) return df.to_json(orient='records', default_handler=str) def get_sampling_features(request_data:Dict[str,Any]) -> str: diff --git a/src/dataloaderinterface/forms.py b/src/dataloaderinterface/forms.py index b9c4b8d9..74849750 100644 --- a/src/dataloaderinterface/forms.py +++ b/src/dataloaderinterface/forms.py @@ -14,19 +14,9 @@ 'Storm sewer', 'Stream gage', 'Tidal stream', 'Water quality station', 'Weather station', 'Wetland', 'Other' ] -user_affiliations = [ - affiliation[0] - for affiliation - in get_user_model().objects.filter(affiliation_id__isnull=False).values_list('affiliation_id') -] - class SiteTypeSelect(forms.Select): - site_types = { - name: definition - for (name, definition) - in SiteType.objects.filter(name__in=allowed_site_types).values_list('name', 'definition') - } + site_types = None def create_option(self, name, value, label, selected, index, subindex=None, attrs=None): option = super(SiteTypeSelect, self).create_option(name, value, label, selected, index, subindex, attrs) @@ -34,7 +24,16 @@ def create_option(self, name, value, label, selected, index, subindex=None, attr #TECHDEPT - PRT flagging for likely place code will break in future updates of django if isinstance(value, forms.models.ModelChoiceIteratorValue): value = value.value - option['attrs']['title'] = self.site_types[value] if value in self.site_types else '' + + # lazily build the cache on first use; a concurrent rebuild is harmless + # because every thread computes the same dict + if SiteTypeSelect.site_types is None: + SiteTypeSelect.site_types = { + name: definition + for (name, definition) + in SiteType.objects.filter(name__in=allowed_site_types).values_list('name', 'definition') + } + + option['attrs']['title'] = SiteTypeSelect.site_types.get(value, '') return option @@ -53,7 +52,7 @@ def label_from_instance(self, obj): class SiteRegistrationForm(forms.ModelForm):
affiliation_id = forms.ModelChoiceField( - queryset=Affiliation.objects.filter(affiliation_id__in=(user_affiliations)).for_display(), + queryset=Affiliation.objects.filter(affiliation_id__in=(get_user_model().objects.filter(affiliation_id__isnull=False).values_list('affiliation_id', flat=True))).for_display(), required=False, help_text='Select the user that deployed or manages the site', label='Deployed By' diff --git a/src/dataloaderinterface/management/commands/update_controlled_vocabularies.py b/src/dataloaderinterface/management/commands/update_controlled_vocabularies.py index ef90afc3..1a94ab42 100644 --- a/src/dataloaderinterface/management/commands/update_controlled_vocabularies.py +++ b/src/dataloaderinterface/management/commands/update_controlled_vocabularies.py @@ -47,7 +47,7 @@ def handle(self, *args, **options): api_url = '/api/v1' request_uri = '%s%s/{cv}/?format=json' % (base_url, api_url) - for cv_name in vocabularies_map.iterkeys(): + for cv_name in vocabularies_map.keys(): vocabulary_model = vocabularies_map[cv_name] print('Getting %s vocabulary' % vocabulary_model._meta.verbose_name) @@ -66,6 +66,16 @@ def handle(self, *args, **options): print('- Nothing to add here.') continue + # remove duplicates to avoid the database insert failing due + # to a unique constraint violation + seen_names = set() + deduplicated_add = [] + for concept in response['objects']: + if concept['name'] not in seen_names: + seen_names.add(concept['name']) + deduplicated_add.append(concept) + response['objects'] = deduplicated_add + vocabulary_objects = [vocabulary_model( term=vocabulary['term'], name=vocabulary['name'], diff --git a/src/dataloaderinterface/templates/dataloaderinterface/base.html b/src/dataloaderinterface/templates/dataloaderinterface/base.html index 400e5573..768a076a 100644 --- a/src/dataloaderinterface/templates/dataloaderinterface/base.html +++ b/src/dataloaderinterface/templates/dataloaderinterface/base.html @@ -42,15 +42,6 @@ {% endblock %} - - - diff --git a/src/dataloaderservices/auth.py b/src/dataloaderservices/auth.py index efb102df..45fc4c99 100644 --- a/src/dataloaderservices/auth.py +++ b/src/dataloaderservices/auth.py @@ -1,6 +1,9 @@ from rest_framework import authentication from rest_framework import exceptions +from django.db.models.expressions import Subquery, OuterRef + +from dataloader.models import SamplingFeature from dataloaderinterface.models import SiteRegistration @@ -20,12 +23,17 @@ def authenticate(self, request): # verify sampling_feature uuid is registered by this user, # be happy.
token = request.META['HTTP_TOKEN'] - registration = SiteRegistration.objects.filter(registration_token=token).first() + registration = SiteRegistration.objects.filter(registration_token=token + ).annotate(sampling_feature_uuid=Subquery( + SamplingFeature.objects.filter( + pk=OuterRef("sampling_feature_id") + ).values("sampling_feature_uuid")[:1]) + ).values("sampling_feature_uuid").first() if not registration: raise exceptions.PermissionDenied('Invalid Security Token') # request needs to have the sampling feature uuid of the registration - - if str(registration.sampling_feature.sampling_feature_uuid) != request.data['sampling_feature']: + if str(registration["sampling_feature_uuid"]) != request.data['sampling_feature']: raise exceptions.AuthenticationFailed('Site Identifier is not associated with this Token') return None diff --git a/src/dataloaderservices/views.py b/src/dataloaderservices/views.py index c1dbd0e9..62b15db8 100644 --- a/src/dataloaderservices/views.py +++ b/src/dataloaderservices/views.py @@ -52,8 +52,6 @@ # TODO: Check user permissions to edit, add, or remove stuff with a permissions class. # TODO: Use generic api views for create, edit, delete, and list. -from concurrent.futures import ThreadPoolExecutor, as_completed - class ModelVariablesApi(APIView): authentication_classes = (SessionAuthentication, ) @@ -268,6 +266,7 @@ def post(self, request, *args, **kwargs): data_value_units = Unit.objects.get(unit_name='hour minute') sensors = registration.sensors.all() + result_values = [] warnings = [] for row in csv.reader(self.decode_utf8_sig(data_file)): if self.should_skip_row(row): @@ -292,9 +291,10 @@ def post(self, request, *args, **kwargs): continue data_value = row[results_mapping['results'][uuid]['index']] + if len(data_value) == 0: + continue result_value = TimeseriesResultValueTechDebt( result_id=sensor.result_id, - result_uuid=uuid, data_value=data_value, utc_offset=results_mapping['utc_offset'], value_datetime=measurement_datetime, @@ -303,12 +303,17 @@ def post(self, request, *args, **kwargs): time_aggregation_interval=1, time_aggregation_interval_unit=data_value_units.unit_id, ) - try: - result = insert_timeseries_result_values(result_value) - except Exception as e: - warnings.append(f"Error inserting value '{data_value}'"\ - f"at datetime '{measurement_datetime}' for result uuid '{uuid}'") - continue + result_values.append(result_value) + + if len(result_values) > 100000: + with _db_engine.begin() as connection: + insert_timeseries_result_values_bulk(result_values, connection) + result_values = [] + + if len(result_values) > 0: + with _db_engine.begin() as connection: + insert_timeseries_result_values_bulk(result_values, connection) + result_values = [] #block is responsible for keeping separate dataloader database metadata in sync #long term plan is to eliminate this, but need to keep for the now @@ -580,58 +585,72 @@ class TimeSeriesValuesApi(APIView): def post(self, request, format=None): if not all(key in request.data for key in ('timestamp', 'sampling_feature')): raise exceptions.ParseError("Required data not found in request.") - try: - measurement_datetime = parse_datetime(request.data['timestamp']) - except ValueError: - raise exceptions.ParseError('The timestamp value is not valid.') - if not measurement_datetime: - raise exceptions.ParseError('The timestamp value is not well formatted.') - if measurement_datetime.utcoffset() is None: - raise exceptions.ParseError('The timestamp value requires timezone information.') - utc_offset = 
int(measurement_datetime.utcoffset().total_seconds() / timedelta(hours=1).total_seconds()) - measurement_datetime = measurement_datetime.replace(tzinfo=None) - timedelta(hours=utc_offset) - - sampling_feature = SamplingFeature.objects.filter(sampling_feature_uuid__exact=request.data['sampling_feature']).first() + + sampling_feature = SamplingFeature.objects.filter(sampling_feature_uuid__exact=request.data.pop("sampling_feature")).first() if not sampling_feature: raise exceptions.ParseError('Sampling Feature code does not match any existing site.') - result_uuids = get_result_UUIDs(sampling_feature.sampling_feature_id) - if not result_uuids: - raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{request.data['sampling_feature']}'") - - #dataloader table related - try: - set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetime) - except Exception as e: - pass - - futures = {} unit_id = Unit.objects.get(unit_name='hour minute').unit_id - - with ThreadPoolExecutor(max_workers=8) as executor: - for key in request.data: + + measurement_datetimes = [] + timestamps = request.data.pop("timestamp") + if not isinstance(timestamps, list): + timestamps = [timestamps] + for timestamp in timestamps: + try: + measurement_datetime = parse_datetime(timestamp) + except ValueError: + raise exceptions.ParseError('The timestamp value is not valid.') + if not measurement_datetime: + raise exceptions.ParseError('The timestamp value is not well formatted.') + if measurement_datetime.utcoffset() is None: + raise exceptions.ParseError('The timestamp value requires timezone information.') + utc_offset = int(measurement_datetime.utcoffset().total_seconds() / timedelta(hours=1).total_seconds()) + measurement_datetime = measurement_datetime.replace(tzinfo=None) - timedelta(hours=utc_offset) + measurement_datetimes.append((measurement_datetime, utc_offset)) + + num_measurements = len(measurement_datetimes) + measurement_data = {k: v if isinstance(v, list) else [v] + for k, v in request.data.items()} + + if not all(len(m) == num_measurements for m in measurement_data.values()): + raise exceptions.ParseError("unequal number of data points") + + with _db_engine.begin() as connection: + result_uuids = get_result_UUIDs(sampling_feature.sampling_feature_id, connection) + if not result_uuids: + raise exceptions.ParseError(f"No results_uuids matched to sampling_feature '{sampling_feature.sampling_feature_uuid}'") + + result_values = [] # values for all times + latest_values = [] # values for only the latest time (i.e. 
last) + for key, values in measurement_data.items(): try: result_id = result_uuids[key] except KeyError: continue - - result_value = TimeseriesResultValueTechDebt( - result_id=result_id, - result_uuid=key, - data_value=request.data[str(key)], - value_datetime=measurement_datetime, - utc_offset=utc_offset, - censor_code='Not censored', - quality_code='None', - time_aggregation_interval=1, - time_aggregation_interval_unit=unit_id) - futures[executor.submit(process_result_value, result_value)] = None - - errors = [] - for future in as_completed(futures): - if future.result() is not None: errors.append(future.result()) + + for vi, value in enumerate(values): + result_values.append(TimeseriesResultValueTechDebt( + result_id=result_id, + data_value=value, + value_datetime=measurement_datetimes[vi][0], + utc_offset=measurement_datetimes[vi][1], + censor_code='Not censored', + quality_code='None', + time_aggregation_interval=1, + time_aggregation_interval_unit=unit_id + )) + # nab latest value as that was the one we (by assumption) + # generated last in the loop + latest_values.append(result_values[-1]) + + # assume the last measurement datetime is the latest + set_deployment_date(sampling_feature.sampling_feature_id, measurement_datetimes[-1][0], connection) + + insert_timeseries_result_values(result_values, connection) + update_sensormeasurements(latest_values, connection) + sync_result_table(latest_values, num_measurements, connection) - if errors: return Response(errors, status=status.HTTP_500_INTERNAL_SERVER_ERROR) return Response({}, status.HTTP_201_CREATED) ####################################################### @@ -640,24 +659,19 @@ def post(self, request, format=None): #PRT - the code in this block is meant as a hot fix to address poor model performance #the long term goal is to refactor the application models to make them more performant. 
-def get_result_UUIDs(sampling_feature_id:str) -> Union[Dict[str, str],None]: +def get_result_UUIDs(sampling_feature_id:str, connection) -> Union[Dict[str, str],None]: try: - with _db_engine.connect() as connection: - query = text("SELECT r.resultid, r.resultuuid FROM odm2.results AS r " \ - "JOIN odm2.featureactions AS fa ON r.featureactionid = fa.featureactionid "\ - "WHERE fa.samplingfeatureid = ':sampling_feature_id';") - df = pd.read_sql(query, connection, params={'sampling_feature_id': sampling_feature_id}) - df['resultuuid'] = df['resultuuid'].astype(str) - df = df.set_index('resultuuid') - results = df['resultid'].to_dict() - return results + query = text("SELECT r.resultuuid, r.resultid FROM odm2.results AS r " \ + "JOIN odm2.featureactions AS fa ON r.featureactionid = fa.featureactionid "\ + "WHERE fa.samplingfeatureid = ':sampling_feature_id';") + result = connection.execute(query, sampling_feature_id=sampling_feature_id) + return {str(ruuid): rid for ruuid, rid in result} except: return None class TimeseriesResultValueTechDebt(): def __init__(self, result_id:str, - result_uuid:str, data_value:float, value_datetime:datetime, utc_offset:int, @@ -666,7 +680,6 @@ def __init__(self, time_aggregation_interval:int, time_aggregation_interval_unit:int) -> None: self.result_id = result_id - self.result_uuid = result_uuid self.data_value = data_value self.utc_offset = utc_offset self.value_datetime = value_datetime @@ -675,114 +688,97 @@ def __init__(self, self.time_aggregation_interval = time_aggregation_interval self.time_aggregation_interval_unit = time_aggregation_interval_unit -def process_result_value(result_value:TimeseriesResultValueTechDebt) -> Union[str,None]: - result = insert_timeseries_result_values(result_value) - if result is not None: - return result - # PRT - long term we would like to remove dataloader database but for now - # this block of code keeps dataloaderinterface_sensormeasurement table in sync - try: - query_result = sync_dataloader_tables(result_value) - query_result = sync_result_table(result_value) - return None - except Exception as e: - return None - #dataloader utility function -def get_site_sensor(resultid:str) -> Union[Dict[str, Any],None]: - with _db_engine.connect() as connection: - query = text('SELECT * FROM public.dataloaderinterface_sitesensor ' \ - 'WHERE "ResultID"=:resultid;' - ) - df = pd.read_sql(query, connection, params={'resultid':resultid}) - return df.to_dict(orient='records')[0] - -#dataloader utility function -def check_sensormeasurement(sensor_id:str, result_value:TimeseriesResultValueTechDebt) -> None: - with _db_engine.connect() as connection: - query = text('SELECT COUNT(sensor_id) FROM public.dataloaderinterface_sensormeasurement ' \ - 'WHERE sensor_id=:sensor_id; ') - result = connection.execute(query, - sensor_id=sensor_id - ) - for r in result: - if r[0] == 1: return None - query = text('INSERT INTO public.dataloaderinterface_sensormeasurement ' \ - "VALUES (:sensor_id, :datetime,:utc_offset,:data_value); ") - result = connection.execute(query, - sensor_id=sensor_id, - datetime=result_value.value_datetime, - utc_offset=timedelta(hours=result_value.utc_offset), - data_value=result_value.data_value - ) +def update_sensormeasurements(result_values:TimeseriesResultValueTechDebt, connection) -> None: + query = text('INSERT INTO public.dataloaderinterface_sensormeasurement as m ' + 'VALUES ((select id from public.dataloaderinterface_sitesensor ' + 'where "ResultID" = :result_id limit 1), :datetime, :utc_offset, :data_value) ' + 'on 
conflict (sensor_id) do update set ' + 'value_datetime = excluded.value_datetime, ' + 'value_datetime_utc_offset = excluded.value_datetime_utc_offset, ' + 'data_value = excluded.data_value ' + 'where m.value_datetime < excluded.value_datetime;') + result = connection.execute(query, + [{ + "result_id": result_value.result_id, + "datetime": result_value.value_datetime, + "utc_offset": timedelta(hours=result_value.utc_offset), + "data_value": result_value.data_value, + } for result_value in result_values] + ) #dataloader utility function -def sync_dataloader_tables(result_value: TimeseriesResultValueTechDebt) -> None: - site_sensor = get_site_sensor(result_value.result_id) - if not site_sensor: return None - result = check_sensormeasurement(site_sensor['id'], result_value) +def set_deployment_date(sample_feature_id:int, date_time:datetime, connection) -> None: + query = text('UPDATE public.dataloaderinterface_siteregistration '\ + 'SET "DeploymentDate"=:date_time '\ + 'WHERE "DeploymentDate" IS NULL AND ' \ + '"SamplingFeatureID"=:sample_feature_id' ) + result = connection.execute(query, + sample_feature_id=sample_feature_id, + date_time=date_time + ) return None -#dataloader utility function -def set_deployment_date(sample_feature_id:int, date_time:datetime) -> None: - with _db_engine.connect() as connection: - query = text('UPDATE public.dataloaderinterface_siteregistration '\ - 'SET "DeploymentDate"=:date_time '\ - 'WHERE "DeploymentDate" IS NULL AND ' \ - '"SamplingFeatureID"=:sample_feature_id' ) - result = connection.execute(query, - sample_feature_id=sample_feature_id, - date_time=date_time - ) - return None - -def sync_result_table(result_value: TimeseriesResultValueTechDebt) -> None: - with _db_engine.connect() as connection: - query = text("UPDATE odm2.results SET valuecount = valuecount + 1, " \ - "resultdatetime = GREATEST(:result_datetime, resultdatetime)" \ - "WHERE resultid=:result_id; ") - result = connection.execute(query, - result_id=result_value.result_id, - result_datetime=result_value.value_datetime, - ) - return result - -def insert_timeseries_result_values(result_value : TimeseriesResultValueTechDebt) -> None: +def sync_result_table(result_values: TimeseriesResultValueTechDebt, num_measurements, connection) -> None: + query = text("UPDATE odm2.results SET valuecount = valuecount + :num_measurements, " \ + "resultdatetime = GREATEST(:result_datetime, resultdatetime)" \ + "WHERE resultid=:result_id; ") + result = connection.execute(query, + [{ + "result_id": result_value.result_id, + "result_datetime": result_value.value_datetime, + "num_measurements": num_measurements, + } for result_value in result_values] + ) + return result + +def insert_timeseries_result_values(result_values : TimeseriesResultValueTechDebt, connection=None) -> None: try: - with _db_engine.connect() as connection: - query = text("INSERT INTO odm2.timeseriesresultvalues " \ - "(valueid, resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ - "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ - "VALUES ( " \ - "(SELECT nextval('odm2.\"timeseriesresultvalues_valueid_seq\"'))," \ - ":result_id, " \ - ":data_value, " \ - ":value_datetime, " \ - ":utc_offset, " \ - ":censor_code, " \ - ":quality_code, " \ - ":time_aggregation_interval, " \ - ":time_aggregation_interval_unit);") - result = connection.execute(query, - result_id=result_value.result_id, - data_value=result_value.data_value, - value_datetime=result_value.value_datetime, - 
utc_offset=result_value.utc_offset, - censor_code=result_value.censor_code, - quality_code=result_value.quality_code, - time_aggregation_interval=result_value.time_aggregation_interval, - time_aggregation_interval_unit=result_value.time_aggregation_interval_unit, - ) - return None - except sqlalchemy.exc.IntegrityError as e: - if hasattr(e, 'orig'): - if isinstance(e.orig, psycopg2.errors.UniqueViolation): - #data is already in database - return None - else: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") - else: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") + query = text("INSERT INTO odm2.timeseriesresultvalues " \ + "(resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " \ + "censorcodecv, qualitycodecv, timeaggregationinterval, timeaggregationintervalunitsid) " \ + "VALUES ( " \ + ":result_id, " \ + ":data_value, " \ + ":value_datetime, " \ + ":utc_offset, " \ + ":censor_code, " \ + ":quality_code, " \ + ":time_aggregation_interval, " \ + ":time_aggregation_interval_unit) on conflict do nothing;") + result = connection.execute(query, + [vars(v) for v in result_values] + ) + return None except Exception as e: - return (f"Failed to INSERT data for uuid('{result_value.result_uuid}')") \ No newline at end of file + return f"Failed to INSERT {len(result_values)} timeseries result value(s): {e}" + +def insert_timeseries_result_values_bulk(result_values, connection): + connection.execute(text("create temporary table upload " + "(resultid bigint NOT NULL," + "datavalue double precision NOT NULL," + "valuedatetime timestamp NOT NULL," + "valuedatetimeutcoffset integer NOT NULL," + "censorcodecv varchar (255) NOT NULL," + "qualitycodecv varchar (255) NOT NULL," + "timeaggregationinterval double precision NOT NULL," + "timeaggregationintervalunitsid integer NOT NULL) on commit drop;")) + + result_data = StringIO() + for rv in result_values: + result_data.write(f"{rv.result_id}\t{rv.data_value}\t" + f"{rv.value_datetime}\t{rv.utc_offset}\t{rv.censor_code}\t" + f"{rv.quality_code}\t{rv.time_aggregation_interval}\t" + f"{rv.time_aggregation_interval_unit}\n") + result_data.seek(0) + + cursor = connection.connection.cursor() + cursor.copy_from(result_data, "upload") + cursor.close() + + connection.execute(text("insert into odm2.timeseriesresultvalues " + "(resultid, datavalue, valuedatetime, valuedatetimeutcoffset, " + "censorcodecv, qualitycodecv, timeaggregationinterval, " + "timeaggregationintervalunitsid) " + "(select * from upload) on conflict do nothing;")) diff --git a/src/odm2/base.py b/src/odm2/base.py index 57a83106..f46cd60d 100644 --- a/src/odm2/base.py +++ b/src/odm2/base.py @@ -230,7 +230,10 @@ def __init__(self, engine:sqlalchemy.engine, schema:str='odm2', cache_path:str=N self._model_base = self._prepare_model_base() self.models = Models(self._model_base) if not self._cached: - self._prepare_automap_models() + try: + self._prepare_automap_models() + except sqlalchemy.exc.OperationalError: + warnings.warn('Unable to prepare models, is database up?', RuntimeWarning) def _prepare_model_base(self): try: diff --git a/src/streamwatch/forms.py b/src/streamwatch/forms.py index 0ae7eefb..22d8e104 100644 --- a/src/streamwatch/forms.py +++ b/src/streamwatch/forms.py @@ -16,11 +16,6 @@ (3, 'Choice #3') ) -user_affiliations = [ - affiliation[0] - for affiliation - in get_user_model().objects.filter(affiliation_id__isnull=False).values_list('affiliation_id') -] measurement_method_choices = (('Meter','Meter'), ('Lamotte', 'Lamotte')) class 
MDLCheckboxSelectMultiple(forms.CheckboxSelectMultiple): @@ -44,13 +39,13 @@ class SetupForm(forms.Form): investigator1 = forms.ModelChoiceField( - queryset=Affiliation.objects.filter(affiliation_id__in=(user_affiliations)).for_display(), + queryset=Affiliation.objects.filter(affiliation_id__in=(get_user_model().objects.filter(affiliation_id__isnull=False).values_list('affiliation_id', flat=True))).for_display(), required=True, help_text='Select a user as the main investigator', label='Investigator #1' ) investigator2 = forms.ModelChoiceField( - queryset=Affiliation.objects.filter(affiliation_id__in=(user_affiliations)).for_display(), + queryset=Affiliation.objects.filter(affiliation_id__in=(get_user_model().objects.filter(affiliation_id__isnull=False).values_list('affiliation_id', flat=True))).for_display(), required=False, help_text='Select a user as the secondary investigator', label='Investigator #2'
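
Reviewer note (not part of the patch): the reworked `TimeSeriesValuesApi.post` appears to accept either a single timestamp with scalar values or aligned lists of timestamps and values keyed by result UUID, and writes them in a single transaction. A minimal sketch of a batched request under that assumption — the endpoint URL, token, and UUIDs below are hypothetical:

```python
# Hedged sketch of a batched POST to TimeSeriesValuesApi (values are invented).
# The TOKEN header corresponds to the HTTP_TOKEN lookup in dataloaderservices/auth.py.
import requests

payload = {
    "sampling_feature": "7c2a1b1e-0000-0000-0000-000000000001",   # hypothetical site UUID
    # one entry per row; each timestamp must carry a UTC offset
    "timestamp": ["2023-05-01T10:00:00-07:00", "2023-05-01T10:05:00-07:00"],
    # remaining keys are result UUIDs; list lengths must match len(timestamp)
    "11111111-2222-3333-4444-555555555555": [21.4, 21.6],
    "66666666-7777-8888-9999-000000000000": [7.01, 7.03],
}

resp = requests.post(
    "https://data.example.org/api/timeseries-values/",  # hypothetical URL
    json=payload,
    headers={"TOKEN": "site-registration-token"},
)
resp.raise_for_status()  # expect 201 Created on success
```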
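Reviewer note (not part of the patch): `insert_timeseries_result_values_bulk` stages rows in a transaction-scoped temporary table via psycopg2 `COPY` and then merges them with `INSERT ... ON CONFLICT DO NOTHING`. A self-contained sketch of that pattern, assuming a hypothetical target table and connection string (the real code targets `odm2.timeseriesresultvalues`):

```python
# Hedged, standalone illustration of the temp-table + COPY + ON CONFLICT merge.
from io import StringIO

import sqlalchemy
from sqlalchemy import text

engine = sqlalchemy.create_engine("postgresql+psycopg2://user:pass@localhost/demo")  # hypothetical DSN
rows = [(1, 21.4, "2023-05-01 17:00:00", -7), (1, 21.6, "2023-05-01 17:05:00", -7)]

with engine.begin() as connection:
    # the temporary table lives only for this transaction
    connection.execute(text(
        "CREATE TEMPORARY TABLE upload (resultid bigint, datavalue double precision, "
        "valuedatetime timestamp, valuedatetimeutcoffset integer) ON COMMIT DROP;"))

    # serialize rows as tab-separated text for COPY
    buf = StringIO()
    for r in rows:
        buf.write("\t".join(str(c) for c in r) + "\n")
    buf.seek(0)

    # drop to the raw psycopg2 cursor for the COPY protocol
    cursor = connection.connection.cursor()
    cursor.copy_from(buf, "upload")
    cursor.close()

    # merge into the real table, skipping rows that already exist
    connection.execute(text(
        "INSERT INTO target_values (resultid, datavalue, valuedatetime, valuedatetimeutcoffset) "
        "SELECT * FROM upload ON CONFLICT DO NOTHING;"))  # target_values is hypothetical
```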