#!/usr/bin/perl

use strict;
use warnings;
use Data::Dumper qw(Dumper);
use Fcntl qw(:flock SEEK_END);
use Getopt::Long qw(GetOptionsFromArray);
use File::Copy qw(move);
use File::Path qw(make_path);
use JSON;
use IO::File;
use String::ShellQuote 'shell_quote';

my $PROGNAME = "pve-zsync";
my $CONFIG_PATH = "/var/lib/${PROGNAME}";
my $STATE = "${CONFIG_PATH}/sync_state";
my $CRONJOBS = "/etc/cron.d/$PROGNAME";
my $PATH = "/usr/sbin";
my $PVE_DIR = "/etc/pve/local";
my $QEMU_CONF = "${PVE_DIR}/qemu-server";
my $LXC_CONF = "${PVE_DIR}/lxc";
my $LOCKFILE = "$CONFIG_PATH/${PROGNAME}.lock";
my $PROG_PATH = "$PATH/${PROGNAME}";
my $INTERVAL = 15;
my $DEBUG = 0;

my $IPV4OCTET = "(?:25[0-5]|(?:[1-9]|1[0-9]|2[0-4])?[0-9])";
my $IPV4RE = "(?:(?:$IPV4OCTET\\.){3}$IPV4OCTET)";
my $IPV6H16 = "(?:[0-9a-fA-F]{1,4})";
my $IPV6LS32 = "(?:(?:$IPV4RE|$IPV6H16:$IPV6H16))";

my $IPV6RE = "(?:" .
    "(?:(?:" .                             "(?:$IPV6H16:){6})$IPV6LS32)|" .
    "(?:(?:" .                           "::(?:$IPV6H16:){5})$IPV6LS32)|" .
    "(?:(?:(?:" .              "$IPV6H16)?::(?:$IPV6H16:){4})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,1}$IPV6H16)?::(?:$IPV6H16:){3})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,2}$IPV6H16)?::(?:$IPV6H16:){2})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,3}$IPV6H16)?::(?:$IPV6H16:){1})$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,4}$IPV6H16)?::" .           ")$IPV6LS32)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,5}$IPV6H16)?::" .            ")$IPV6H16)|" .
    "(?:(?:(?:(?:$IPV6H16:){0,6}$IPV6H16)?::" .                    ")))";

my $HOSTv4RE0 = "(?:[\\w\\.\\-_]+|$IPV4RE)";       # hostname or ipv4 address
my $HOSTv4RE1 = "(?:$HOSTv4RE0|\\[$HOSTv4RE0\\])"; # these may be in brackets, too
my $HOSTRE = "(?:$HOSTv4RE1|\\[$IPV6RE\\])";       # ipv6 must always be in brackets
# targets are either a VMID, or a 'host:zpool/path' with 'host:' being optional
my $TARGETRE = qr!^(?:($HOSTRE):)?(\d+|(?:[\w\-_]+)(/.+)?)$!;

check_bin ('cstream');
check_bin ('zfs');
check_bin ('ssh');
check_bin ('scp');

$SIG{TERM} = $SIG{QUIT} = $SIG{PIPE} = $SIG{HUP} = $SIG{KILL} = $SIG{INT} =
    sub {
	die "Signal aborting sync\n";
    };

sub check_bin {
    my ($bin)  = @_;

    foreach my $p (split (/:/, $ENV{PATH})) {
	my $fn = "$p/$bin";
	if (-x $fn) {
	    return $fn;
	}
    }

    die "unable to find command '$bin'\n";
}

sub cut_target_width {
    my ($path, $maxlen) = @_;
    $path =~ s@/+@/@g;

    return $path if length($path) <= $maxlen;

    return '..'.substr($path, -$maxlen+2) if $path !~ m@/@;

    $path =~ s@/([^/]+/?)$@@;
    my $tail = $1;

    if (length($tail)+3 == $maxlen) {
	return "../$tail";
    } elsif (length($tail)+2 >= $maxlen) {
	return '..'.substr($tail, -$maxlen+2)
    }

    $path =~ s@(/[^/]+)(?:/|$)@@;
    my $head = $1;
    my $both = length($head) + length($tail);
    my $remaining = $maxlen-$both-4; # -4 for "/../"

    if ($remaining < 0) {
	return substr($head, 0, $maxlen - length($tail) - 3) . "../$tail"; # -3 for "../"
    }

    substr($path, ($remaining/2), (length($path)-$remaining), '..');
    return "$head/" . $path . "/$tail";
}

sub lock {
    my ($fh) = @_;
    flock($fh, LOCK_EX) || die "Can't lock config - $!\n";
}

sub unlock {
    my ($fh) = @_;
    flock($fh, LOCK_UN) || die "Can't unlock config- $!\n";
}

sub get_status {
    my ($source, $name, $status) = @_;

    if ($status->{$source->{all}}->{$name}->{status}) {
	return $status;
    }

    return undef;
}

sub check_pool_exists {
    my ($target) = @_;

    my $cmd = [];

    if ($target->{ip}) {
	push @$cmd, 'ssh', "root\@$target->{ip}", '--';
    }
    push @$cmd, 'zfs', 'list', '-H', '--', $target->{all};
    eval {
	run_cmd($cmd);
    };

    if ($@) {
	return 0;
    }
    return 1;
}

