# singer-tap-development


09/10/2022, 11:47 AM
Hi everyone. First of all, thank you all for your help. I'm currently trying to develop a custom Singer tap for the Aircall API. I'm using a first endpoint
to fetch all teams, then I call
endpoint to retrieve the details of each team. To do that, I'm using parent-child streams as shown below, where aircallStream is a RESTStream subclass.
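from typing import Optional


# Parent stream: lists all teams and hands each team's id to the child stream.
class TeamsStream(aircallStream):
    name = "teams"
    path = "v1/teams"
    primary_keys = ["id"]
    replication_key = "created_at"
    schema = teams_properties.to_dict()
    records_jsonpath = "$.teams[*]"

    def get_child_context(self, record: dict, context: Optional[dict]) -> dict:
        # The returned keys fill the {team_id} placeholder in the child path.
        return {"team_id": record["id"]}


# Child stream: fetches the details of one team per parent record.
class TeamStream(aircallStream):
    name = "team"
    parent_stream_type = TeamsStream
    path = "v1/teams/{team_id}"
    primary_keys = ["id"]
    schema = teams_properties.to_dict()
    records_jsonpath = "$.team[*]"
    # Empty list: do not partition the child's state by team_id.
    state_partitioning_keys = []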
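As a rough illustration of the parent-child setup above, here is a plain-Python sketch of the request flow, not the SDK itself. It assumes the parent endpoint is requested once (pagination ignored) and the child endpoint once per parent record. `FAKE_API`, `fake_get`, `sync_teams_and_children`, and the sample teams are hypothetical stand-ins, not Aircall data or SDK functions.

```python
from typing import Dict, Iterator, List

# Hypothetical stand-in for the API: maps a path to a canned JSON response.
FAKE_API: Dict[str, dict] = {
    "v1/teams": {"teams": [{"id": 1, "name": "Sales"}, {"id": 2, "name": "Support"}]},
    "v1/teams/1": {"team": {"id": 1, "name": "Sales"}},
    "v1/teams/2": {"team": {"id": 2, "name": "Support"}},
}

request_log: List[str] = []  # every path that was "requested", in order


def fake_get(path: str) -> dict:
    """Record the request and return the canned response."""
    request_log.append(path)
    return FAKE_API[path]


def get_child_context(record: dict) -> dict:
    """Same idea as TeamsStream.get_child_context above."""
    return {"team_id": record["id"]}


def sync_teams_and_children() -> Iterator[dict]:
    """One parent request, then one child request per parent record."""
    for team in fake_get("v1/teams")["teams"]:
        context = get_child_context(team)
        # The context keys fill the placeholder in the child path.
        child_path = "v1/teams/{team_id}".format(**context)
        yield fake_get(child_path)["team"]


details = list(sync_teams_and_children())
print(request_log)  # ['v1/teams', 'v1/teams/1', 'v1/teams/2']
```

In this sketch the list endpoint appears once in the log, while the detail endpoint appears once per team. Whether the real tap behaves the same way is worth confirming against the SDK's request logs.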
On the first run, the tap retrieves all data for each team and creates a state JSON file. But when I pass that state file on the next run, the tap does not seem to use the bookmark: I retrieve all the data again, just like on the first run. The
endpoint does not take date parameters, so we can't use
. So I guess that on each run
sends all teams to
but is there a method in RESTStream that filters the child records according to the replication_key bookmark?
To put it more clearly: to use replication_key and a replication method, must the stream's endpoint accept the replication key as a parameter? One last question about parent-child streams: with the classes above, can you confirm that the parent stream calls its endpoint once, rather than as many times as there are child streams? Thanks.
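When an endpoint accepts no date parameters, one possible workaround is to filter on the client side, dropping records that are not newer than the bookmark. The sketch below is plain Python under stated assumptions: `created_at` is taken to be an ISO 8601 string, and `parse_ts` and `newer_than_bookmark` are hypothetical helper names, not SDK functions. In an SDK stream, similar logic could probably live in `post_process`, which as far as I know may return `None` to skip a record, with the starting value read from `get_starting_timestamp(context)`. Both are worth checking against the SDK documentation.

```python
from datetime import datetime
from typing import Iterable, Iterator, Optional


def parse_ts(value: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a trailing 'Z' for UTC."""
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def newer_than_bookmark(
    records: Iterable[dict],
    bookmark: Optional[str],
    key: str = "created_at",
) -> Iterator[dict]:
    """Yield only the records whose replication key is after the bookmark.

    With no bookmark (first run), every record passes through.
    """
    start = parse_ts(bookmark) if bookmark else None
    for record in records:
        if start is None or parse_ts(record[key]) > start:
            yield record


# Made-up sample records, for illustration only.
teams = [
    {"id": 1, "created_at": "2022-08-01T09:00:00Z"},
    {"id": 2, "created_at": "2022-09-05T12:30:00Z"},
]
fresh = list(newer_than_bookmark(teams, "2022-09-01T00:00:00Z"))
print([t["id"] for t in fresh])  # [2]
```

This kind of filtering only reduces what the tap emits. The API still returns every team on each run, so the number of requests does not change.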