Description
Hi
Potentially this is a minor problem, but since I stumbled across it I figured I might as well report it here.
Running the seedlink_plotter for many streams, I noticed that the CPU usage quickly climbed to 100% on my machine. There may of course be many reasons for this, but focusing on the data retrieval part, merging newly retrieved traces into the Stream object seems to account for most of the load. Looking deeper into the merge method of the Stream object, I suspect this is because the trace list is regenerated every time the merge method is invoked. If the Stream object contains only a few traces and merging is infrequent, this is not a problem. For an application like the seedlink_plotter, however, where the method may be called several times per second, adding one trace at a time to a Stream object that may contain many traces, this adds unnecessary overhead.
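To give a feeling for the scale of the overhead, here is a back-of-the-envelope cost model (plain Python, not ObsPy; the counts are illustrative assumptions, not measurements): a cleanup pass that touches every channel on every incoming packet does work proportional to the number of channels squared, while a targeted merge stays linear.

```python
# Toy cost model, not ObsPy: count "trace visits" for each strategy while
# streaming packets for many channels. Numbers are purely illustrative.
n_ids = 50       # distinct trace ids (channels) held in the Stream
n_packets = 200  # packets received per channel

# merge(-1)-style: every incoming packet triggers a cleanup pass over all
# n_ids channels currently in the Stream.
full_cleanup_visits = (n_ids * n_packets) * n_ids

# targeted-style: every incoming packet only touches its own channel.
targeted_visits = (n_ids * n_packets) * 1

print(full_cleanup_visits // targeted_visits)  # factor saved per packet
```

With 50 channels the full cleanup does 50x the work of the targeted variant, which is roughly the order of magnitude of the CPU reduction I observed.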
In the long run, perhaps adding a method to the Stream object that merges (or adds) a single trace to the list of traces is the desired solution, and perhaps this should have been brought up as an issue on obspy instead. I report it here because for now the issue seems more application specific, and since you are involved in the obspy development, the suggestion may eventually find its way in there if found useful.
My current (simplistic) workaround is to rip out the core of the _cleanup method (used by merge(-1)) from the Stream object and make sure that only the traces with the same trace.id as the trace being merged in (if any) are touched. This reduces the CPU load on my machine from 100% to 10-20%.
More exactly, I came up with the following method, which I added to the SeedlinkUpdater object (it assumes numpy is available as np):
```python
def addTrace(self, trace, misalignment_threshold=1e-2):
    """
    Add a single obspy Trace object to the obspy Stream object.

    Normally this would be handled by the merge method of the Stream
    object; however, that iterates over all traces in the Stream, which
    incurs an unnecessarily large overhead when adding a single Trace
    object to a Stream object that already contains a large number of
    Trace objects. The core of this function is copied directly from the
    _cleanup method of the Stream object, stripped of large parts of the
    documentation.

    :type trace: :class:`obspy.Trace`
    :param trace: Trace object to add.
    :type misalignment_threshold: float
    :param misalignment_threshold: Threshold value for sub-sample
        misalignments of sampling points of two traces that should be
        merged together (fraction of sampling interval, from 0 to 0.5).
    """
    ind = [i for i, tr in enumerate(self.stream.traces)
           if tr.id == trace.id]
    if not ind:
        # trace id is not yet in the stream, simply append it
        self.stream.traces.append(trace)
        return
    # extract the matching traces; pop from the back first so that the
    # remaining indices stay valid
    ind.sort(reverse=True)
    trace_list = [self.stream.traces.pop(i) for i in ind]
    trace_list.append(trace)
    trace_list.sort(key=lambda tr: tr.stats.starttime)
    # merge the traces
    cur_trace = trace_list.pop(0)
    delta = cur_trace.stats.delta
    allowed_micro_shift = misalignment_threshold * delta
    # work through all traces with the same id
    while trace_list:
        trace = trace_list.pop(0)
        gap = trace.stats.starttime - (cur_trace.stats.endtime + delta)
        if misalignment_threshold > 0 and gap <= allowed_micro_shift:
            # `gap` is smaller than (or equal to) the allowed shift
            misalignment = gap % delta
            if misalignment != 0:
                misalign_percentage = misalignment / delta
                if (misalign_percentage <= misalignment_threshold or
                        misalign_percentage >= 1 - misalignment_threshold):
                    # now we align the sampling points of both traces
                    trace.stats.starttime = (
                        cur_trace.stats.starttime +
                        round((trace.stats.starttime -
                               cur_trace.stats.starttime) / delta) * delta)
        # we have some common parts: check if they are consistent
        subsample_shift_percentage = (
            trace.stats.starttime.timestamp -
            cur_trace.stats.starttime.timestamp) % delta / delta
        subsample_shift_percentage = min(
            subsample_shift_percentage, 1 - subsample_shift_percentage)
        if (trace.stats.starttime <= cur_trace.stats.endtime and
                subsample_shift_percentage < misalignment_threshold):
            # check if the common time slice [t1 --> t2] is equal
            t1 = trace.stats.starttime
            t2 = min(cur_trace.stats.endtime, trace.stats.endtime)
            if np.array_equal(cur_trace.slice(t1, t2).data,
                              trace.slice(t1, t2).data):
                # if consistent: add them together
                cur_trace += trace
            else:
                # if not consistent: leave them alone
                self.stream.traces.append(cur_trace)
                cur_trace = trace
        elif trace.stats.starttime == (cur_trace.stats.endtime +
                                       cur_trace.stats.delta):
            # traces are perfectly adjacent: add them together
            cur_trace += trace
        else:
            # no common parts (gap): leave the traces alone and add the
            # current one to the list
            self.stream.traces.append(cur_trace)
            cur_trace = trace
    self.stream.traces.append(cur_trace)
```
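One subtle point in the method above is the `ind.sort(reverse=True)` before the pops: removing the matching traces from the front first would shift and invalidate the later indices. A minimal stand-alone demonstration of the same pattern on a plain list:

```python
items = ["a", "b", "c", "d", "e"]
ind = [1, 3]  # indices of the items we want to extract

# pop from the back first so the earlier indices remain valid
ind.sort(reverse=True)
popped = [items.pop(i) for i in ind]

print(popped)  # -> ['d', 'b']
print(items)   # -> ['a', 'c', 'e']
```

Popping index 1 first would turn the original index 3 into index 2, extracting the wrong item.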