sub parse_target {
    my ($text) = @_;

    my $errstr = "$text : is not a valid input! Use [IP:]<VMID> or [IP:]<ZFSPool>[/Path]";
    my $target = {};

    if ($text !~ $TARGETRE) {
	die "$errstr\n";
    }
    $target->{all} = $2;
    $target->{ip} = $1 if $1;
    my @parts = split('/', $2);

    $target->{ip} =~ s/^\[(.*)\]$/$1/ if $target->{ip};

    my $pool = $target->{pool} = shift(@parts);
    die "$errstr\n" if !$pool;

    if ($pool =~ m/^\d+$/) {
	$target->{vmid} = $pool;
	delete $target->{pool};
    }

    return $target if (@parts == 0);
    $target->{last_part} = pop(@parts);

    if ($target->{ip}) {
	pop(@parts);
    }
    if (@parts > 0) {
	$target->{path} = join('/', @parts);
    }

    return $target;
}

sub read_cron {

    #This is for the first use to init file;
    if (!-e $CRONJOBS) {
	my $new_fh = IO::File->new("> $CRONJOBS");
	die "Could not create $CRONJOBS: $!\n" if !$new_fh;
	close($new_fh);
	return undef;
    }

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;

    my @text = <$fh>;

    close($fh);

    return encode_cron(@text);
}

sub parse_argv {
    my (@arg) = @_;

    my $param = {};
    $param->{dest} = undef;
    $param->{source} = undef;
    $param->{verbose} = undef;
    $param->{limit} = undef;
    $param->{maxsnap} = undef;
    $param->{name} = undef;
    $param->{skip} = undef;
    $param->{method} = undef;

    my ($ret, $ar) = GetOptionsFromArray(\@arg,
					 'dest=s' => \$param->{dest},
					 'source=s' => \$param->{source},
					 'verbose' => \$param->{verbose},
					 'limit=i' => \$param->{limit},
					 'maxsnap=i' => \$param->{maxsnap},
					 'name=s' => \$param->{name},
					 'skip' => \$param->{skip},
					 'method=s' => \$param->{method});

    if ($ret == 0) {
	die "can't parse options\n";
    }

    $param->{name} = "default" if !$param->{name};
    $param->{maxsnap} = 1 if !$param->{maxsnap};
    $param->{method} = "ssh" if !$param->{method};

    return $param;
}

sub add_state_to_job {
    my ($job) = @_;

    my $states = read_state();
    my $state = $states->{$job->{source}}->{$job->{name}};

    $job->{state} = $state->{state};
    $job->{lsync} = $state->{lsync};
    $job->{vm_type} = $state->{vm_type};

    for (my $i = 0; $state->{"snap$i"}; $i++) {
	$job->{"snap$i"} = $state->{"snap$i"};
    }

    return $job;
}

sub encode_cron {
    my (@text) = @_;

    my $cfg = {};

    while (my $line = shift(@text)) {

	my @arg = split('\s', $line);
	my $param = parse_argv(@arg);

	if ($param->{source} && $param->{dest}) {
	    $cfg->{$param->{source}}->{$param->{name}}->{dest} = $param->{dest};
	    $cfg->{$param->{source}}->{$param->{name}}->{verbose} = $param->{verbose};
	    $cfg->{$param->{source}}->{$param->{name}}->{limit} = $param->{limit};
	    $cfg->{$param->{source}}->{$param->{name}}->{maxsnap} = $param->{maxsnap};
	    $cfg->{$param->{source}}->{$param->{name}}->{skip} = $param->{skip};
	    $cfg->{$param->{source}}->{$param->{name}}->{method} = $param->{method};
	}
    }

    return $cfg;
}

sub param_to_job {
    my ($param) = @_;

    my $job = {};

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest}) if $param->{dest};

    $job->{name} = !$param->{name} ? "default" : $param->{name};
    $job->{dest} = $param->{dest} if $param->{dest};
    $job->{method} = "local" if !$dest->{ip} && !$source->{ip};
    $job->{method} = "ssh" if !$job->{method};
    $job->{limit} = $param->{limit};
    $job->{maxsnap} = $param->{maxsnap} if $param->{maxsnap};
    $job->{source} = $param->{source};

    return $job;
}

sub read_state {

    if (!-e $STATE) {
	make_path $CONFIG_PATH;
	my $new_fh = IO::File->new("> $STATE");
	die "Could not create $STATE: $!\n" if !$new_fh;
	print $new_fh "{}";
	close($new_fh);
	return undef;
    }

    my $fh = IO::File->new("< $STATE");
    die "Could not open file $STATE: $!\n" if !$fh;

    my $text = <$fh>;
    my $states = decode_json($text);

    close($fh);

    return $states;
}

