# plugins-general
m
I've been trying to get `tap-slack` and `target-snowflake` to work, but I keep getting this error:
```
target-snowflake | INFO channels: buffer has expired, flushing.
```
The tap will eventually finish syncing, but then it just stalls.
d
`buffer has expired, flushing` isn't an error; it's actually the target working as designed: it saves up records to insert in a buffer, which is flushed when the buffer hits a certain size (the batch size), or when it hasn't been flushed in a while (when it "expires").
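In pseudocode, that buffering behavior looks roughly like this (a simplified sketch for illustration only, not the actual target-snowflake code; the real batch size and expiry settings are configurable and differ):
```python
import time

class BufferedTarget:
    """Minimal sketch of a target that buffers records before loading them."""

    def __init__(self, batch_size=5000, max_buffer_age=60.0):
        self.batch_size = batch_size          # flush once this many records are buffered
        self.max_buffer_age = max_buffer_age  # ...or once the buffer is this many seconds old
        self.buffer = []
        self.last_flush = time.monotonic()

    def process_record(self, record):
        self.buffer.append(record)
        expired = time.monotonic() - self.last_flush > self.max_buffer_age
        if len(self.buffer) >= self.batch_size or expired:
            print(f"buffer has expired or is full, flushing {len(self.buffer)} records")
            self.flush()

    def flush(self):
        # In the real target this would be a bulk insert into Snowflake.
        self.buffer = []
        self.last_flush = time.monotonic()
```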
The target stalling after the tap finishes syncing definitely doesn't sound right though! Can you provide some more detail on what you're seeing, with log messages etc?
Are you running in debug mode or not?
Are you sure the target is stalling by itself, or is it waiting for the tap to finish and close its output pipe?
m
The tap finishes both when doing `elt` and `invoke`.
d
@michael_cooper But the target still appears to be stalling? Can you please share the relevant `meltano --log-level=debug elt` output?
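For reference, the full command would be something like this (using the plugin names from this thread; adjust if yours differ):
```
meltano --log-level=debug elt tap-slack target-snowflake
```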
m
```
tap-slack              | INFO Finished Sync..
tap-slack (out)        | {"type": "STATE", "value": {"bookmarks": {"users": {"updated": "2020-08-26T18:01:37.000000Z"}, "messages": {"C0GMF42VB": "2020-08-01T00:00:00", "C0GMKAS5U": "2020-08-01T00:00:00", "C11UGLU0J": "2020-08-01T00:00:00", "C247XTP42": "2020-08-01T00:00:00", "C6Q8YA1K8": "2020-08-01T00:00:00", "C6TUXLR8S": "2020-08-01T00:00:00", "CAS4RGG65": "2020-08-01T00:00:00", "CBRGR0067": "2020-08-01T00:00:00", "CCMTMS5A9": "2020-08-01T00:00:00", "CD59FU3QU": "2020-08-01T00:00:00", "CDBNTD2EL": "2020-08-01T00:00:00", "CJ58P5WHZ": "2020-08-01T00:00:00", "CKBP65GAZ": "2020-08-01T00:00:00", "CKDN39H62": "2020-08-01T00:00:00", "CLN8CB132": "2020-08-01T00:00:00", "CLT52B5DY": "2020-08-01T00:00:00", "CM0TEAAM6": "2020-08-01T00:00:00", "CP83KKPF1": "2020-08-01T00:00:00", "CRC8PNVLG": "2020-08-01T00:00:00", "CSY0793RT": "2020-08-01T00:00:00", "C010RE8P77Y": "2020-08-01T00:00:00", "C0140JDFC56": "2020-08-01T00:00:00", "C0147J5NG95": "2020-08-01T00:00:00", "C014CS4EL74": "2020-08-01T00:00:00", "C015SS4DBV2": "2020-08-01T00:00:00", "C0162DW9399": "2020-08-01T00:00:00", "C016B9CGW30": "2020-08-01T00:00:00", "C016GL175K7": "2020-08-01T00:00:00", "C016J1PDJGJ": "2020-08-01T00:00:00", "C016J7QNR8T": "2020-08-01T00:00:00", "C016S52NXDY": "2020-08-01T00:00:00", "C01728YC7JB": "2020-08-01T00:00:00", "C017Z1XQS76": "2020-08-01T00:00:00", "CQ19GT0RY": "2020-08-01T00:00:00", "CQBP6QYBZ": "2020-08-01T00:00:00", "C0136S1HRQU": "2020-08-01T00:00:00", "C0180Q695A7": "2020-08-01T00:00:00"}, "files": "2020-08-01T00:00:00", "remote_files": "2020-08-01T00:00:00"}}}
```
That's the final one before it stalls.
d
@michael_cooper Very odd 😕 Have you tried running this tap with a different target (e.g. `target-jsonl`: https://meltano.com/plugins/loaders/jsonl.html) so that we can figure out if this is an issue in target-snowflake or Meltano itself?
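If you haven't added it to your project yet, that would look something like this (a sketch assuming the default plugin names):
```
meltano add loader target-jsonl
meltano elt tap-slack target-jsonl
```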
m
Doesn't work either:
```
meltano | Loading failed (1): FileNotFoundError: [Errno 2] No such file or directory: 'output/channels.jsonl'
```
d
Ah yeah, it expects the `output` directory to already exist. Can you quickly create one in your project root?
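That's just the `output` directory the error message points at, i.e. from your project root:
```
mkdir output
```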
m
Sounds like a new feature to add the directory if it doesn't exist!
Anyway, it completes with JSONL.
d
> Sounds like a new feature to add the directory if it doesn't exist!

I know! I created https://gitlab.com/meltano/meltano/-/issues/2185 a month ago, but I'm not sure if this should be fixed in Meltano or in `target-jsonl` itself.
> Anyway, it completes with JSONL.

All right, that's interesting. Then I'm curious what's going on in `target-snowflake` that's preventing it from flushing its records and quitting when the tap quits. Are you comfortable enough with Python to add some print-based debugging statements to the `process_input` function at https://gitlab.com/meltano/target-snowflake/-/blob/master/target_snowflake/__init__.py#L29, so that we can get a better idea of where it's getting stuck? In your project, that file would live at `.meltano/loaders/target-snowflake/venv/lib/python3.6/site-packages/target_snowflake/__init__.py`

You may have more luck with https://github.com/datamill-co/target-snowflake or https://github.com/transferwise/pipelinewise-target-snowflake (which we're considering recommending to new users instead of our own variant: https://gitlab.com/meltano/meltano/-/issues/2134), but I'd obviously still like to find out why it's not working as it should 🙂
m
So I've added a print statement right at the beginning of `process_input`, and I wanted to see what the `lines` parameter was. That print statement only ever triggers once throughout the `tap-slack` sync, and it prints this:
```
target-snowflake (out) | lines: <_io.TextIOWrapper name='<stdin>' encoding='utf-8'>
```
d
All right, that makes sense, since `for line in lines:` will wait to yield new lines as long as `stdin` is open. I'm curious if we ever get to the statement after the `for line in lines` loop once the tap quits and stdin is closed: `target.flush_all_cached_records()`. Does that call ever complete, or is that the actual call we're getting stuck on? I suggest putting log statements before and after that call!
I'd be happy to jump on another call to help you get to the bottom of this, by the way 🙂
m
So this is how I modified `process_input`:
```python
def process_input(config, lines):
    print("Starting process_input")
    """
    The core processing loop for any Target

    Loop through the lines sent in sys.stdin, process each one and run DDL and
    batch DML operations.
    """
    target = TargetSnowflake(config)

    # Loop over lines from stdin
    for line in lines:
        print("Processing Line: ", line)
        target.process_line(line)
        print("Finished Line: ", line)

    # If the tap finished its execution, flush the records for any remaining
    #  streams that still have records cached (i.e. row_count < batch_size)
    print("Flushing all cached records")
    target.flush_all_cached_records()
    print("Flushed all cached records")
```
And these are the logs right before it fails:
```
meltano | WARNING Received state is invalid, incremental state has not been updated
target-snowflake (out) | Processing Line: {"type": "STATE", "value": {"bookmarks": {"users": {"updated": "2020-08-27T15:29:07.000000Z"}, "messages": {"C0GMF42VB": "2020-08-01T00:00:00", "C0GMKAS5U": "2020-08-01T00:00:00", "C11UGLU0J": "2020-08-01T00:00:00", "C247XTP42": "2020-08-01T00:00:00", "C6Q8YA1K8": "2020-08-01T00:00:00", "C6TUXLR8S": "2020-08-01T00:00:00", "CAS4RGG65": "2020-08-01T00:00:00", "CBRGR0067": "2020-08-01T00:00:00", "CCMTMS5A9": "2020-08-01T00:00:00", "CD59FU3QU": "2020-08-01T00:00:00", "CDBNTD2EL": "2020-08-01T00:00:00", "CJ58P5WHZ": "2020-08-01T00:00:00", "CKBP65GAZ": "2020-08-01T00:00:00", "CKDN39H62": "2020-08-01T00:00:00", "CLN8CB132": "2020-08-01T00:00:00", "CLT52B5DY": "2020-08-01T00:00:00", "CM0TEAAM6": "2020-08-01T00:00:00", "CP83KKPF1": "2020-08-01T00:00:00", "CRC8PNVLG": "2020-08-01T00:00:00", "CSY0793RT": "2020-08-01T00:00:00", "C010RE8P77Y": "2020-08-01T00:00:00", "C0140JDFC56": "2020-08-01T00:00:00", "C0147J5NG95": "2020-08-01T00:00:00", "C014CS4EL74": "2020-08-01T00:00:00", "C015SS4DBV2": "2020-08-01T00:00:00", "C0162DW9399": "2020-08-01T00:00:00", "C016B9CGW30": "2020-08-01T00:00:00", "C016GL175K7": "2020-08-01T00:00:00", "C016J1PDJGJ": "2020-08-01T00:00:00", "C016J7QNR8T": "2020-08-01T00:00:00", "C016S52NXDY": "2020-08-01T00:00:00", "C01728YC7JB": "2020-08-01T00:00:00", "C017Z1XQS76": "2020-08-01T00:00:00", "CQ19GT0RY": "2020-08-01T00:00:00", "CQBP6QYBZ": "2020-08-01T00:00:00", "C0136S1HRQU": "2020-08-01T00:00:00", "C0180Q695A7": "2020-08-01T00:00:00"}}}}
meltano | WARNING Received state is invalid, incremental state has not been updated
target-snowflake (out) |
meltano | WARNING Received state is invalid, incremental state has not been updated
target-snowflake (out) | Finished Line: {"type": "STATE", "value": {"bookmarks": {"users": {"updated": "2020-08-27T15:29:07.000000Z"}, "messages": {"C0GMF42VB": "2020-08-01T00:00:00", "C0GMKAS5U": "2020-08-01T00:00:00", "C11UGLU0J": "2020-08-01T00:00:00", "C247XTP42": "2020-08-01T00:00:00", "C6Q8YA1K8": "2020-08-01T00:00:00", "C6TUXLR8S": "2020-08-01T00:00:00", "CAS4RGG65": "2020-08-01T00:00:00", "CBRGR0067": "2020-08-01T00:00:00", "CCMTMS5A9": "2020-08-01T00:00:00", "CD59FU3QU": "2020-08-01T00:00:00", "CDBNTD2EL": "2020-08-01T00:00:00", "CJ58P5WHZ": "2020-08-01T00:00:00", "CKBP65GAZ": "2020-08-01T00:00:00", "CKDN39H62": "2020-08-01T00:00:00", "CLN8CB132": "2020-08-01T00:00:00", "CLT52B5DY": "2020-08-01T00:00:00", "CM0TEAAM6": "2020-08-01T00:00:00", "CP83KKPF1": "2020-08-01T00:00:00", "CRC8PNVLG": "2020-08-01T00:00:00", "CSY0793RT": "2020-08-01T00:00:00", "C010RE8P77Y": "2020-08-01T00:00:00", "C0140JDFC56": "2020-08-01T00:00:00", "C0147J5NG95": "2020-08-01T00:00:00", "C014CS4EL74": "2020-08-01T00:00:00", "C015SS4DBV2": "2020-08-01T00:00:00", "C0162DW9399": "2020-08-01T00:00:00", "C016B9CGW30": "2020-08-01T00:00:00", "C016GL175K7": "2020-08-01T00:00:00", "C016J1PDJGJ": "2020-08-01T00:00:00", "C016J7QNR8T": "2020-08-01T00:00:00", "C016S52NXDY": "2020-08-01T00:00:00", "C01728YC7JB": "2020-08-01T00:00:00", "C017Z1XQS76": "20…
```
d
But now it appears to be failing, not stalling? Or does it stall after that ERROR is printed?
Ah, I think your print statements are messing something up! Can you make sure you write them to stderr, not stdout? Stdout is reserved for state messages, so if you're printing arbitrary stuff there, that explains "Received state is invalid" 🙂
You can use `LOGGER.info(message)`.
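Something like this, for example (a sketch of the same debug statements, just routed to stderr; I'm assuming a `LOGGER` from `singer.get_logger()` is available in the module, otherwise plain `print(..., file=sys.stderr)` works just as well):
```python
import sys

from singer import get_logger

LOGGER = get_logger()  # Singer loggers write to stderr, so they won't pollute stdout

def process_input(config, lines):
    # TargetSnowflake is the class already defined in target_snowflake/__init__.py
    target = TargetSnowflake(config)

    for line in lines:
        LOGGER.info("Processing line: %s", line)
        target.process_line(line)
        LOGGER.info("Finished line: %s", line)

    # These could also be LOGGER calls; the key point is stderr, not stdout
    print("Flushing all cached records", file=sys.stderr)
    target.flush_all_cached_records()
    print("Flushed all cached records", file=sys.stderr)
```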
m
It'll be easier to just give you the log with this. It still fails in the same way. Line 10625 is where the tap starts syncing the `teams` stream.
d
@michael_cooper It looks like the target is simply much slower at loading records than the tap is at extracting them, so what you may have experienced as a stall could just have been the target dutifully working through its queue. The failure at the end (`CRITICAL Not allowed type update for teams.id: ('FLOAT', 'VARCHAR(16777216)')`) indicates that the `teams.id` column already exists with type `FLOAT`, but that the latest `SCHEMA` message defines the `id` column as a `VARCHAR` instead, which is a change that's not supported: `"id": { "type": ["null", "string"] }`
Do you know where the float `teams.id` column may be coming from? I expect that if you simply drop the table and try again, the target will be able to create the table correctly this time.
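Dropping it in Snowflake would be something along these lines (the database and schema names here are placeholders; use whatever your target is actually configured to load into):
```sql
-- Placeholder database/schema names: adjust to your own setup
DROP TABLE IF EXISTS ANALYTICS.TAP_SLACK.TEAMS;
```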
m
Wow, it's another instance of user error. I had the Snowflake schema defined for the GitHub tap, which also has a `teams` table. When I ran the Slack tap, the two `teams` tables for Slack and GitHub then conflicted.
Which gives me another question. How do I deal with taps that use the same Snowflake target but keep them on different schemas within the same database?
d
The default value for target-snowflake's `schema` setting is `$MELTANO_EXTRACTOR_NAMESPACE`, which automatically expands to the ELT pipeline's extractor's namespace (`tap_github`, `tap_slack`, etc.), as documented under https://meltano.com/docs/integration.html#pipeline-environment-variables. Would that suffice? If you'd like to have more control over the schema, you can add new `preferred_schema: MY_SCHEMA` properties to your extractor definitions in `meltano.yml`, which you can then reference from target-snowflake's `schema` value as `$MELTANO_EXTRACT__PREFERRED_SCHEMA`. We're going to make that the default behavior in https://gitlab.com/meltano/meltano/-/issues/2282, because it makes it easier to override the schema and clearer what's going on.
That `preferred_schema` property would be an example of a custom (https://meltano.com/docs/configuration.html#custom-settings) plugin extra (https://meltano.com/docs/configuration.html#plugin-extras).
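In `meltano.yml`, that combination would look roughly like this (a sketch; the plugin names and schema values are just examples, and your plugin definitions will have other keys too):
```yaml
plugins:
  extractors:
    - name: tap-slack
      preferred_schema: SLACK    # custom plugin extra; pick any schema name you like
    - name: tap-github
      preferred_schema: GITHUB
  loaders:
    - name: target-snowflake
      config:
        schema: $MELTANO_EXTRACT__PREFERRED_SCHEMA
```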
m
Does that mean you don't have to define the `SF_SCHEMA` within a `.env` file? And it thus defaults to produce the schema as `tap_<TAP_NAME>` within your target? And with the new setting, does that mean all you have to do is add `preferred_schema: CUSTOM_SCHEMA_NAME` to the extractor definition, and then it will automatically use the preferred schema without having to add any other settings elsewhere?
d
> Does that mean you don't have to define the `SF_SCHEMA` within a `.env` file?

Correct!
> And it thus defaults to produce the schema as `tap_<TAP_NAME>` within your target?

By default, it'll use the extractor's `namespace`, as defined in `meltano.yml` for your custom plugins. If you've followed the recommendation, it'll look like `tap_github` for `tap-github`.
> And with the new setting, does that mean all you have to do is add `preferred_schema: CUSTOM_SCHEMA_NAME` to the extractor definition, and then it will automatically use the preferred schema without having to add any other settings elsewhere?

Until we implement https://gitlab.com/meltano/meltano/-/issues/2282, which would make this the default, you'd also have to explicitly add `schema: $MELTANO_EXTRACT__PREFERRED_SCHEMA` to `target-snowflake`'s `config` object in `meltano.yml`.