Bases: KPFFunction
Get all the ObservingBlocks from all of the programs using the KPF-CC
instrument on the current semester schedule.
Functions Called:
kpf.observatoryAPIs.GetScheduledPrograms
kpf.observatoryAPIs.GetObservingBlocksByProgram
Source code in kpf/observatoryAPIs/GetKPFCCObservingBlocks.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 | class GetKPFCCObservingBlocks(KPFFunction):
'''Get all the ObservingBlocks from all of the programs using the KPF-CC
instrument on the current semester schedule.
Functions Called:
- `kpf.observatoryAPIs.GetScheduledPrograms`
- `kpf.observatoryAPIs.GetObservingBlocksByProgram`
'''
@classmethod
def pre_condition(cls, args):
pass
@classmethod
def perform(cls, args):
log.info('Getting OBs for all KPF-CC programs this semester')
OBs = []
classical, cadence = GetScheduledPrograms.execute({'semester': 'current'})
progIDs = set([p['ProjCode'] for p in cadence])
# Iterate of KPF-CC programIDs and retrieve their OBs from DB
for i,progID in enumerate(progIDs):
log.debug(f'Retrieving OBs for {progID}')
programOBs = GetObservingBlocksByProgram.execute({'program': progID})
OBs.extend(programOBs)
log.info(f'Retrieved {len(OBs)} OBs in KPF-CC programs')
# Save to files if requested
if args.get('save', False) == True:
semester, start, end = get_semester_dates(datetime.datetime.now())
OBpath = Path('/s/sdata1701/OBs/KPFCC') / semester
if OBpath.exists() is False:
OBpath.mkdir(mode=0o777)
log.info(f'Writing {len(OBs)} OBs to disk at: {OBpath}')
for OB in OBs:
OBfile = OBpath / f"{str(OB.Target.TargetName).replace(' ', '_')}.yaml"
log.debug(f' Writing: {OBfile.name}')
OB.write_to(OBfile, overwrite=True)
else:
return OBs
@classmethod
def post_condition(cls, args):
pass
@classmethod
def add_cmdline_args(cls, parser):
parser.add_argument('--save', dest="save",
default=False, action="store_true",
help='Save OBs to disk?')
return super().add_cmdline_args(parser)
|