sub update_state {
    my ($job) = @_;
    my $text;
    my $in_fh;

    eval {

	$in_fh = IO::File->new("< $STATE");
	die "Could not open file $STATE: $!\n" if !$in_fh;
	lock($in_fh);
	$text = <$in_fh>;
    };

    my $out_fh = IO::File->new("> $STATE.new");
    die "Could not open file ${STATE}.new: $!\n" if !$out_fh;

    my $states = {};
    my $state = {};
    if ($text){
	$states = decode_json($text);
	$state = $states->{$job->{source}}->{$job->{name}};
    }

    if ($job->{state} ne "del") {
	$state->{state} = $job->{state};
	$state->{lsync} = $job->{lsync};
	$state->{vm_type} = $job->{vm_type};

	for (my $i = 0; $job->{"snap$i"} ; $i++) {
	    $state->{"snap$i"} = $job->{"snap$i"};
	}
	$states->{$job->{source}}->{$job->{name}} = $state;
    } else {

	delete $states->{$job->{source}}->{$job->{name}};
	delete $states->{$job->{source}} if !keys %{$states->{$job->{source}}};
    }

    $text = encode_json($states);
    print $out_fh $text;

    close($out_fh);
    move("$STATE.new", $STATE);
    eval {
	close($in_fh);
    };

    return $states;
}

sub update_cron {
    my ($job) = @_;

    my $updated;
    my $has_header;
    my $line_no = 0;
    my $text = "";
    my $header = "SHELL=/bin/sh\n";
    $header .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n\n";

    my $fh = IO::File->new("< $CRONJOBS");
    die "Could not open file $CRONJOBS: $!\n" if !$fh;
    lock($fh);

    my @test = <$fh>;

    while (my $line = shift(@test)) {
	chomp($line);
	if ($line =~ m/source $job->{source} .*name $job->{name} /) {
	    $updated = 1;
	    next if $job->{state} eq "del";
	    $text .= format_job($job, $line);
	} else {
	    if (($line_no < 3) && ($line =~ /^(PATH|SHELL)/ )) {
		$has_header = 1;
	    }
	    $text .= "$line\n";
	}
	$line_no++;
    }

    if (!$has_header) {
	$text = "$header$text";
    }

    if (!$updated) {
	$text .= format_job($job);
    }
    my $new_fh = IO::File->new("> ${CRONJOBS}.new");
    die "Could not open file ${CRONJOBS}.new: $!\n" if !$new_fh;

    die "can't write to $CRONJOBS.new\n" if !print($new_fh $text);
    close ($new_fh);

    die "can't move $CRONJOBS.new: $!\n" if !move("${CRONJOBS}.new", "$CRONJOBS");
    close ($fh);
}

sub format_job {
    my ($job, $line) = @_;
    my $text = "";

    if ($job->{state} eq "stopped") {
	$text = "#";
    }
    if ($line) {
	$line =~ /^#*(.+) root/;
	$text .= $1;
    } else {
	$text .= "*/$INTERVAL * * * *";
    }
    $text .= " root";
    $text .= " $PROGNAME sync --source $job->{source} --dest $job->{dest}";
    $text .= " --name $job->{name} --maxsnap $job->{maxsnap}";
    $text .= " --limit $job->{limit}" if $job->{limit};
    $text .= " --method $job->{method}";
    $text .= " --verbose" if $job->{verbose};
    $text .= "\n";

    return $text;
}

sub list {

    my $cfg = read_cron();

    my $list = sprintf("%-25s%-25s%-10s%-20s%-6s%-5s\n" , "SOURCE", "NAME", "STATE", "LAST SYNC", "TYPE", "CON");

    my $states = read_state();
    foreach my $source (sort keys%{$cfg}) {
	foreach my $name (sort keys%{$cfg->{$source}}) {
	    $list .= sprintf("%-25s", cut_target_width($source, 25));
	    $list .= sprintf("%-25s", cut_target_width($name, 25));
	    $list .= sprintf("%-10s", $states->{$source}->{$name}->{state});
	    $list .= sprintf("%-20s", $states->{$source}->{$name}->{lsync});
	    $list .= sprintf("%-6s", defined($states->{$source}->{$name}->{vm_type}) ? $states->{$source}->{$name}->{vm_type} : "undef");
	    $list .= sprintf("%-5s\n", $cfg->{$source}->{$name}->{method});
	}
    }

    return $list;
}

sub vm_exists {
    my ($target) = @_;

    my @cmd = ('ssh', "root\@$target->{ip}", '--') if $target->{ip};

    my $res = undef;

    return undef if !defined($target->{vmid});

    eval { $res = run_cmd([@cmd, 'ls',  "$QEMU_CONF/$target->{vmid}.conf"]) };

    return "qemu" if $res;

    eval { $res = run_cmd([@cmd, 'ls',  "$LXC_CONF/$target->{vmid}.conf"]) };

    return "lxc" if $res;

    return undef;
}

