Runtime error while executing EO Workflow

I have created an EO workflow, but I get an error when I try to execute it. I am providing the source code and the error output for better understanding.

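import numpy as np
from eolearn.core import EOTask, FeatureType

class rvi_layer(EOTask):
    """Computes the ratio vegetation index (RVI) as B08 / B04."""

    def __init__(self, output_feature):
        self.output_feature = output_feature

    def execute(self, eopatch):
        bands = eopatch[FeatureType.DATA]["BANDS"]
        band_8 = bands[..., 6]  # B08 (NIR), index 6 in band_names
        band_4 = bands[..., 2]  # B04 (red), index 2 in band_names
        rvi = band_8 / band_4
        eopatch[self.output_feature] = rvi[..., np.newaxis]
        return eopatch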

rvi = rvi_layer(
    (FeatureType.DATA, "RVI")
)

EOPATCH_FOLDER = '/Users/sid/Documents/SUGARCANE_CLASSIFICATION/DISTRICTS/BAGHPAT/EOPATCH'
save = SaveTask(EOPATCH_FOLDER, overwrite_permission=OverwritePermission.OVERWRITE_PATCH)

workflow_nodes = linearly_connect_tasks(
    rvi,  save
)
workflow = EOWorkflow(workflow_nodes)
workflow.dependency_graph()


%%time

# Time interval for the SH request
time_interval = ["2021-10-15", "2021-10-17"]

# Define additional parameters of the workflow
input_node = workflow_nodes[0]
save_node = workflow_nodes[-1]
execution_args = []
for idx, bbox in enumerate(bbox_list):
    execution_args.append(
        {
            input_node: {"bbox": bbox, "time_interval": time_interval},
            save_node: {"eopatch_folder": f"EOPatch_{idx}"},
        }
    )

# Execute the workflow
executor = EOExecutor(workflow, execution_args, save_logs=True)
executor.run(workers=8)

executor.make_report()


I am getting the following error:
Process SpawnProcess-3:
Process SpawnProcess-4:
Process SpawnProcess-2:
Process SpawnProcess-5:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
AttributeError: Can't get attribute 'rvi_layer' on <module '__main__' (built-in)>
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/concurrent/futures/process.py", line 240, in _process_worker
    call_item = call_queue.get(block=True)
AttributeError: Can't get attribute 'rvi_layer' on <module '__main__' (built-in)>
  File "/Users/sid/anaconda3/envs/crop_classification/lib/python3.10/multiprocessing/queues.py", line 122, in get
    return _ForkingPickler.loads(res)
AttributeError: Can't get attribute 'rvi_layer' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'rvi_layer' on <module '__main__' (built-in)>

Can you help me determine where I am going wrong?

Thank you,
Sidharrth

Hi @sidharrth25 ,

It seems that you’re missing the EOPatches which contain band data for RVI calculation. In the example, the first task in the workflow, add_data, is defined and executed at the beginning of the workflow to collect Sentinel-2 L1C data.

You need to either use SentinelHubInputTask or SentinelHubEvalscriptTask to download required data as EOPatches or build EOPatches from local data yourself, then use them as the input of your rvi_layer task.

Hi @chung.horng,
I just executed the entire workflow again but with the number of workers as 1 (executor.run(workers=1)) and it worked. The data got downloaded.

Why does it work with 1 worker but not with more than 1 worker?

Hi @sidharrth25 ,

From the code you provided, something is missing at the beginning: I don’t see any input data for the RVI calculation. It’s difficult for me to pin down the issue without knowing the full workflow.

Hi @chung.horng,
Here is the input data being used for RVI calculations.

band_names = ["B02", "B03", "B04", "B05", "B06", "B07", "B08", "B8A", "B11", "B12"]
add_data = SentinelHubInputTask(
    bands_feature=(FeatureType.DATA, "BANDS"),
    bands=band_names,
    resolution=10,
    maxcc=0.05,
    time_difference=datetime.timedelta(minutes=120),
    data_collection=DataCollection.SENTINEL2_L2A,
    additional_data=[(FeatureType.MASK, "dataMask", "IS_DATA"), (FeatureType.MASK, "CLM"), (FeatureType.DATA, "CLP")],
    max_threads=5,
)

Did you run your script as a Jupyter notebook or as a Python file?

I ran the script as a Jupyter notebook.

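It is a known issue that multiprocessing sometimes does not work in a Jupyter notebook. The worker processes are spawned as fresh Python processes and cannot look up classes that are defined in the notebook itself, which matches the "Can't get attribute 'rvi_layer' on <module '__main__'>" error in your traceback. With workers=1 everything runs in the notebook process, so the problem does not show up. If multiprocessing is a must, I’d suggest running your script as a Python file, or moving rvi_layer into a separate module that the notebook imports.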
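
For anyone who lands here with the same error, the mechanism can be sketched with the standard library alone. pickle does not store a class's code; it stores the module and the name, and looks them up again when loading. A worker process that cannot find the name in its own __main__ fails with the same kind of message as in the traceback above. This is only an illustration, not eo-learn code: Fraction stands in for a task class, and missing_task_class is a hypothetical name that deliberately does not exist.

```python
import pickle
from fractions import Fraction

# Pickling an instance records a reference to its class (module + name),
# not the class definition itself.
payload = pickle.dumps(Fraction(1, 2))
has_module = b"fractions" in payload
has_name = b"Fraction" in payload

# Hand-written protocol-0 pickle: the GLOBAL opcode ("c") asks the loader to
# fetch `missing_task_class` from the `__main__` module. The name is
# hypothetical and intentionally undefined, which mimics a worker process
# whose __main__ does not contain the notebook's class.
handmade = b"c__main__\nmissing_task_class\n."

try:
    pickle.loads(handmade)
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(has_module, has_name)
print(error_message)
```

If this is what happens in the notebook case, saving rvi_layer in its own file (say, a hypothetical rvi_task.py next to the notebook) and importing it from there should let the workers resolve the class, since they can import that module themselves.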
