diff --git a/CHANGELOG.md b/CHANGELOG.md
index 11c18862..d4a3e7bc 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -13,23 +13,57 @@ Types of changes:
 - `Fixed`: for any bug fixes.
 - `Security`: in case of vulnerabilities.
 
+## [1.11.0]
+
+### Added
+
+- Added three new fields to the Host model (`object_id`, `catalog_name`, `catalog_release`) to capture the
+  source catalog information provided by the host matcher.
+- Added support for input arguments to custom functions invoked by the `blast_admin` Django custom
+  management command. See the `blast_admin.py` module docstring for details.
+
+### Changed
+
+- The `HostMatch._run_process()` function was altered to treat any unhandled exception as a failure with a
+  corresponding "failed" status message. Previously, an unhandled exception would result in a misleading
+  "no host match" status.
+- Additional logic was added to the host matching task to avoid creating duplicate `Host` objects: the host
+  information returned by Prost is compared against existing `Host` objects by cone search and by catalog information.
+- Updated the host information card displayed on result pages to include the new catalog information.
+- Removed a redundant call to `initialize_all_tasks_status()` in `reprocess_transient()`.
+
+### Removed
+
+- Removed the obsolete "Initialize transient task" periodic task. Workflows for new transients ingested
+  from TNS are now triggered upon discovery. Thus, all pathways for adding new transients now automatically
+  initialize and trigger workflows immediately, eliminating the need for this periodic task.
+- Removed the unused `TransientImportForm`.
+- Removed the unused `CatalogManager`.
+
+### Fixed
+
+- Fixed a bug in the dataset archive importer to support importing transients that lack host information.
+
 ## [1.10.0]
 
 ### Deprecated
 
-- The `log_age` columns (`log_age_16`, `log_age_50`, `log_age_84`) in the `SEDFittingResult` model are deprecated. They have been replaced by
-  corresponding `age` columns and will be removed in a future release.
+- The `log_age` columns (`log_age_16`, `log_age_50`, `log_age_84`) in the `SEDFittingResult` model are
+  deprecated. They have been replaced by corresponding `age` columns and will be removed in a future release.
 
 ### Changed
 
-- Updated documentation to reflect how the `log_age` columns in the `SEDFittingResult` model will be deprecated and replaced by the `age` columns.
+- Updated documentation to reflect how the `log_age` columns in the `SEDFittingResult` model will be
+  deprecated and replaced by the `age` columns.
 - Updated units in documentation for above columns from log years to gigayears.
 - Prospector results will feed the age info to the `age` columns as well.
-- Updated archive file import algorithm to populate the `age` values from `log_age` values if the `age` keys are missing.
+- Updated archive file import algorithm to populate the `age` values from `log_age` values if the `age`
+  keys are missing.
 
 ### Fixed
 
-- Added `age` columns to the `SEDFittingResult` model to accurately reflect how the age is in gigayears and not in `log_10` years as implied by the `log_age` columns.
+- Added `age` columns to the `SEDFittingResult` model to accurately reflect how the age is in gigayears
+  and not in `log_10` years as implied by the `log_age` columns.
## [1.9.5] diff --git a/app/app/settings.py b/app/app/settings.py index 92719372..50d9e12f 100644 --- a/app/app/settings.py +++ b/app/app/settings.py @@ -4,7 +4,7 @@ ###################################################################### # Blast application config # -APP_VERSION = '1.10.0' +APP_VERSION = '1.11.0' # Data paths DUSTMAPS_DATA_ROOT = os.environ.get("DUSTMAPS_DATA_ROOT", "/data/dustmaps") CUTOUT_ROOT = os.environ.get("CUTOUT_ROOT", "/data/cutout_cdn") diff --git a/app/host/debug_utils.py b/app/host/debug_utils.py index 84f09df7..89e1e0b9 100644 --- a/app/host/debug_utils.py +++ b/app/host/debug_utils.py @@ -56,7 +56,6 @@ def rerun_failed_task(task_register): GlobalHostSEDFitting(task_register.transient.name), LocalHostSEDFitting(task_register.transient.name), TNSDataIngestion(), - InitializeTransientTasks(), IngestMissedTNSTransients(), ] diff --git a/app/host/forms.py b/app/host/forms.py index ecf5cb65..9d10600b 100644 --- a/app/host/forms.py +++ b/app/host/forms.py @@ -30,10 +30,6 @@ def __init__(self, *args, **kwargs): ) -class TransientImportForm(forms.Form): - file = forms.FileField() - - class TransientUploadForm(forms.Form): tns_names = forms.CharField( widget=forms.Textarea(attrs={ diff --git a/app/host/host_utils.py b/app/host/host_utils.py index ea08a414..0f57564f 100644 --- a/app/host/host_utils.py +++ b/app/host/host_utils.py @@ -1030,48 +1030,49 @@ def process_transient_dataset(dataset): # TODO: How concerned should we be about duplicates? Should we perform a cone search instead of assuming # perfect coordinate matching? Should the redshift values be updated from the imported data if they are # missing? - host_name = dataset['host']['fields']['name'] - ra_deg = dataset['host']['fields']['ra_deg'] - dec_deg = dataset['host']['fields']['dec_deg'] - cone_search = (Q(ra_deg__gte=ra_deg - ARCSEC_RA_IN_DEG) - & Q(ra_deg__lte=ra_deg + ARCSEC_RA_IN_DEG) - & Q(dec_deg__gte=dec_deg - ARCSEC_DEC_IN_DEG) - & Q(dec_deg__lte=dec_deg + ARCSEC_DEC_IN_DEG)) - proximate_hosts = Host.objects.filter(cone_search) - if proximate_hosts: - logger.info(f'''{len(proximate_hosts)} existing hosts were found within an arcsecond of ''' - f'''importing host "{host_name}".''') host = None - # If there is an existing proximate host for an unnamed host, claim this is the same host - if not host_name and proximate_hosts: - host = proximate_hosts[0] - elif host_name: - # Find existing hosts with the same name - host_search = Host.objects.filter(name__exact=host_name) - if host_search: - # If the host name matches, require that the position overlaps - proximity_search = host_search.filter(cone_search) - # Consider the import a failure if there is an inconsistent host definition - if not proximity_search: - record_import_error(transient_name, - f'[{transient_name}] Host with matching name "{host_name}" ' - f'exists, but it is in a different location.') - return - # If the name and location match, claim this is the same host - host = proximity_search[0] - # If no host match was found, create a new Host object - if not host: - host = Host.objects.create( - ra_deg=dataset['host']['fields']['ra_deg'], - dec_deg=dataset['host']['fields']['dec_deg'], - name=dataset['host']['fields']['name'], - redshift=dataset['host']['fields']['redshift'], - redshift_err=dataset['host']['fields']['redshift_err'], - photometric_redshift=dataset['host']['fields']['photometric_redshift'], - photometric_redshift_err=dataset['host']['fields']['photometric_redshift_err'], - 
milkyway_dust_reddening=dataset['host']['fields']['milkyway_dust_reddening'], - software_version=dataset['host']['fields']['software_version'], - ) + if dataset['host']: + host_name = dataset['host']['fields']['name'] + ra_deg = dataset['host']['fields']['ra_deg'] + dec_deg = dataset['host']['fields']['dec_deg'] + cone_search = (Q(ra_deg__gte=ra_deg - ARCSEC_RA_IN_DEG) + & Q(ra_deg__lte=ra_deg + ARCSEC_RA_IN_DEG) + & Q(dec_deg__gte=dec_deg - ARCSEC_DEC_IN_DEG) + & Q(dec_deg__lte=dec_deg + ARCSEC_DEC_IN_DEG)) + proximate_hosts = Host.objects.filter(cone_search) + if proximate_hosts: + logger.info(f'''{len(proximate_hosts)} existing hosts were found within an arcsecond of ''' + f'''importing host "{host_name}".''') + # If there is an existing proximate host for an unnamed host, claim this is the same host + if not host_name and proximate_hosts: + host = proximate_hosts[0] + elif host_name: + # Find existing hosts with the same name + host_search = Host.objects.filter(name__exact=host_name) + if host_search: + # If the host name matches, require that the position overlaps + proximity_search = host_search.filter(cone_search) + # Consider the import a failure if there is an inconsistent host definition + if not proximity_search: + record_import_error(transient_name, + f'[{transient_name}] Host with matching name "{host_name}" ' + f'exists, but it is in a different location.') + return + # If the name and location match, claim this is the same host + host = proximity_search[0] + # If no host match was found, create a new Host object + if not host: + host = Host.objects.create( + ra_deg=dataset['host']['fields']['ra_deg'], + dec_deg=dataset['host']['fields']['dec_deg'], + name=dataset['host']['fields']['name'], + redshift=dataset['host']['fields']['redshift'], + redshift_err=dataset['host']['fields']['redshift_err'], + photometric_redshift=dataset['host']['fields']['photometric_redshift'], + photometric_redshift_err=dataset['host']['fields']['photometric_redshift_err'], + milkyway_dust_reddening=dataset['host']['fields']['milkyway_dust_reddening'], + software_version=dataset['host']['fields']['software_version'], + ) # Verify that the Cutout objects do not exist (by name). for cutout in dataset['cutouts']: cutout_name = cutout['fields']['name'] @@ -1268,7 +1269,7 @@ def process_transient_dataset(dataset): tr_obj.save() # Calculate workflow progress and mark tasks as initialized so retriggering works. transient.progress, transient.processing_status = get_processing_status_and_progress(transient) - transient.tasks_initialized = True + transient.tasks_initialized = "True" transient.save() # Record successful database import imported_transient_names.append(transient.name) diff --git a/app/host/management/commands/blast_admin.py b/app/host/management/commands/blast_admin.py index 6ebda907..a5f58df1 100644 --- a/app/host/management/commands/blast_admin.py +++ b/app/host/management/commands/blast_admin.py @@ -5,12 +5,15 @@ The usage syntax is as follows: - python manage.py blast_admin [func_name] + python manage.py blast_admin [func_name] --input_args '{"arg1": "val1", "arg2": "val2"}' where "func_name()" is defined in either "util.py" (public code) or "local_util.py" -(local scratch ignored by Git). +(local scratch ignored by Git), and --input_args is a JSON-formatted string containing either +a list of scalar values to pass as positional arguments to "func_name" or a dictionary to be +passed as keyword arguments. 
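+
+For example, assuming a custom function "my_func" (an illustrative name) defined in
+"local_util.py":
+
+    python manage.py blast_admin my_func --input_args '["2022abc", 2]'
+    python manage.py blast_admin my_func --input_args '{"name": "2022abc", "dry_run": true}'
+
+The first invocation is equivalent to calling my_func("2022abc", 2); the second is
+equivalent to my_func(name="2022abc", dry_run=True).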
""" -from django.core.management.base import BaseCommand +from django.core.management.base import BaseCommand, CommandError +import json from host.log import get_logger logger = get_logger(__name__) from .util import * # noqa: F401,F403 @@ -25,13 +28,35 @@ class Command(BaseCommand): help = "Run scratch function" def add_arguments(self, parser): - parser.add_argument("func_name", type=str) + parser.add_argument('func_name', type=str, + help="Fully-qualified function name to call, e.g. 'myapp.utils.process_item'") + parser.add_argument('--input_args', type=str, default='[]', + help=( + "JSON string representing the function arguments. " + "Use a list for positional args or an object for keyword args. " + )) def handle(self, *args, **options): + func = options['func_name'] + + # Parse the JSON argument payload. + # Accept either a list (positional args) or dict (keyword args). + raw_args = options['input_args'] try: - eval(f'''{options['func_name']}()''') - except NameError as err: - logger.error(err) - if not scratch_module_exists: - logger.error('''Custom functions must be defined in a local ''' - '''"app/host/management/commands/local_util.py" module.''') + parsed = json.loads(raw_args) + except json.JSONDecodeError as err: + raise CommandError(f"Could not parse JSON args: {err}") + + if isinstance(parsed, list): + call_args = parsed + call_kwargs = {} + elif isinstance(parsed, dict): + # Decide: if keys are numeric-string and user intended positional args, they should pass a list. + call_args = [] + call_kwargs = parsed + else: + # Single scalar -> pass as single positional arg + call_args = [parsed] + call_kwargs = {} + + eval(f'''{func}(*{call_args}, **{call_kwargs})''') diff --git a/app/host/managers.py b/app/host/managers.py index a4978169..70c2e762 100644 --- a/app/host/managers.py +++ b/app/host/managers.py @@ -8,7 +8,6 @@ logger = get_logger(__name__) - class TransientManager(models.Manager): def get_by_natural_key(self, name): return self.get(name=name) @@ -29,11 +28,6 @@ def get_by_natural_key(self, name): return self.get(name=name) -class CatalogManager(models.Manager): - def get_by_natural_key(self, name): - return self.get(name=name) - - class FilterManager(models.Manager): def get_by_natural_key(self, name): return self.get(name=name) diff --git a/app/host/migrations/0049_host_info.py b/app/host/migrations/0049_host_info.py new file mode 100644 index 00000000..cb96490e --- /dev/null +++ b/app/host/migrations/0049_host_info.py @@ -0,0 +1,42 @@ +# Generated by Django 5.1.14 on 2026-04-29 18:17 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('host', '0048_sedfittingresult_age_16_sedfittingresult_age_50_and_more'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AddField( + model_name='host', + name='catalog_name', + field=models.CharField(max_length=100, null=True), + ), + migrations.AddField( + model_name='host', + name='catalog_release', + field=models.CharField(max_length=100, null=True), + ), + migrations.AddField( + model_name='host', + name='object_id', + field=models.CharField(blank=True, max_length=100, null=True), + ), + migrations.AlterField( + model_name='transient', + name='added_by', + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, + to=settings.AUTH_USER_MODEL), + ), + migrations.AlterField( + model_name='transient', + name='host', + 
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, to='host.host') + ), + ] diff --git a/app/host/models.py b/app/host/models.py index 193f3070..3071bd5d 100644 --- a/app/host/models.py +++ b/app/host/models.py @@ -16,7 +16,6 @@ import re from .managers import ApertureManager -from .managers import CatalogManager from .managers import CutoutManager from .managers import FilterManager from .managers import HostManager @@ -92,6 +91,9 @@ class Host(SkyObject): photometric_redshift = models.FloatField(null=True, blank=True) photometric_redshift_err = models.FloatField(null=True, blank=True) milkyway_dust_reddening = models.FloatField(null=True, blank=True) + object_id = models.CharField(max_length=100, blank=True, null=True) + catalog_name = models.CharField(max_length=100, blank=False, null=True) + catalog_release = models.CharField(max_length=100, blank=False, null=True) objects = HostManager() software_version = models.CharField(max_length=50, blank=True, null=True) @@ -138,16 +140,17 @@ def validate_name(name): raise ValidationError(f'''Invalid transient identifier: "{name}" must begin and end with alphanumeric ''' '''characters, and may include underscores and hyphens. ''' '''Spaces are not allowed.''') - if name.find('--') > -1 or name.find('__') > -1: - raise ValidationError(f'''Invalid transient identifier: "{name}" may not contain consecutive ''' - '''underscores or hyphens.''') + for nonconsecutive_char in '-_': + if name.find(nonconsecutive_char * 2) > -1: + raise ValidationError(f'''Invalid transient identifier: "{name}" may not contain consecutive ''' + f'''"{nonconsecutive_char}" characters.''') name = models.CharField(max_length=64, unique=True, validators=[validate_name]) display_name = models.CharField(null=True, blank=True) tns_id = models.IntegerField() tns_prefix = models.CharField(max_length=20) public_timestamp = models.DateTimeField(null=True, blank=True) - host = models.ForeignKey(Host, on_delete=models.CASCADE, null=True, blank=True) + host = models.ForeignKey(Host, on_delete=models.SET_NULL, null=True, blank=True) objects = TransientManager() tasks_initialized = models.CharField(max_length=20, default="False") redshift = models.FloatField(null=True, blank=True) @@ -155,7 +158,7 @@ def validate_name(name): photometric_class = models.CharField(max_length=20, null=True, blank=True) milkyway_dust_reddening = models.FloatField(null=True, blank=True) processing_status = models.CharField(max_length=20, default="processing") - added_by = models.ForeignKey(User, null=True, blank=True, on_delete=models.CASCADE) + added_by = models.ForeignKey(User, null=True, blank=True, on_delete=models.SET_NULL) progress = models.IntegerField(default=0) software_version = models.CharField(max_length=50, blank=True, null=True) diff --git a/app/host/prost.py b/app/host/prost.py index 9a5c8a0c..b979faee 100644 --- a/app/host/prost.py +++ b/app/host/prost.py @@ -1,107 +1,194 @@ import os from shutil import rmtree from django.conf import settings +from django.db.models import Q from .models import Host from astropy.coordinates import SkyCoord +from astropy import units as u # Prost dependencies import pandas as pd from scipy.stats import gamma, halfnorm, uniform # from astropy.cosmology import LambdaCDM from astro_prost.helpers import SnRateAbsmag from astro_prost.associate import associate_sample +from host.host_utils import ARCSEC_DEC_IN_DEG, ARCSEC_RA_IN_DEG +from host.log import get_logger +logger = get_logger(__name__) -def run_prost(transient, 
output_dir_root=settings.PROST_OUTPUT_ROOT): +def update_redshifts(host, host_redshift_info, host_redshift_mean, host_redshift_std, catalog_name): + """Update redshift null values only; do not change existing values""" + if host_redshift_info == 'SPEC': + host.redshift = host.redshift if host.redshift else host_redshift_mean + host.redshift_err = host.redshift_err if host.redshift_err else host_redshift_std + elif host_redshift_info == 'PHOT' and catalog_name != 'panstarrs': + host.photometric_redshift = host.photometric_redshift if host.photometric_redshift else host_redshift_mean + host.photometric_redshift_err = host.photometric_redshift_err if host.photometric_redshift_err else host_redshift_std # noqa: E501 + return host + + +def name_from_coords(ra_deg, dec_deg): """ - Finds the information about the host galaxy given the position of the supernova. - Parameters - ---------- - :position : :class:`~astropy.coordinates.SkyCoord` - On Sky position of the source to be matched. - :name : str, default='No name' - Name of the the object. - Returns - ------- - :host_information : ~astropy.coordinates.SkyCoord` - Host position + Generate a name from position coordinates using the International Celestial Reference Frame (ICRF) + and standard J2000 epoch notation. """ - transient_position = SkyCoord( - ra=transient.ra_deg, dec=transient.dec_deg, unit="deg" - ) + host_coord = SkyCoord(ra=ra_deg, dec=dec_deg, unit='deg') + name = (f'''J{host_coord.ra.to_string(unit=u.hour, precision=2, sep='', pad=True)}''' + f'''{host_coord.dec.to_string(unit=u.degree, precision=2, sep='', alwayssign=True, pad=True)}''') + return name - # Define and create output file root directory - output_dir = os.path.join(output_dir_root, transient.name) - os.makedirs(output_dir, exist_ok=True) +def run_prost(transient, output_dir_root=settings.PROST_OUTPUT_ROOT): + """Uses Prost to identify the likely host galaxy for the input transient.""" + transient_catalog = pd.DataFrame({ + 'IAUID': [transient.name], + 'RA': [transient.sky_coord.ra.deg], + 'Dec': [transient.sky_coord.dec.deg] + }) # define priors for properties - priorfunc_z = halfnorm(loc=0.0001, scale=0.5) - priorfunc_offset = uniform(loc=0, scale=5) - priorfunc_absmag = uniform(loc=-30, scale=20) - - # cosmo = LambdaCDM(H0=70, Om0=0.3, Ode0=0.7) - - likefunc_offset = gamma(a=0.75) - likefunc_absmag = SnRateAbsmag(a=-25, b=20) - priors = { - "offset": priorfunc_offset, - "absmag": priorfunc_absmag + "offset": uniform(loc=0, scale=5), + "absmag": uniform(loc=-30, scale=20) } - likes = { - "offset": likefunc_offset, - "absmag": likefunc_absmag - } - - transient_catalog = pd.DataFrame( - {'IAUID': [transient.name], - 'RA': [transient_position.ra.deg], - 'Dec': [transient_position.dec.deg] - } - ) # add the redshift info from the transient if it exists if transient.redshift is not None: - priors['redshift'] = priorfunc_z + priors['redshift'] = halfnorm(loc=0.0001, scale=0.5) transient_catalog['redshift'] = transient.redshift - catalogs = ["glade", "decals", "panstarrs", "skymapper"] - transient_coord_cols = ("RA", "Dec") - # transient_name_col = "IAUID" - parallel = False - save = True - progress_bar = False - cat_cols = False + # Define and create output file root directory + output_dir = os.path.join(output_dir_root, transient.name) + os.makedirs(output_dir, exist_ok=True) + # If host matching throws an unhandled exception, raise it to let the TaskRunner + # catch it and mark the task status as failed. 
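+    # Note: associate_sample() is expected to return a table with one row per input
+    # transient; the fields read from it below (best_cat, best_cat_release, host_name,
+    # host_objID, host_ra, host_dec, host_redshift_*) are the columns this code assumes
+    # Prost provides.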
try: hosts = associate_sample( transient_catalog, - coord_cols=transient_coord_cols, + catalogs=["glade", "decals", "panstarrs", "skymapper"], + coord_cols=("RA", "Dec"), priors=priors, - likes=likes, - catalogs=catalogs, - parallel=parallel, - save=save, - save_path=output_dir, - progress_bar=progress_bar, - cat_cols=cat_cols, + likes={ + "offset": gamma(a=0.75), + "absmag": SnRateAbsmag(a=-25, b=20), + }, verbose=0, + parallel=False, + save=True, + save_path=output_dir, + cat_cols=False, + progress_bar=False, ) - host = Host( - ra_deg=hosts["host_ra"][0], - dec_deg=hosts["host_dec"][0], - name=hosts["host_name"][0], - ) - if hosts['host_redshift_info'][0] == 'SPEC': - host.redshift = hosts["host_redshift_mean"][0] - host.redshift_err = hosts["host_redshift_std"][0] - elif hosts['host_redshift_info'][0] == 'PHOT' and hosts['best_cat'][0] != 'panstarrs': - host.photometric_redshift = hosts["host_redshift_mean"][0] - host.photometric_redshift_err = hosts["host_redshift_std"][0] finally: - # Cleanup Prost file cache - # TODO: Over time we may accumulate Prost temp files that are not deleted + # Cleanup scratch file cache + # Note: Depending on the nature of the volume backing PROST_OUTPUT_ROOT, + # over time temp files may accumulate that are not deleted # due to processes aborted before this block is executed. More robust - # garbage collection may be necessary, such as a periodic task triggered - # by Celery Beat. + # garbage collection may be necessary. rmtree(output_dir, ignore_errors=True) + result = {'host': None, 'error': '', 'new': False} + try: + catalog_name = hosts["best_cat"][0] + catalog_release = hosts["best_cat_release"][0] + except KeyError as err: + result['error'] = f'Host matcher did not return expected catalog info fields: {err}' + return result + # If no catalog info was provided, there was no match. + if not catalog_name or not catalog_release: + return result + # If a match was found, the following fields should be available: + try: + name = hosts["host_name"][0] + object_id = hosts["host_objID"][0] + ra_deg = hosts["host_ra"][0] + dec_deg = hosts["host_dec"][0] + host_redshift_info = hosts['host_redshift_info'][0] + host_redshift_mean = hosts["host_redshift_mean"][0] + host_redshift_std = hosts["host_redshift_std"][0] + except KeyError as err: + result['error'] = f'Host matcher did not return expected data fields: {err}' + return result + # If a name is not supplied, generate a name based on the position. + if not name: + name = name_from_coords(ra_deg, dec_deg) + # If the host already exists in the database, use it instead of creating a new Host object. + cone_search = (Q(ra_deg__gte=ra_deg - ARCSEC_RA_IN_DEG) + & Q(ra_deg__lte=ra_deg + ARCSEC_RA_IN_DEG) + & Q(dec_deg__gte=dec_deg - ARCSEC_DEC_IN_DEG) + & Q(dec_deg__lte=dec_deg + ARCSEC_DEC_IN_DEG)) + # + # CASE A: There is an existing object with the same catalog info (object ID, catalog name & release) + # + catalog_info_search = (Q(object_id__exact=object_id) + & Q(catalog_name__exact=catalog_name) + & Q(catalog_release__exact=catalog_release)) + matching_catalog_info = Host.objects.filter(catalog_info_search) + if matching_catalog_info: + matching_position = matching_catalog_info.filter(cone_search) + # CASE A1: Existing object is at same position + # If existing object name is empty, populate with match's name. + # Return existing object. 
+ if matching_position: + logger.debug(f'''{len(matching_position)} existing hosts were found within an arcsecond of ''' + f'''host "{name}" ("{object_id}" from catalog "{catalog_name}", release "{catalog_release}")''' + ''' returned by host matcher.''') + host = matching_position[0] + # Update the host name field if it is empty + host.name = host.name if host.name else name + result['host'] = host + return result + # CASE A2: Existing object with same non-empty object ID is at different position. + # This should not occur. Treat as error and fail. + if object_id: + result['error'] = 'Existing Host object with matching catalog info has different position.' + return result + # CASE A3: Existing object has a blank object ID and is at different position. + # This should be treated as a new Host object. + else: + host = Host(ra_deg=ra_deg, dec_deg=dec_deg, name=name, object_id=object_id, + catalog_name=catalog_name, catalog_release=catalog_release) + result['new'] = True + result['host'] = update_redshifts(host, host_redshift_info, host_redshift_mean, host_redshift_std, + catalog_name) + return result + # + # CASE B: There is an existing object at the same position but not with the same catalog info. + # + matching_position = Host.objects.filter(cone_search) + if matching_position: + logger.debug(f'''{len(matching_position)} existing hosts were found within an arcsecond of ''' + f'''host "{name}" ("{object_id}" from catalog "{catalog_name}", release "{catalog_release}") ''' + '''returned by host matcher.''') + no_catalog_info = matching_position.filter((Q(catalog_name__exact='') | Q(catalog_name__isnull=True)) + & (Q(catalog_release__exact='') | Q(catalog_release__isnull=True))) + # CASE B1: Existing object has no catalog info. + # If existing object name is empty, populate with match's name. + # Populate with match's catalog info and return the object. + if no_catalog_info: + host = no_catalog_info[0] + host.name = host.name if host.name else name + host.object_id = object_id + host.catalog_name = catalog_name + host.catalog_release = catalog_release + result['host'] = update_redshifts(host, host_redshift_info, host_redshift_mean, host_redshift_std, + catalog_name) + return result - return host + # CASE B2: Existing object with different object ID from same catalog release is at the same position. + # This should not occur. Treat as error and fail. + catalog_match = matching_position.filter(Q(catalog_name__exact=catalog_name) + & Q(catalog_release__exact=catalog_release)) + if catalog_match: + result['error'] = 'Existing Host object with matching position and catalog release has different object ID.' + return result + else: + logger.debug('''No existing hosts were found within an arcsecond of ''' + f'''host "{name}" ("{object_id}" from catalog "{catalog_name}", release "{catalog_release}") ''' + '''returned by host matcher.''') + # + # CASE C: There is no existing host with consistent catalog info and position. 
+ # + host = Host(ra_deg=ra_deg, dec_deg=dec_deg, name=name, object_id=object_id, + catalog_name=catalog_name, catalog_release=catalog_release) + result['new'] = True + result['host'] = update_redshifts(host, host_redshift_info, host_redshift_mean, host_redshift_std, catalog_name) + return result diff --git a/app/host/system_tasks.py b/app/host/system_tasks.py index c853e99c..21e15338 100644 --- a/app/host/system_tasks.py +++ b/app/host/system_tasks.py @@ -5,7 +5,6 @@ from django.core import serializers from django.db.models import Q from datetime import datetime, timedelta, timezone -from host.base_tasks import initialize_all_tasks_status from host.base_tasks import SystemTaskRunner from host.base_tasks import task_soft_time_limit from host.base_tasks import task_time_limit @@ -51,17 +50,16 @@ def run_process(self, interval_minutes=200): now - time_delta, tns_credentials=tns_credentials ) logger.info("TNS query complete.") - saved_transients = Transient.objects.all() count = 0 logger.info(f'Processing transient imported from TNS: "{[tr.name for tr in transients_from_tns]}"') for transient_from_tns in transients_from_tns: - # If the transient has not already been ingested, save the TNS # data and proceed to the next transient - saved_transient = saved_transients.filter(name__exact=transient_from_tns.name) + saved_transient = Transient.objects.filter(name__exact=transient_from_tns.name) if not saved_transient: transient_from_tns.save() count += 1 + transient_workflow.delay(transient_from_tns.name) continue # If the transient was previously ingested, compare to the incoming TNS data. saved_transient = saved_transient[0] @@ -90,7 +88,7 @@ def run_process(self, interval_minutes=200): saved_transient.spectroscopic_class = transient_from_tns.spectroscopic_class saved_transient.redshift = transient_from_tns.redshift # Reinitialize the transient state so that its processing workflow will run again if necessary. - saved_transient.tasks_initialized = False + saved_transient.tasks_initialized = "False" saved_transient.save() # If the redshift value was updated, reprocess the entire workflow. @@ -113,26 +111,6 @@ def task_initially_enabled(self): return True -class InitializeTransientTasks(SystemTaskRunner): - def run_process(self): - """ - Initializes all task in the database to not processed for new transients. 
- """ - - uninitialized_transients = Transient.objects.filter( - tasks_initialized__exact="False" - ) - for transient in uninitialized_transients: - initialize_all_tasks_status(transient) - transient.tasks_initialized = "True" - transient.save() - transient_workflow.delay(transient.name) - - @property - def task_name(self): - return "Initialize transient task" - - class IngestMissedTNSTransients(SystemTaskRunner): def run_process(self): """ @@ -187,7 +165,7 @@ def task_frequency_seconds(self): @property def task_initially_enabled(self): - return True + return False class RetriggerIncompleteWorkflows(SystemTaskRunner): @@ -270,14 +248,6 @@ def get_all_active_tasks(): TNSDataIngestion().run_process() -@shared_task( - time_limit=task_time_limit, - soft_time_limit=task_soft_time_limit, -) -def initialize_transient_task(): - InitializeTransientTasks().run_process() - - @shared_task( time_limit=task_time_limit, soft_time_limit=task_soft_time_limit, diff --git a/app/host/tasks.py b/app/host/tasks.py index 0e5e1d45..d06c04ff 100644 --- a/app/host/tasks.py +++ b/app/host/tasks.py @@ -7,7 +7,6 @@ from host.workflow import transient_workflow from host.models import Transient from host.system_tasks import IngestMissedTNSTransients -from host.system_tasks import InitializeTransientTasks from host.system_tasks import TNSDataIngestion from host.system_tasks import RetriggerIncompleteWorkflows from host.system_tasks import UsageLogRoller @@ -24,7 +23,6 @@ periodic_tasks = [ TNSDataIngestion(), - InitializeTransientTasks(), IngestMissedTNSTransients(), RetriggerIncompleteWorkflows(), GarbageCollector(), @@ -78,13 +76,11 @@ def retrigger_transient(request=None, transient_name=''): ) def import_transient_list(transient_names): '''This function assumes that the input transient_names are not in the database.''' - def process_transient(transient_name): - transient_workflow.delay(transient_name) uploaded_transient_names = [] for transient_name in transient_names: logger.info(f'Triggering workflow for new transient "{transient_name}"...') try: - process_transient(transient_name) + transient_workflow.delay(transient_name) uploaded_transient_names.append(transient_name) except Exception as err: logger.error(f'''Error processing new transient: {err}''') diff --git a/app/host/templates/host/information_card.html b/app/host/templates/host/information_card.html index 06430507..45eaf7f1 100644 --- a/app/host/templates/host/information_card.html +++ b/app/host/templates/host/information_card.html @@ -32,7 +32,16 @@
      <tr><td>Transient {% if warning %}(</td></tr>
      <tr><td>Host</td></tr>
+     {% if transient.host.name %}
      <tr><td>Name:</td><td>{{transient.host.name}}</td></tr>
+     {% endif %}
+     {% if transient.host.object_id %}
+     <tr><td>Object ID:</td><td>{{transient.host.object_id}}</td></tr>
+     {% endif %}
+     {% if transient.host.catalog_name and transient.host.catalog_release %}
+     <tr><td>Catalog Name:</td><td>{{transient.host.catalog_name}}</td></tr>
+     <tr><td>Catalog Release:</td><td>{{transient.host.catalog_release}}</td></tr>
+     {% endif %}
      <tr><td>Right ascension:</td><td>{{transient.host.ra}}</td></tr>
      <tr><td>Declination:</td><td>{{transient.host.dec}}</td></tr>
      <tr><td>Redshift:</td><td>{% if transient.host.redshift %}{{transient.host.redshift|floatformat:6}}{% else %}None{% endif %}</td></tr>
diff --git a/app/host/transient_tasks.py b/app/host/transient_tasks.py
index dc5af650..a50db1a7 100644
--- a/app/host/transient_tasks.py
+++ b/app/host/transient_tasks.py
@@ -296,36 +296,27 @@ def _failed_status_message(self):
         """
         Emit status message for failure consistent with the available Status objects
         """
-        return "no host match"
+        return "failed"
 
     def _run_process(self, transient):
         """
         Run the host matching algorithm.
         """
-        host = run_prost(transient)
-
-        if host is not None:
-            host.save()
-            transient.host = host
-            transient.save()
-
-            # having a weird error here
-            # possible issues communicating with the database
-            transient_check = Transient.objects.get(name=transient.name)
-            if transient_check.host is None:
-                # let's try twice just in case
-                transient.host = host
-                transient.save()
-
-                transient_check = Transient.objects.get(name=transient.name)
-                if transient_check.host is None:
-                    raise RuntimeError("problem saving transient to the database!")
-
-            status_message = "processed"
-        else:
-            status_message = "no host match"
-
-        return status_message
+        result = run_prost(transient)
+        error = result['error']
+        if error:
+            logger.error(f'Error with host matching: {error}')
+            raise Exception(error)
+        host = result['host']
+        if host is None:
+            return "no host match"
+        host.save()
+        transient.host = host
+        transient.save()
+        if result['new']:
+            logger.info(f'''Created new Host object: "{host.name}" (object ID "{host.object_id}" from '''
+                        f'''catalog "{host.catalog_name}", release "{host.catalog_release}")''')
+        return "processed"
 
 
 class MWEBV_Transient(TransientTaskRunner):
diff --git a/app/host/views.py b/app/host/views.py
index 187be8d3..a909c05d 100644
--- a/app/host/views.py
+++ b/app/host/views.py
@@ -15,7 +15,6 @@
 from django_tables2 import RequestConfig
 from host.forms import ImageGetForm
 from host.forms import TransientUploadForm
-from host.forms import TransientImportForm
 from host.host_utils import import_transient_info
 from host.host_utils import select_aperture
 from host.host_utils import select_best_cutout
diff --git a/app/host/workflow.py b/app/host/workflow.py
index 4f901d69..502457db 100644
--- a/app/host/workflow.py
+++ b/app/host/workflow.py
@@ -1,5 +1,4 @@
 from celery import chain
-from celery import chord
 from celery import group
 from celery import shared_task
 from host.transient_tasks import crop_transient_images
@@ -56,8 +55,10 @@ def reprocess_transient(request=None, transient_name=''):
     assert transient_name
     try:
         transient = Transient.objects.get(name__exact=transient_name)
-        # TODO: This could be smarter. We don't *always* need to re-process every stage.
-        initialize_all_tasks_status(transient)
+        # Setting tasks_initialized to "False" will cause the
+        # transient tasks to be reset/created when the workflow starts.
+ transient.tasks_initialized = "False" + transient.save() result = transient_workflow.delay(transient_name) except Transient.DoesNotExist: result = None @@ -76,27 +77,20 @@ def transient_workflow(transient_name=None): assert transient_name try: Transient.objects.get(name__exact=transient_name) - print(f'Transient already exists: "{transient_name}"...') + logger.debug(f'Starting workflow for existing transient "{transient_name}"') except Transient.DoesNotExist: - print(f'Downloading transient info from TNS: "{transient_name}"...') + logger.info(f'''Workflow triggered for transient that does not exist: {transient_name}''') + logger.info(f'''Downloading transient info from TNS for "{transient_name}"''') blast_transients = get_transients_from_tns_by_name([transient_name]) for transient in blast_transients: - # TO DO: User object is not JSON-serializable, and this task is also launched - # by a periodic system task, so we could consider replacing the - # added_by value with a simple string of the username. - # transient.added_by = request.User - transient.save() - print(f'New transient added from TNS: "{transient_name}"...') - # Initialize the tasks - uninitialized_transients = Transient.objects.filter( - tasks_initialized__exact="False", - name__exact=transient_name, - ) - for transient in uninitialized_transients: - if transient.name == transient_name: - initialize_all_tasks_status(transient) - transient.tasks_initialized = "True" transient.save() + logger.debug(f'New transient downloaded from TNS: "{transient_name}"') + # Initialize the tasks if necessary + for transient in Transient.objects.filter(tasks_initialized__exact="False", name__exact=transient_name): + logger.debug(f'Initializing all transient tasks for "{transient_name}"') + initialize_all_tasks_status(transient) + transient.tasks_initialized = "True" + transient.save() # Execute the workflow workflow = chain( # workflow_init.si(transient_name),