sub init {
    my ($param) = @_;

    my $cfg = read_cron();

    my $job = param_to_job($param);

    $job->{state} = "ok";
    $job->{lsync} = 0;

    my $source = parse_target($param->{source});
    my $dest = parse_target($param->{dest});

    if (my $ip =  $dest->{ip}) {
	run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    if (my $ip =  $source->{ip}) {
	run_cmd(['ssh-copy-id', '-i', '/root/.ssh/id_rsa.pub', "root\@$ip"]);
    }

    die "Pool $dest->{all} does not exists\n" if !check_pool_exists($dest);

    if (!defined($source->{vmid})) {
	die "Pool $source->{all} does not exists\n" if !check_pool_exists($source);
    }

    my $vm_type = vm_exists($source);
    $job->{vm_type} = $vm_type;
    $source->{vm_type} = $vm_type;

    die "VM $source->{vmid} doesn't exist\n" if $source->{vmid} && !$vm_type;

    die "Config already exists\n" if $cfg->{$job->{source}}->{$job->{name}};

    #check if vm has zfs disks if not die;
    get_disks($source, 1) if $source->{vmid};

    update_cron($job);
    update_state($job);

    eval {
	sync($param) if !$param->{skip};
    };
    if(my $err = $@) {
	destroy_job($param);
	print $err;
    }
}

sub get_job {
    my ($param) = @_;

    my $cfg = read_cron();

    if (!$cfg->{$param->{source}}->{$param->{name}}) {
	die "Job  with source $param->{source} and name $param->{name} does not exist\n" ;
    }
    my $job = $cfg->{$param->{source}}->{$param->{name}};
    $job->{name} = $param->{name};
    $job->{source} = $param->{source};
    $job = add_state_to_job($job);

    return $job;
}

sub destroy_job {
    my ($param) = @_;

    my $job = get_job($param);
    $job->{state} = "del";

    update_cron($job);
    update_state($job);
}

sub sync {
    my ($param) = @_;

    my $lock_fh = IO::File->new("> $LOCKFILE");
    die "Can't open Lock File: $LOCKFILE $!\n" if !$lock_fh;
    lock($lock_fh);

    my $date = get_date();
    my $job;
    eval {
	$job = get_job($param);
    };

    if ($job && $job->{state} eq "syncing") {
	die "Job --source $param->{source} --name $param->{name} is syncing at the moment";
    }

    my $dest = parse_target($param->{dest});
    my $source = parse_target($param->{source});

    my $sync_path = sub {
	my ($source, $dest, $job, $param, $date) = @_;

	($source->{old_snap}, $source->{last_snap}) = snapshot_get($source, $dest, $param->{maxsnap}, $param->{name});

	snapshot_add($source, $dest, $param->{name}, $date);

	send_image($source, $dest, $param);

	snapshot_destroy($source, $dest, $param->{method}, $source->{old_snap}) if ($source->{destroy} && $source->{old_snap});

    };

    my $vm_type = vm_exists($source);
    $source->{vm_type} = $vm_type;

    if ($job) {
	$job->{state} = "syncing";
	$job->{vm_type} = $vm_type if !$job->{vm_type};
	update_state($job);
    }

    eval{
	if ($source->{vmid}) {
	    die "VM $source->{vmid} doesn't exist\n" if !$vm_type;
	    my $disks = get_disks($source);

	    foreach my $disk (sort keys %{$disks}) {
		$source->{all} = $disks->{$disk}->{all};
		$source->{pool} = $disks->{$disk}->{pool};
		$source->{path} = $disks->{$disk}->{path} if $disks->{$disk}->{path};
		$source->{last_part} = $disks->{$disk}->{last_part};
		&$sync_path($source, $dest, $job, $param, $date);
	    }
	    if ($param->{method} eq "ssh" && ($source->{ip} || $dest->{ip})) {
		send_config($source, $dest,'ssh');
	    } else {
		send_config($source, $dest,'local');
	    }
	} else {
	    &$sync_path($source, $dest, $job, $param, $date);
	}
    };
    if(my $err = $@) {
	if ($job) {
	    $job->{state} = "error";
	    update_state($job);
	    unlock($lock_fh);
	    close($lock_fh);
	    print "Job --source $param->{source} --name $param->{name} got an ERROR!!!\nERROR Message:\n";
	}
	die "$err\n";
    }

    if ($job) {
	$job->{state} = "ok";
	$job->{lsync} = $date;
	update_state($job);
    }

    unlock($lock_fh);
    close($lock_fh);
}

sub snapshot_get{
    my ($source, $dest, $max_snap, $name) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'list', '-r', '-t', 'snapshot', '-Ho', 'name', '-S', 'creation';
    push @$cmd, $source->{all};

    my $raw = run_cmd($cmd);
    my $index = 0;
    my $line = "";
    my $last_snap = undef;
    my $old_snap;

    while ($raw && $raw =~ s/^(.*?)(\n|$)//) {
	$line = $1;
	if ($line =~ m/(rep_\Q${name}\E_\d{4}-\d{2}-\d{2}_\d{2}:\d{2}:\d{2})$/) {

	    $last_snap = $1 if (!$last_snap);
	    $old_snap = $1;
	    $index++;
	    if ($index == $max_snap) {
		$source->{destroy} = 1;
		last;
	    };
	}
    }

    return ($old_snap, $last_snap) if $last_snap;

    return undef;
}

sub snapshot_add {
    my ($source, $dest, $name, $date) = @_;

    my $snap_name = "rep_$name\_".$date;

    $source->{new_snap} = $snap_name;

    my $path = "$source->{all}\@$snap_name";

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$source->{ip}", '--', if $source->{ip};
    push @$cmd, 'zfs', 'snapshot', $path;
    eval{
	run_cmd($cmd);
    };

    if (my $err = $@) {
	snapshot_destroy($source, $dest, 'ssh', $snap_name);
	die "$err\n";
    }
}

sub write_cron {
    my ($cfg) = @_;

    my $text = "SHELL=/bin/sh\n";
    $text .= "PATH=/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin\n";

    my $fh = IO::File->new("> $CRONJOBS");
    die "Could not open file: $!\n" if !$fh;

    foreach my $source (sort keys%{$cfg}) {
	foreach my $sync_name (sort keys%{$cfg->{$source}}) {
	    next if $cfg->{$source}->{$sync_name}->{status} ne 'ok';
	    $text .= "$PROG_PATH sync";
	    $text .= " -source  ";
	    if ($cfg->{$source}->{$sync_name}->{vmid}) {
		$text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
		$text .= "$cfg->{$source}->{$sync_name}->{vmid} ";
	    } else {
		$text .= "$cfg->{$source}->{$sync_name}->{source_ip}:" if $cfg->{$source}->{$sync_name}->{source_ip};
		$text .= "$cfg->{$source}->{$sync_name}->{source_pool}";
		$text .= "$cfg->{$source}->{$sync_name}->{source_path}" if $cfg->{$source}->{$sync_name}->{source_path};
	    }
	    $text .= " -dest  ";
	    $text .= "$cfg->{$source}->{$sync_name}->{dest_ip}:" if $cfg->{$source}->{$sync_name}->{dest_ip};
	    $text .= "$cfg->{$source}->{$sync_name}->{dest_pool}";
	    $text .= "$cfg->{$source}->{$sync_name}->{dest_path}" if $cfg->{$source}->{$sync_name}->{dest_path};
	    $text .= " -name $sync_name ";
	    $text .= " -limit $cfg->{$source}->{$sync_name}->{limit}" if $cfg->{$source}->{$sync_name}->{limit};
	    $text .= " -maxsnap $cfg->{$source}->{$sync_name}->{maxsnap}" if $cfg->{$source}->{$sync_name}->{maxsnap};
	    $text .= "\n";
	}
    }
    die "Can't write to cron\n" if (!print($fh $text));
    close($fh);
}

sub get_disks {
    my ($target, $get_err) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$target->{ip}", '--', if $target->{ip};

    if ($target->{vm_type} eq 'qemu') {
	push @$cmd, 'qm', 'config', $target->{vmid};
    } elsif ($target->{vm_type} eq 'lxc') {
	push @$cmd, 'pct', 'config', $target->{vmid};
    } else {
	die "VM Type unknown\n";
    }

    my $res = run_cmd($cmd);

    my $disks = parse_disks($res, $target->{ip}, $target->{vm_type}, $get_err);

    return $disks;
}

sub run_cmd {
    my ($cmd) = @_;
    print "Start CMD\n" if $DEBUG;
    print Dumper $cmd if $DEBUG;
    if (ref($cmd) eq 'ARRAY') {
	$cmd = join(' ', map { ref($_) ? $$_ : shell_quote($_) } @$cmd);
    }
    my $output = `$cmd 2>&1`;

    die "COMMAND:\n\t$cmd\nGET ERROR:\n\t$output" if 0 != $?;

    chomp($output);
    print Dumper $output if $DEBUG;
    print "END CMD\n" if $DEBUG;
    return $output;
}

sub parse_disks {
    my ($text, $ip, $vm_type, $get_err) = @_;

    my $disks;

    my $num = 0;
    while ($text && $text =~ s/^(.*?)(\n|$)//) {
	my $line = $1;
	my $error = $vm_type eq 'qemu' ? 1 : 0 ;

	next if $line =~ /cdrom|none/;
	next if $line !~ m/^(?:((?:virtio|ide|scsi|sata|mp)\d+)|rootfs): /;

	#QEMU if backup is not set include in  sync
	next if $vm_type eq 'qemu && ($line =~ m/backup=(?i:0|no|off|false)/)';

	#LXC if backup is not set do no in sync
	$error = ($line =~ m/backup=(?i:1|yes|on|true)/) if $vm_type eq 'lxc';

	my $disk = undef;
	my $stor = undef;
	if($line =~ m/^(?:(?:(?:virtio|ide|scsi|sata|mp)\d+)|rootfs): (.*)$/) {
	    my @parameter = split(/,/,$1);

	    foreach my $opt (@parameter) {
		if ($opt =~ m/^(?:file=|volume=)?([^:]+:)([A-Za-z0-9\-]+)$/){
		    $disk = $2;
		    $stor = $1;
		    last;
		}
	    }

	} else {
	    print "Disk: \"$line\" will not include in pve-sync\n" if $get_err || $error;
	    next;
	}

	my $cmd = [];
	push @$cmd, 'ssh', "root\@$ip", '--' if $ip;
	push @$cmd, 'pvesm', 'path', "$stor$disk";
	my $path = run_cmd($cmd);

	die "Get no path from pvesm path $stor$disk\n" if !$path;
 
	if ($vm_type eq 'qemu' && $path =~ m/^\/dev\/zvol\/(\w+.*)(\/$disk)$/) {

	    my @array = split('/', $1);
	    $disks->{$num}->{pool} = shift(@array);
	    $disks->{$num}->{all} = $disks->{$num}->{pool};
	    if (0 < @array) {
		$disks->{$num}->{path} = join('/', @array);
		$disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
	    }
	    $disks->{$num}->{last_part} = $disk;
	    $disks->{$num}->{all} .= "\/$disk";

	    $num++;
	} elsif ($vm_type eq 'lxc' && $path =~ m/^\/(\w+.+)(\/(\w+.*))*(\/$disk)$/) {

	    $disks->{$num}->{pool} = $1;
	    $disks->{$num}->{all} = $disks->{$num}->{pool};

	    if ($2) {
		$disks->{$num}->{path} = $3;
		$disks->{$num}->{all} .= "\/$disks->{$num}->{path}";
	    }

	    $disks->{$num}->{last_part} = $disk;
	    $disks->{$num}->{all} .= "\/$disk";

	    $num++;

	} else {
	    die "ERROR: in path\n";
	}
    }

    die "Vm include no disk on zfs.\n" if !$disks->{0};
    return $disks;
}

sub snapshot_destroy {
    my ($source, $dest, $method, $snap) = @_;

    my @zfscmd = ('zfs', 'destroy');
    my $snapshot = "$source->{all}\@$snap";

    eval {
	if($source->{ip} && $method eq 'ssh'){
	    run_cmd(['ssh', "root\@$source->{ip}", '--', @zfscmd, $snapshot]);
	} else {
	    run_cmd([@zfscmd, $snapshot]);
	}
    };
    if (my $erro = $@) {
	warn "WARN: $erro";
    }
    if ($dest) {
	my @ssh = $dest->{ip} ? ('ssh', "root\@$dest->{ip}", '--') : ();

	my $path = "$dest->{all}\/$source->{last_part}";

	eval {
	    run_cmd([@ssh, @zfscmd, "$path\@$snap"]);
	};
	if (my $erro = $@) {
	    warn "WARN: $erro";
	}
    }
}

sub snapshot_exist {
    my ($source , $dest, $method) = @_;

    my $cmd = [];
    push @$cmd, 'ssh', "root\@$dest->{ip}", '--' if $dest->{ip};
    push @$cmd, 'zfs', 'list', '-rt', 'snapshot', '-Ho', 'name';
    push @$cmd, "$dest->{all}/$source->{last_part}\@$source->{old_snap}";

    my $text = "";
    eval {$text =run_cmd($cmd);};
    if (my $erro =$@) {
	warn "WARN: $erro";
	return undef;
    }

    while ($text && $text =~ s/^(.*?)(\n|$)//) {
	my $line =$1;
	return 1 if $line =~ m/^.*$source->{old_snap}$/;
    }
}

sub send_image {
    my ($source, $dest, $param) = @_;

    my $cmd = [];

    push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$source->{ip}", '--' if $source->{ip};
    push @$cmd, 'zfs', 'send';
    push @$cmd, '-v' if $param->{verbose};

    if($source->{last_snap} && snapshot_exist($source , $dest, $param->{method})) {
	push @$cmd, '-i', "$source->{all}\@$source->{last_snap}";
    }
    push @$cmd, '--', "$source->{all}\@$source->{new_snap}";

    if ($param->{limit}){
	my $bwl = $param->{limit}*1024;
	push @$cmd, \'|', 'cstream', '-t', $bwl;
    }
    my $target = "$dest->{all}/$source->{last_part}";
    $target =~ s!/+!/!g;

    push @$cmd, \'|';
	push @$cmd, 'ssh', '-o', 'BatchMode=yes', "root\@$dest->{ip}", '--' if $dest->{ip};
	push @$cmd, 'zfs', 'recv', '-F', '--';
	push @$cmd, "$target";

	eval {
	    run_cmd($cmd)
	};

	if (my $erro = $@) {
	    snapshot_destroy($source, undef, $param->{method}, $source->{new_snap});
	    die $erro;
	};
    }


    sub send_config{
	my ($source, $dest, $method) = @_;

	my $source_target = $source->{vm_type} eq 'qemu' ? "$QEMU_CONF/$source->{vmid}.conf": "$LXC_CONF/$source->{vmid}.conf";
	my $dest_target_new ="$source->{vmid}.conf.$source->{vm_type}.$source->{new_snap}";

	my $config_dir = $dest->{last_part} ?  "${CONFIG_PATH}/$dest->{last_part}" : $CONFIG_PATH;

	$dest_target_new = $config_dir.'/'.$dest_target_new;

	if ($method eq 'ssh'){
	    if ($dest->{ip} && $source->{ip}) {
		run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
		run_cmd(['scp', '--', "root\@[$source->{ip}]:$source_target", "root\@[$dest->{ip}]:$dest_target_new"]);
	    } elsif ($dest->{ip}) {
		run_cmd(['ssh', "root\@$dest->{ip}", '--', 'mkdir', '-p', '--', $config_dir]);
		run_cmd(['scp', '--', $source_target, "root\@[$dest->{ip}]:$dest_target_new"]);
	    } elsif ($source->{ip}) {
		run_cmd(['mkdir', '-p', '--', $config_dir]);
		run_cmd(['scp', '--', "root\@$source->{ip}:$source_target", $dest_target_new]);
	    }

	    if ($source->{destroy}){
		my $dest_target_old ="${config_dir}/$source->{vmid}.conf.$source->{vm_type}.$source->{old_snap}";
		if($dest->{ip}){
		    run_cmd(['ssh', "root\@$dest->{ip}", '--', 'rm', '-f', '--', $dest_target_old]);
		} else {
		    run_cmd(['rm', '-f', '--', $dest_target_old]);
		}
	    }
	} elsif ($method eq 'local') {
	    run_cmd(['mkdir', '-p', '--', $config_dir]);
	    run_cmd(['cp', $source_target, $dest_target_new]);
	}
    }

    sub get_date {
	my ($sec, $min, $hour, $mday, $mon, $year, $wday, $yday, $isdst) = localtime(time);
	my $datestamp = sprintf ("%04d-%02d-%02d_%02d:%02d:%02d", $year+1900, $mon+1, $mday, $hour, $min, $sec);

	return $datestamp;
    }

    sub status {
	my $cfg = read_cron();

	my $status_list = sprintf("%-25s%-25s%-10s\n", "SOURCE", "NAME", "STATUS");

	my $states = read_state();

	foreach my $source (sort keys%{$cfg}) {
	    foreach my $sync_name (sort keys%{$cfg->{$source}}) {
		$status_list .= sprintf("%-25s", cut_target_width($source, 25));
		$status_list .= sprintf("%-25s", cut_target_width($sync_name, 25));
		$status_list .= "$states->{$source}->{$sync_name}->{state}\n";
	    }
	}

	return $status_list;
    }

    sub enable_job {
	my ($param) = @_;

	my $job = get_job($param);
	$job->{state} = "ok";
	update_state($job);
	update_cron($job);
    }

    sub disable_job {
	my ($param) = @_;

	my $job = get_job($param);
	$job->{state} = "stopped";
	update_state($job);
	update_cron($job);
    }

    my $command = $ARGV[0];

    my $commands = {'destroy' => 1,
		    'create' => 1,
		    'sync' => 1,
		    'list' => 1,
		    'status' => 1,
		    'help' => 1,
		    'enable' => 1,
		    'disable' => 1};

    if (!$command || !$commands->{$command}) {
	usage();
	die "\n";
    }

    my $help_sync = "$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n
\twill sync one time\n
\t-dest\tstring\n
\t\tthe destination target is like [IP:]<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tinteger\n
\t\thow much snapshots will be kept before get erased, default 1/n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default.
\tIt is only necessary if scheduler allready contains this source.\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

    my $help_create = "$PROGNAME create -dest <string> -source <string> [OPTIONS]/n
\tCreate a sync Job\n
\t-dest\tstring\n
\t\tthe destination target is like [IP]:<Pool>[/Path]\n
\t-limit\tinteger\n
\t\tmax sync speed in kBytes/s, default unlimited\n
\t-maxsnap\tstring\n
\t\thow much snapshots will be kept before get erased, default 1\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-skip\tboolean\n
\t\tif this flag is set it will skip the first sync\n
\t-source\tstring\n
\t\tthe source can be an <VMID> or [IP:]<ZFSPool>[/Path]\n";

    my $help_destroy = "$PROGNAME destroy -source <string> [OPTIONS]\n
\tremove a sync Job from the scheduler\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an  <VMID> or [IP:]<ZFSPool>[/Path]\n";

    my $help_help = "$PROGNAME help <cmd> [OPTIONS]\n
\tGet help about specified command.\n
\t<cmd>\tstring\n
\t\tCommand name\n
\t-verbose\tboolean\n
\t\tVerbose output format.\n";

    my $help_list = "$PROGNAME list\n
\tGet a List of all scheduled Sync Jobs\n";

    my $help_status = "$PROGNAME status\n
\tGet the status of all scheduled Sync Jobs\n";

    my $help_enable = "$PROGNAME enable -source <string> [OPTIONS]\n
\tenable a syncjob and reset error\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an  <VMID> or [IP:]<ZFSPool>[/Path]\n";

    my $help_disable = "$PROGNAME disable -source <string> [OPTIONS]\n
\tpause a syncjob\n
\t-name\tstring\n
\t\tname of the sync job, if not set it is default\n
\t-source\tstring\n
\t\tthe source can be an  <VMID> or [IP:]<ZFSPool>[/Path]\n";

    sub help {
	my ($command) = @_;

	if ($command eq 'help') {
	    die "$help_help\n";

	} elsif ($command eq 'sync') {
	    die "$help_sync\n";

	} elsif ($command eq 'destroy') {
	    die "$help_destroy\n";

	} elsif ($command eq 'create') {
	    die "$help_create\n";

	} elsif ($command eq 'list') {
	    die "$help_list\n";

	} elsif ($command eq 'status') {
	    die "$help_status\n";

	} elsif ($command eq 'enable') {
	    die "$help_enable\n";

	} elsif ($command eq 'disable') {
	    die "$help_disable\n";

	}

    }

    my @arg = @ARGV;
    my $param = parse_argv(@arg);

    if ($command eq 'destroy') {
	die "$help_destroy\n" if !$param->{source};

	check_target($param->{source});
	destroy_job($param);

    } elsif ($command eq 'sync') {
	die "$help_sync\n" if !$param->{source} || !$param->{dest};

	check_target($param->{source});
	check_target($param->{dest});
	sync($param);

    } elsif ($command eq 'create') {
	die "$help_create\n" if !$param->{source} || !$param->{dest};

	check_target($param->{source});
	check_target($param->{dest});
	init($param);

    } elsif ($command eq 'status') {
	print status();

    } elsif ($command eq 'list') {
	print list();

    } elsif ($command eq 'help') {
	my $help_command = $ARGV[1];

	if ($help_command && $commands->{$help_command}) {
	    print help($help_command);

	}
	if ($param->{verbose} == 1){
	    exec("man $PROGNAME");

	} else {
	    usage(1);

	}

    } elsif ($command eq 'enable') {
	die "$help_enable\n" if !$param->{source};

	check_target($param->{source});
	enable_job($param);

    } elsif ($command eq 'disable') {
	die "$help_disable\n" if !$param->{source};

	check_target($param->{source});
	disable_job($param);

    }

    sub usage {
	my ($help) = @_;

	print("ERROR:\tno command specified\n") if !$help;
	print("USAGE:\t$PROGNAME <COMMAND> [ARGS] [OPTIONS]\n");
	print("\t$PROGNAME help [<cmd>] [OPTIONS]\n\n");
	print("\t$PROGNAME create -dest <string> -source <string> [OPTIONS]\n");
	print("\t$PROGNAME destroy -source <string> [OPTIONS]\n");
	print("\t$PROGNAME disable -source <string> [OPTIONS]\n");
	print("\t$PROGNAME enable -source <string> [OPTIONS]\n");
	print("\t$PROGNAME list\n");
	print("\t$PROGNAME status\n");
	print("\t$PROGNAME sync -dest <string> -source <string> [OPTIONS]\n");
    }

    sub check_target {
	my ($target) = @_;
	parse_target($target);
    }

__END__

=head1 NAME

pve-zsync - PVE ZFS Replication Manager

=head1 SYNOPSIS

pve-zsync <COMMAND> [ARGS] [OPTIONS]

pve-zsync help <cmd> [OPTIONS]

	Get help about specified command.

        <cmd>      string

		Command name

	-verbose   boolean

		Verbose output format.

pve-zsync create -dest <string> -source <string> [OPTIONS]

        Create a sync Job

        -dest      string

		the destination target is like [IP]:<Pool>[/Path]

        -limit     integer

		max sync speed in kBytes/s, default unlimited

        -maxsnap   string

		how much snapshots will be kept before get erased, default 1

        -name      string

		name of the sync job, if not set it is default

        -skip      boolean

		if this flag is set it will skip the first sync

        -source    string

		the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync destroy -source <string> [OPTIONS]

        remove a sync Job from the scheduler

        -name      string

		name of the sync job, if not set it is default

        -source    string

                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync disable -source <string> [OPTIONS]

        pause a sync job

        -name      string

		name of the sync job, if not set it is default

        -source    string

                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]

