Differences between LinearWorkflow and the linearly_connect_tasks function

I am repurposing the following code from the field-delineation GitHub repository:

def get_tiffs_to_eopatches_workflow(config: TiffsToEopatchConfig, delete_tiffs: bool = False) -> EOWorkflow:
    """ Set up workflow to ingest tiff files into EOPatches """

    # Set up credentials in sh config
    sh_config = set_sh_config(config)

    import_bands = [(ImportFromTiffTask((FeatureType.DATA, band),
                                    folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                    config=sh_config), f'Import band {band}')
                    for band in config.band_names]
    import_clp = (ImportFromTiffTask((FeatureType.DATA, config.clp_name),
                                 folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                 config=sh_config), f'Import {config.clp_name}')

    import_mask = (ImportFromTiffTask((FeatureType.MASK, config.mask_name),
                                  folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                  config=sh_config), f'Import {config.mask_name}')

    rearrange_bands = (RearrangeBands(), 'Swap time and band axis')
    add_timestamps = (AddTimestampsUpdateTime(f's3://{config.bucket_name}/{config.tiffs_folder}'), 'Load timestamps')

    merge_bands = (MergeFeatureTask(
        input_features={FeatureType.DATA: config.band_names},
        output_feature=(FeatureType.DATA, config.data_name)), 'Merge band features')

    remove_bands = (RemoveFeatureTask(features={FeatureType.DATA: config.band_names}), 'Remove bands')

    rename_mask = (RenameFeatureTask((FeatureType.MASK, config.mask_name, config.is_data_mask)), 'Rename is data mask')

    calculate_clm = (CloudMasking(), 'Get CLM mask from CLP')

    save_task = (SaveTask(path=f's3://{config.bucket_name}/{config.eopatches_folder}', config=sh_config,
                          overwrite_permission=OverwritePermission.OVERWRITE_FEATURES),  'Save EOPatch')

    filenames = [f'{band}.tif' for band in config.band_names] + \
                [f'{config.mask_name}.tif', f'{config.clp_name}.tif', 'userdata.json']
    delete_files = (DeleteFiles(path=config.tiffs_folder, filenames=filenames), 'Delete batch files')

    workflow = [*import_bands,
                import_clp,
                import_mask,
                rearrange_bands,
                add_timestamps,
                merge_bands,
                remove_bands,
                rename_mask,
                calculate_clm,
                save_task]

    if delete_tiffs:
        workflow.append(delete_files)

    return linearly_connect_tasks(*workflow) #LinearWorkflow(*workflow)

Would linearly_connect_tasks work the same way as the old LinearWorkflow? I can't find any old documentation regarding LinearWorkflow.

Hi @pj.ganotisi

You are correct: linearly_connect_tasks is the replacement for LinearWorkflow, but it is not a complete drop-in replacement.

LinearWorkflow was removed in the eo-learn 1.0 release, where we also exposed the EONode class, which is used to construct the execution graphs from which EOWorkflows are built. If you're interested you can look those up in the documentation, but to avoid over-explaining: if you previously had

workflow = LinearWorkflow(*list_of_tasks)
execution_args = {list_of_tasks[2]: {"my_param": 2}}

you should now use

eonodes = linearly_connect_tasks(*list_of_tasks)
workflow = EOWorkflow(eonodes)
# execution arguments are now provided for nodes, not tasks
execution_args = {eonodes[2]: {"my_param": 2}} 
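
For completeness, a sketch of how these node-keyed arguments are then consumed by the executor (EOExecutor expects a list of such dicts, one per execution):

from eolearn.core import EOExecutor

# EOExecutor takes one {node: kwargs} dict per execution
executor = EOExecutor(workflow, [execution_args])
executor.run(workers=1)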

Wow, thank you so much. I actually put in the exact same code as a placeholder after reading through a bunch of the documentation. Thanks for clearing things up and confirming.

Just to clarify the execution args: I also repurposed the old field-delineation exec_args function as follows:

def get_exec_args(workflow: EOWorkflow, eopatch_list: List[str]) -> List[dict]:
    """ Utility function to get execution arguments """
    exec_args = []
    nodes = workflow.get_nodes()
    # tasks = workflow.get_tasks()

    for name in eopatch_list:
        single_exec_dict = {}

        for node in nodes: # task_name, task in tasks.items():
            task_name = node.name
            task = node.task

            if isinstance(task, ImportFromTiffTask):
                tiff_name = task_name.split()[-1]
                path = f'{name}/{tiff_name}.tif'
                single_exec_dict[task] = dict(filename=path)

            if isinstance(task, SaveTask):
                single_exec_dict[task] = dict(eopatch_folder=name)

            if isinstance(task, (AddTimestampsUpdateTime, DeleteFiles)):
                single_exec_dict[task] = dict(tile_name=name)

        print(single_exec_dict)
        exec_args.append(single_exec_dict)

    return exec_args

The one on GitHub used workflow.get_tasks(); however, that function no longer exists. I tried to recreate it from my understanding, but EOExecutor no longer uses EOTasks for the execution arguments. Given these changes, should I be using single_exec_dict[node] instead of single_exec_dict[task]?

That is correct, yes.
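
One convenience when migrating task-keyed code: since every EONode wraps exactly one task, you can build a task-to-node lookup once and translate the old dictionaries with it (a small sketch, not part of the original repository code):

# map each task instance to the node that wraps it
task_to_node = {node.task: node for node in workflow.get_nodes()}

# old: single_exec_dict[task] = dict(filename=path)
# new: single_exec_dict[task_to_node[task]] = dict(filename=path)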

I am having an issue with the conversion from tiffs to EOPatches. As of right now, these are all the changes I made to these specific functions:

class CloudMasking(EOTask):
    """ Compute cloud mask from SH cloud probability CLP data map """
    def __init__(self, clp_feature: Tuple = (FeatureType.DATA, 'CLP'), clm_feature: Tuple = (FeatureType.MASK, 'CLM'),
                 average_over: int = 24, max_clp: float = 255.):
        """
        :param clp_feature: Feature type and name of input CLP mask
        :param clm_feature: Feature type and name of output CLM mask
        :param average_over: Parameter used to smooth the CLP data map
        :param max_clp: Maximum value of CLP map used for normalization
        """
        # _parse_features was replaced by parse_feature in eo-learn 1.x
        # self.clm_feature = next(self._parse_features(iter(clm_feature))())
        self.clm_feature = self.parse_feature(clm_feature)
        # self.clp_feature = next(self._parse_features(iter(clp_feature))())
        self.clp_feature = self.parse_feature(clp_feature)
        self.s2_cd = S2PixelCloudDetector(average_over=average_over)
        self.max_clp = max_clp

    def execute(self, eopatch: EOPatch) -> EOPatch:
        """ Compute and add CLM from CLP """
        clc = self.s2_cd.get_mask_from_prob(eopatch[self.clp_feature].squeeze() / self.max_clp)
        eopatch[self.clm_feature] = clc[..., np.newaxis]
        return eopatch


def get_tiffs_to_eopatches_workflow(config: TiffsToEopatchConfig, delete_tiffs: bool = False) -> EOWorkflow:
    """ Set up workflow to ingest tiff files into EOPatches """

    # Set up credentials in sh config
    sh_config = set_sh_config(config)

    import_bands = [(ImportFromTiffTask((FeatureType.DATA, band),
                                    folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                    config=sh_config), f'Import band {band}')
                    for band in config.band_names]
    import_clp = (ImportFromTiffTask((FeatureType.DATA, config.clp_name),
                                 folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                 config=sh_config), f'Import {config.clp_name}')

    import_mask = (ImportFromTiffTask((FeatureType.MASK, config.mask_name),
                                  folder=f's3://{config.bucket_name}/{config.tiffs_folder}',
                                  config=sh_config), f'Import {config.mask_name}')

    rearrange_bands = (RearrangeBands(), 'Swap time and band axis')
    add_timestamps = (AddTimestampsUpdateTime(f's3://{config.bucket_name}/{config.tiffs_folder}'), 'Load timestamps')

    merge_bands = (MergeFeatureTask(
        input_features={FeatureType.DATA: config.band_names},
        output_feature=(FeatureType.DATA, config.data_name)), 'Merge band features')

    remove_bands = (RemoveFeatureTask(features={FeatureType.DATA: config.band_names}), 'Remove bands')

    rename_mask = (RenameFeatureTask((FeatureType.MASK, config.mask_name, config.is_data_mask)), 'Rename is data mask')

    calculate_clm = (CloudMasking(), 'Get CLM mask from CLP')

    save_task = (SaveTask(path=f's3://{config.bucket_name}/{config.eopatches_folder}', config=sh_config,
                          overwrite_permission=OverwritePermission.OVERWRITE_FEATURES),  'Save EOPatch')

    filenames = [f'{band}.tif' for band in config.band_names] + \
                [f'{config.mask_name}.tif', f'{config.clp_name}.tif', 'userdata.json']
    delete_files = (DeleteFiles(path=config.tiffs_folder, filenames=filenames), 'Delete batch files')

    workflow = [*import_bands,
                import_clp,
                import_mask,
                rearrange_bands,
                add_timestamps,
                merge_bands,
                remove_bands,
                rename_mask,
                calculate_clm,
                save_task]

    if delete_tiffs:
        workflow.append(delete_files)
    # The old version built a LinearWorkflow directly from these tasks.
    # The new version first connects the tasks into EONodes, then builds an EOWorkflow.
    parameters = linearly_connect_tasks(*workflow)
    return EOWorkflow(parameters) #LinearWorkflow(*workflow)

def get_exec_args(workflow: EOWorkflow, eopatch_list: List[str]) -> List[dict]:
    """ Utility function to get execution arguments """
    exec_args = []
    nodes = workflow.get_nodes()
    # tasks = workflow.get_tasks()

    for name in eopatch_list:
        single_exec_dict = {}

        for node in nodes: # task_name, task in tasks.items():
            task_name = node.name
            task = node.task

            if isinstance(task, ImportFromTiffTask):
                tiff_name = task_name.split()[-1]
                path = f'{name}/{tiff_name}.tif'
                single_exec_dict[node] = dict(filename=path)

            if isinstance(task, SaveTask):
                single_exec_dict[node] = dict(eopatch_folder=name)

            if isinstance(task, (AddTimestampsUpdateTime, DeleteFiles)):
                single_exec_dict[node] = dict(tile_name=name)

        exec_args.append(single_exec_dict)

    return exec_args

My EOPatch conversion is also giving me the following output: [screenshot]

I don't think I made any mistakes with the repurposed code, but I am not getting anything inside the EOPatch folder.
These are my current configs: [screenshot]

What could the issue be here? I personally do not know. The conversion also runs very quickly, even though there are .TIFF files in my S3 bucket.

It seems as if the LinearWorkflow substitute isn't working as expected, since no EOPatch is saved even though save_task is configured as a SaveTask with a specific bucket name and eopatches folder.

There are a bunch of other tasks that might be important to add, such as those listed at the following link:
https://eo-learn.readthedocs.io/en/latest/eotasks.html

I am not entirely sure what the problem is, but that could be a start.

You should first investigate precisely what is going on in the workflow. I suggest you turn on logging first (like it is done here) and then check the logs for each execution. There you can see what was in the EOPatch when it was forwarded to the SaveTask.
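
A sketch of what that can look like, assuming the workflow and exec_args from your snippet (make_report is optional but gives a quick overview of which executions failed):

executor = EOExecutor(workflow, exec_args, save_logs=True, logs_folder='.')
results = executor.run(workers=4)

# per-execution logs are written into the logs folder; the HTML report
# summarizes which executions succeeded and where errors were raised
executor.make_report()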

Almost every single one of my log errors is the following: [screenshot]

I am somehow getting an issue with my ImportFromTiffTask execution, something related to a HeadObject request operation. I'm not sure why this is happening; the code is taken directly from the provided GitHub repository. Any specific thoughts?

I also have the default AWS bucket settings listed here: [screenshot]

The batch processing operation succeeds, but the EOPatch conversion isn't working.

You can try creating an S3 filesystem object (with our helper function), which is what the task does behind the scenes. It could be that you need to set a profile or region. Check whether you can see the tiffs through this filesystem object, for instance by calling its listdir method (see the sketch below).

I suspect you will run into similar problems there. Once you figure out what the issue was, you can also pass the filesystem object to the task and then use relative paths.
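
A sketch of that check, assuming sh_config carries valid AWS credentials and the same bucket/folder config values as in your workflow code:

from eolearn.core.utils.fs import load_s3_filesystem

# build the same S3 filesystem object the task creates behind the scenes
fs = load_s3_filesystem(f's3://{config.bucket_name}/{config.tiffs_folder}', config=sh_config)

# if credentials, profile and region are set up correctly, this should list your tiffs
print(fs.listdir('/'))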

Hi @pj.ganotisi

Let's go down the rabbit hole together. I did just this recently for our internal purposes as well. It's possible the issues are caused by other things, not by your changes.

First, can you please provide the versions of rasterio and eo-learn you are using in your Python environment?

Secondly, since your EOPatches are empty, and also due to the logs you posted, it seems the issue is in the first task of your version of the get_tiffs_to_eopatches_workflow, which is the ImportFromTiffTask. To that end, I suggest first trying to manually import one of the tiffs from the bucket:

debug_task = ImportFromTiffTask((FeatureType.DATA, "<band_name>"), folder="<path_to_tiff_on_s3>", config=sh_config)

# execute task
debug_task.execute()

Then you just need to make sure that the task in the workflow is executed in the same way and with the same arguments.

Let’s see if you get a valid output here.

Good luck!

Thank you for the swift reply,

rasterio version: 1.3.8
eo-learn version: 1.4.2
The reason I have eo-learn at 1.4.2 is that I am repurposing the code to work with that specific version. If I install 0.10.2 instead, I get an error that sentinelhub.os_utils does not exist.

When attaching this debug_task to the workflow, do I simply add it to the following list of tasks passed to the linearly_connect_tasks function?

workflow = [*import_bands,
                import_clp,
                import_mask,
                rearrange_bands,
                add_timestamps,
                merge_bands,
                remove_bands,
                rename_mask,
                calculate_clm,
                save_task]

And just to be sure, I am currently debugging this as its own separate entity, correct? I won't be using an EOExecutor?

Similar to the code within tiffs_to_eopatches.py?

LOGGER.info(f'Read grid definition file {config["grid_filename"]}')
grid_definition = gpd.read_file(config['grid_filename'])

workflow = get_tiffs_to_eopatches_workflow(tiffs_to_eops_config, delete_tiffs=False)

eopatch_list = grid_definition.name.values

exec_args = get_exec_args(workflow, eopatch_list)

executor = EOExecutor(workflow, exec_args, save_logs=True, logs_folder='.')

I ran the following code:

debug_task = ImportFromTiffTask((FeatureType.DATA, "B02"), folder=f's3://{config.bucket_name}/{config.tiffs_folder}', config=sh_config)

debug_task.execute()

I used B02 as the band being imported.

I am still getting the same error as with the first task of the get_tiffs_to_eopatches workflow.

Hi @pj.ganotisi

The reason I have eo-learn at 1.4.2 is that I am repurposing the code to work with that specific version. If I install 0.10.2 instead, I get an error that sentinelhub.os_utils does not exist.

Yes, the difference between those versions is too large to bridge. Sticking with 1.4.2 should be fine; eo-learn is not the issue here.

rasterio version: 1.3.8
eo-learn version: 1.4.2

Recently, there have been a lot of issues related to AWS S3 with rasterio==1.3.8, and the error you get might be connected to that. Please try downgrading with pip install "rasterio<1.3.8" and check again if you have the same issue.
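
To confirm which version actually ends up in the environment after the downgrade, a quick check:

import rasterio

# should print a version lower than 1.3.8 after the downgrade
print(rasterio.__version__)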

I am currently losing hope.
Here's the current rasterio version I installed: [screenshot]

I get the same error for the debug_task execution: [screenshot]

This is also what my tiff folder looks like ([screenshot]); I'm not sure if that has anything to do with it.

The tiff folder looks fine. Are you able to download the tiff locally and use the task by importing the local tiff?

config=sh_config

Are there AWS credentials in your sh_config? Otherwise, you can also provide a filesystem object to the task to give it S3 access more explicitly:

from eolearn.core.utils.fs import load_s3_filesystem

fs = load_s3_filesystem(f's3://{config.bucket_name}/{config.tiffs_folder}', config=sh_config)  # sh_config should have AWS credentials

debug_task = ImportFromTiffTask((FeatureType.DATA, "B02"), folder='/', filesystem=fs)
debug_task.execute()
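
If the filesystem object works, the same pattern can in principle be carried into the workflow so that all paths become relative; a rough sketch, assuming SaveTask also accepts a filesystem argument and that a second filesystem object is built for the eopatches folder:

# filesystem rooted at the tiffs folder, used by the import tasks
tiffs_fs = load_s3_filesystem(f's3://{config.bucket_name}/{config.tiffs_folder}', config=sh_config)
import_band = ImportFromTiffTask((FeatureType.DATA, 'B02'), folder='/', filesystem=tiffs_fs)

# a separate filesystem rooted at the eopatches folder, used for saving
eopatches_fs = load_s3_filesystem(f's3://{config.bucket_name}/{config.eopatches_folder}', config=sh_config)
save_task = SaveTask(path='/', filesystem=eopatches_fs,
                     overwrite_permission=OverwritePermission.OVERWRITE_FEATURES)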

Additionally, any chance you could give me read access to your bucket folder, so I can try it myself? The account ID you can give access to is 949250452282