pve-zsync enable -source <string> [OPTIONS]

        enable a syncjob and reset error

        -name      string

		name of the sync job, if not set it is default

        -source    string

                the source can be an  <VMID> or [IP:]<ZFSPool>[/Path]
pve-zsync list

	Get a List of all scheduled Sync Jobs

pve-zsync status

	Get the status of all scheduled Sync Jobs

pve-zsync sync -dest <string> -source <string> [OPTIONS]

	will sync one time

        -dest      string

		the destination target is like [IP:]<Pool>[/Path]

	-limit     integer

		max sync speed in kBytes/s, default unlimited

        -maxsnap   integer

		how much snapshots will be kept before get erased, default 1

        -name      string

		name of the sync job, if not set it is default.
		It is only necessary if scheduler allready contains this source.

        -source    string

		the source can be an <VMID> or [IP:]<ZFSPool>[/Path]

=head1 DESCRIPTION

This Tool helps you to sync your VM or directory which stored on ZFS between 2 servers.
This tool also has the capability to add jobs to cron so the sync will be automatically done.
The default syncing interval is set to 15 min, if you want to change this value you can do this in /etc/cron.d/pve-zsync.
To config cron see man crontab.

=head2 PVE ZFS Storage sync Tool

This Tool can get remote pool on other PVE or send Pool to others ZFS machines

=head1 EXAMPLES

add sync job from local VM to remote ZFS Server
pve-zsync create -source=100 -dest=192.168.1.2:zfspool

=head1 IMPORTANT FILES

Cron jobs and config are stored at                      /etc/cron.d/pve-zsync

The VM config get copied on the destination machine to  /var/lib/pve-zsync/

=head1 COPYRIGHT AND DISCLAIMER

Copyright (C) 2007-2015 Proxmox Server Solutions GmbH

This program is free software: you can redistribute it and/or modify it
under the terms of the GNU Affero General Public License as published
by the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Affero General Public License for more details.

You should have received a copy of the GNU Affero General Public
License along with this program.  If not, see
<http://www.gnu.org/licenses/>.
